Source code for epygram.untied.taylorism.schedulers

#!/usr/bin/env python
# -*- coding: utf-8 -*-
Contains classes for Schedulers.

Among a set of instructions to be passed to a Worker, and according to its own
criteria, the Scheduler determines, at the current moment, which ones can be
launched simultaneously right now and which ones must be delayed.

A scheduler hence basically has one method:
launchable(pending_instructions, workers, report).

Its parameters (constant during execution) can be set in its constructor.
Other quantities, which vary during execution, must be available within
*workers* (work being done) and *report* (work done).

A set of basic schedulers is given.

class BaseScheduler(object):
    """Abstract class.

    Defines the scheduler interface: a single :meth:`launchable` method that
    concrete schedulers must override.
    """

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* into "launchable" and
        "not_yet_launchable" instructions according to the scheduler own
        rules.

        For that purpose and in a generic manner, the scheduler may need:

        - *pending_instructions*: todo
        - *workers*: being done
        - *report*: done.

        Implementations are expected to return a tuple
        ``(launchable, not_yet_launchable)``.

        Raises NotImplementedError: always, since this base class is abstract.
        """
        # Adjacent string literals are used rather than a backslash
        # continuation inside the literal: the latter embeds the indentation
        # of the following source line into the message.
        raise NotImplementedError('launchable() method must be implemented in '
                                  'inheritant classes. '
                                  '(BaseScheduler is abstract).')
class LaxistScheduler(BaseScheduler):
    """No sorting is done !

    Every pending instruction is declared launchable immediately, whatever
    the work being done or already done.
    """

    def launchable(self, pending_instructions, workers, report):
        """
        Declare all *pending_instructions* launchable.

        *workers* and *report* are accepted for interface compatibility but
        are not used.

        Return a tuple ``(launchable, not_yet_launchable)`` in which
        *launchable* is *pending_instructions* itself and
        *not_yet_launchable* is an empty list.
        """
        # Return the same two-element shape as the other schedulers so that a
        # caller can unpack the result whatever the scheduler in use.
        return (pending_instructions, [])
class MaxThreadsScheduler(BaseScheduler):
    """
    A basic scheduler that dequeues the pending list as long as a maximum
    number of tasks/threads is not reached.
    """

    # Imported in the class body because the default value of *max_threads*
    # below is evaluated while the class body executes.
    import multiprocessing as mpc

    def __init__(self, max_threads=max(mpc.cpu_count() // 2, 1)):
        """
        *max_threads* to be launched simultaneously.

        Defaults to half the number of CPUs, using floor division so that the
        value is an integer whatever the Python version, and never less than
        1 so that a single-CPU host can still launch work.
        """
        # The value is used as a slice bound in launchable(), which requires
        # an integer.
        self.max_threads = int(max_threads)

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* according to the number of free slots.

        The number of free slots is *max_threads* minus the number of entries
        in *workers*, floored at 0. *report* is not used.

        Return a tuple ``(launchable, not_yet_launchable)``: the first
        free-slots instructions, then the remaining ones, in their original
        order.
        """
        free_slots = max(self.max_threads - len(workers), 0)
        launchable = pending_instructions[:free_slots]
        not_yet_launchable = pending_instructions[free_slots:]
        return (launchable, not_yet_launchable)
class MaxMemoryScheduler(BaseScheduler):
    """
    A basic scheduler that dequeues the pending list as long as a critical
    memory level (according to 'memory' element of workers instructions
    (in MB) and total system memory) is not reached.
    """

    def __init__(self, max_memory_percentage=0.75, total_system_memory='compute'):
        """
        *max_memory_percentage*: max memory level, applied as a
        multiplicative factor on the total system memory (0.75 by default).

        *total_system_memory*: total system memory in MB; if 'compute',
        it is computed from ``os.sysconf`` (Unix only).
        """
        import os
        if total_system_memory == 'compute':
            page_size = os.sysconf('SC_PAGE_SIZE')
            page_count = os.sysconf('SC_PHYS_PAGES')
            # page size * page count is divided by 1024 ** 2 to get MB.
            total_system_memory = float(page_size * page_count) / (1024 ** 2.)
        self.max_memory = max_memory_percentage * total_system_memory

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* according to the memory still available.

        The memory in use is the sum of the ``memory`` attribute of every
        value of *workers*. Pending instructions are examined in order: one
        is accepted when the memory in use plus its ``'memory'`` item stays
        strictly below the maximum, in which case its memory is added to the
        memory in use; otherwise it is postponed. *report* is not used.

        Return a tuple ``(launchable, not_yet_launchable)``.
        """
        running = list(workers.values())
        assert all(hasattr(worker, 'memory') for worker in running)
        committed = sum(worker.memory for worker in running)
        accepted, postponed = [], []
        for instructions in pending_instructions:
            needed = instructions['memory']
            if committed + needed < self.max_memory:
                accepted.append(instructions)
                committed += needed
            else:
                postponed.append(instructions)
        return (accepted, postponed)