Source code for epygram.untied.taylorism.schedulers
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains classes for Schedulers.
Among a set of instructions to be passed to a Worker, and according to its own
criteria, the Scheduler determines at the current moment which ones can be
launched right now simultaneously, and which ones must be delayed.
A scheduler hence basically has one method:
launchable(pending_instructions, workers, report).
Its parameters (constant during execution) can be set in its constructor.
Other quantities, which vary during execution, must be available within
*workers* (work being done) and *report* (work done).
A set of basic schedulers is given.
"""
class BaseScheduler(object):
    """Abstract class: defines the interface every scheduler must provide."""

    def launchable(self, pending_instructions, workers, report):
        """
        Partition *pending_instructions* into a "launchable" part and a
        "not_yet_launchable" part, following the scheduler's own rules.

        To take that decision in a generic manner, the scheduler is given:

        - *pending_instructions*: the work still to do
        - *workers*: the work being done
        - *report*: the work done.

        This base implementation always raises NotImplementedError; concrete
        schedulers must override it.
        """
        message = ('launchable() method must be implemented in '
                   'inheritant classes. (BaseScheduler is abstract).')
        raise NotImplementedError(message)
class LaxistScheduler(object):
    """No sorting is done: every pending instruction is launchable at once."""

    def launchable(self, pending_instructions, workers, report):
        """
        Declare all *pending_instructions* launchable right now.

        *workers* and *report* are accepted for interface compatibility only
        and are not consulted.

        Return a ``(launchable, not_yet_launchable)`` tuple, as the other
        schedulers do; the second element is always an empty list.
        """
        # Returning the bare list would break callers that unpack the
        # two-element result every scheduler is expected to produce.
        return (pending_instructions, [])
class MaxThreadsScheduler(BaseScheduler):
    """
    A basic scheduler that dequeues the pending list as long as a maximum
    number of tasks/threads is not reached.
    """
    import multiprocessing as mpc

    # Floor division keeps the default an integer under both Python 2 and 3
    # (true division would yield a float, which is not a valid slice bound);
    # max(1, ...) avoids a default of 0 threads on a single-CPU machine,
    # which would prevent anything from ever being launched.
    def __init__(self, max_threads=max(1, mpc.cpu_count() // 2)):
        """
        *max_threads*: maximum number of tasks to be launched simultaneously;
        defaults to half the number of CPUs, and at least 1.
        """
        self.max_threads = max_threads

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* according to the number of free slots.

        The number of free slots is *max_threads* minus the number of
        *workers* currently running, never below zero. The first
        instructions, up to that number, are launchable; the remaining ones
        are delayed. *report* is not used.

        Return a ``(launchable, not_yet_launchable)`` tuple.
        """
        # int(): tolerate a max_threads given as a float (e.g. the result of
        # a true division), since slice bounds must be integers.
        free_slots = max(int(self.max_threads) - len(workers), 0)
        launchable = pending_instructions[:free_slots]
        not_yet_launchable = pending_instructions[free_slots:]
        return (launchable, not_yet_launchable)
class MaxMemoryScheduler(BaseScheduler):
    """
    A basic scheduler that dequeues the pending list as long as a critical
    memory level is not reached. That level is derived from the total system
    memory; consumption is read from the 'memory' attribute of running
    workers and the 'memory' element of pending instructions (in MB).
    """

    def __init__(self, max_memory_percentage=0.75, total_system_memory='compute'):
        """
        *max_memory_percentage*: max memory level, used as a multiplicative
                                 factor of the total system memory
                                 (e.g. 0.75).
        *total_system_memory*: total system memory in MB;
                               if 'compute', computed (Unix only).
        """
        import os
        if total_system_memory == 'compute':
            page_size = os.sysconf('SC_PAGE_SIZE')
            page_count = os.sysconf('SC_PHYS_PAGES')
            # page size * number of physical pages, converted to MB
            total_system_memory = float(page_size * page_count) / (1024 ** 2.)
        self.max_memory = max_memory_percentage * total_system_memory

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* according to the memory still available.

        The memory in use is the sum of the 'memory' attribute of every
        running worker (values of *workers*). Instructions are examined in
        order: one is launchable if adding its 'memory' keeps the total
        strictly below the maximum level, in which case its memory is
        counted as used; otherwise it is delayed. A delayed instruction does
        not stop the examination of the following ones. *report* is not used.

        Return a ``(launchable, not_yet_launchable)`` tuple.
        """
        running = list(workers.values())
        assert all(hasattr(w, 'memory') for w in running)
        committed = sum(w.memory for w in running)
        accepted, postponed = [], []
        for instructions in pending_instructions:
            needed = instructions['memory']
            if committed + needed < self.max_memory:
                accepted.append(instructions)
                committed += needed
            else:
                postponed.append(instructions)
        return (accepted, postponed)