taylorism — Framework for parallelisation of tasks

Framework for parallelisation of tasks.

The package works with three levels of subprocessing:

  • the main program, in which the “instructions” of each task are defined, and which communicates with the intermediate subprocess
  • the intermediate subprocess, which receives sets of instructions from the main program and distributes them to a range of subprocesses
  • the working subprocesses, which are defined by their instructions, execute them and then report back to the intermediate subprocess.

This segmentation follows the image of a Boss distributing instructions to Workers and collecting their reports when they finish.

The main program can:

  • initiate the Boss
  • give instructions to the Boss, to be executed by the Workers
  • tell the Boss to send the Workers to work
  • tell the Boss to stop the Workers
  • inform the Boss that there will be no more instructions to execute
  • wait until all instructions have been executed
  • get a report (interim or final) from the Boss, composed of the Workers’ reports

The Boss (in its subprocess loop):

  • receives either series of instructions or “control signals” from the main program. Control signals concern:

    • halt/resume execution of instructions
    • request for an (interim) report
    • notification that there are no more instructions
    • stop signals (letting workers finish their current tasks or not)
  • collects reports from the workers and handles their errors: if an error occurs in a subprocess, all other subprocesses are stopped and the error is returned to the main program.

  • schedules the execution of instructions by consulting a Scheduler with regard to:

    • pending instructions (work to do)
    • current workers (work being done)
    • previous reports (work already done)

    (see the Schedulers section for more details).
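
The Boss’s scheduling and error-handling behaviour described above can be pictured with a small standard-library sketch. It is not taylorism’s implementation: for brevity it uses threads instead of subprocesses, models instructions as plain dicts, and the names boss_loop, max_threads_rule and run_task are hypothetical. At each step it asks a scheduling rule which pending instructions may start, collects finished reports, and stops dispatching as soon as one task fails.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def max_threads_rule(pending, running, reports, max_threads=2):
    """Hypothetical rule: launch tasks while fewer than max_threads are running."""
    free = max(0, max_threads - len(running))
    return pending[:free], pending[free:]


def boss_loop(instructions, run_task, rule=max_threads_rule):
    """Dispatch instructions to workers and collect their reports.

    Returns (reports, error), where error is the first exception raised
    by a task, or None if every task succeeded.
    """
    pending = list(instructions)  # work to do
    running = {}                  # future -> instruction (work being done)
    reports = []                  # work done
    with ThreadPoolExecutor(max_workers=8) as pool:
        while pending or running:
            # Ask the scheduling rule which pending instructions may start now.
            launchable, pending = rule(pending, running, reports)
            for instr in launchable:
                running[pool.submit(run_task, instr)] = instr
            if not running:
                raise RuntimeError("the rule launched nothing while no task was running")
            # Block until at least one running task has finished.
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                del running[future]
                error = future.exception()
                if error is not None:
                    # Stop dispatching: cancel() only affects tasks not started
                    # yet; threads already running are awaited when the pool closes.
                    for other in running:
                        other.cancel()
                    return reports, error
                reports.append(future.result())
    return reports, None
```

Passing a different function with the same signature as rule changes the scheduling policy without touching the loop, which mirrors the Boss/Scheduler split described above.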

The Workers are defined BY their instructions (their attributes) and by their task (what they do with their instructions). Each worker is an independent subprocess, triggered by the Boss subprocess and expected to send the report of its work back to it when finished.

Use of the module

The proper way to use it is to define a custom worker class inheriting from Worker, with all its necessary instructions declared as footprints attributes, and then to implement the task to be done in the _task() method.

The workers implemented this way are then “hired” (i.e. generated) by the Boss as soon as the instructions given to the Boss match those of the workers (footprints magic!).
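
The “hiring” step can be approximated as follows. This is a deliberately simplified stand-in for footprints’ resolution, which also handles types, allowed values, defaults and priorities; SimpleWorker, REGISTRY and hire are hypothetical names, not part of taylorism or footprints, and the Sleeper and Adder classes below only mimic the idea of the documented example. Each worker class declares the instruction names it needs and implements _task(), and the first registered class whose required instructions are all present gets hired.

```python
import time

REGISTRY = []  # hypothetical registry of worker classes


class SimpleWorker:
    """Hypothetical stand-in for taylorism.Worker: instructions become attributes."""

    required = ()  # instruction names this worker needs (like footprint attributes)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        REGISTRY.append(cls)  # register every subclass automatically

    def __init__(self, **instructions):
        for key, value in instructions.items():
            setattr(self, key, value)

    def _task(self):
        raise NotImplementedError


class Sleeper(SimpleWorker):
    required = ("sleeping_time",)

    def _task(self):
        time.sleep(self.sleeping_time)
        # The returned object plays the role of the report sent back to the Boss.
        return {"slept": self.sleeping_time}


class Adder(SimpleWorker):
    required = ("a", "b")

    def _task(self):
        return {"sum": self.a + self.b}


def hire(**instructions):
    """Return a worker of the first registered class whose needs are all met."""
    for cls in REGISTRY:
        if cls.required and all(name in instructions for name in cls.required):
            return cls(**instructions)
    raise LookupError(f"no worker class matches {sorted(instructions)}")
```

For instance, hire(a=2, b=3) yields an Adder whose _task() returns {"sum": 5}.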

See taylorism.examples for a simple example.

Dependencies

footprints (MF package)

opinel (MF package)

Functions

taylorism.run_as_server(common_instructions, individual_instructions, scheduler=<taylorism.schedulers.MaxThreadsScheduler object>, verbose=False)

Builds a Boss instance, makes it hire workers, runs the workers, and returns the Boss instance.

Be aware that the Boss MUST be told when no more instructions will be appended; otherwise the subprocess will continue to live on its own (until the Boss instance is destroyed).

Args:

  • common_instructions: instructions shared by all the workers
  • individual_instructions: instructions specific to each worker (all of the same length)
  • scheduler: scheduler that rules the scheduling of workers/threads
  • verbose: whether the Boss is verbose or not.
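
The warning above about telling the Boss that no more instructions will be appended can be illustrated with a minimal listening loop. The sketch uses only the standard library; the message kinds "instructions" and "end" and the function listener are hypothetical and do not reproduce taylorism’s actual protocol. The loop runs in a background thread and only exits when it receives an end message; without it, the loop would block on the queue forever.

```python
import queue
import threading


def listener(inbox, results):
    """Hypothetical intermediate loop: process instruction batches until 'end'."""
    while True:
        kind, payload = inbox.get()  # blocks until a message arrives
        if kind == "instructions":
            results.extend(x * 10 for x in payload)  # stand-in for real work
        elif kind == "end":
            return  # no more instructions: leave the loop
        else:
            raise ValueError(f"unknown message kind: {kind!r}")


inbox = queue.Queue()
results = []
boss = threading.Thread(target=listener, args=(inbox, results), daemon=True)
boss.start()
inbox.put(("instructions", [1, 2]))
inbox.put(("instructions", [3]))
inbox.put(("end", None))  # omit this and boss.join() below never returns
boss.join(timeout=5)
print(results, boss.is_alive())  # [10, 20, 30] False
```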

taylorism.batch_main(common_instructions, individual_instructions, scheduler=<taylorism.schedulers.MaxThreadsScheduler object>, verbose=False)

Runs the instructions as a batch process, waiting until all instructions are finished and finally printing the report.

Args and kwargs are those of the run_as_server() function.

Actors

class taylorism.Boss(scheduler=<taylorism.schedulers.MaxThreadsScheduler object>, name=None, verbose=False)

Bases: object

Template for bosses. A Boss is an object meant to assign tasks to a series of workers.

Optionally, the Boss can be given a name and a verbosity (whether to report the workers’ reports in the log).

A scheduler can also be assigned to rule the ordering of tasks to workers. Custom schedulers can be used; they only need to inherit from .schedulers.BaseScheduler and implement the launchable() method.
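
A custom scheduler might look like the sketch below. It assumes that launchable() receives the pending instructions as a list of dicts, that workers can be iterated to yield the instruction dicts of running tasks, and that the method returns a (launchable, not_yet_launchable) pair; these types are assumptions, not taken from taylorism’s source. The rule itself (at most one running task per value of a hypothetical group instruction) is invented for illustration, and the import falls back to a local stub so the snippet runs without taylorism installed.

```python
try:
    from taylorism.schedulers import BaseScheduler
except ImportError:
    # Local stub so that the sketch runs without taylorism installed.
    class BaseScheduler(object):
        def launchable(self, pending_instructions, workers, report):
            raise NotImplementedError


class OnePerGroupScheduler(BaseScheduler):
    """Hypothetical rule: never run two tasks sharing the same 'group' value.

    Instructions without a 'group' key are not restricted.
    """

    def launchable(self, pending_instructions, workers, report):
        # Assumption: workers yields the instruction dicts of running tasks.
        busy = {w.get("group") for w in workers} - {None}
        launchable, not_yet_launchable = [], []
        for instr in pending_instructions:
            group = instr.get("group")
            if group is not None and group in busy:
                not_yet_launchable.append(instr)
            else:
                launchable.append(instr)
                if group is not None:
                    busy.add(group)  # reserve this group for the new task
        return launchable, not_yet_launchable
```

With a task of group "b" already running, pending instructions of groups "a", "a", "b" and one without a group would yield the first "a" and the ungrouped one as launchable, and delay the other two.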

end()

Ends the listening process once the instructions have been processed. end() (or wait_till_finished()) MUST BE CALLED for each Boss to avoid zombie processes.

get_report(interim=True)

Gets the report of the work executed. If interim is True and no report is available, asks for an interim report, i.e. one containing the work done at the time of the call.

make_them_work(terminate=False, stop_listening=False)

Orders the workers to work. If terminate is True, no further instructions can be appended later. If stop_listening is True, alive workers carry on with their jobs but are no longer listened to; this is a bit tricky but might be useful.

set_instructions(common_instructions={}, individual_instructions={})

Sets the instructions to be distributed to the workers.

  • common_instructions are a series of arguments shared by all workers, to be passed to the Worker factory.
  • individual_instructions are a series of arguments specific to each worker; hence all individual instructions must have the same length.
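
The relationship between common_instructions and individual_instructions can be illustrated by expanding them into one instruction set per worker. This is a sketch of the documented semantics, not taylorism’s code; expand_instructions is a hypothetical helper, and it assumes individual_instructions maps each instruction name to a list holding one value per worker.

```python
def expand_instructions(common_instructions, individual_instructions):
    """Build one instruction dict per worker (hypothetical helper)."""
    lengths = {len(values) for values in individual_instructions.values()}
    if len(lengths) > 1:
        raise ValueError("all individual instructions must have the same length")
    n_workers = lengths.pop() if lengths else 0
    per_worker = []
    for i in range(n_workers):
        instr = dict(common_instructions)  # shared by every worker
        instr.update({k: v[i] for k, v in individual_instructions.items()})
        per_worker.append(instr)
    return per_worker


print(expand_instructions({"wakeup_sentence": "Hi"}, {"sleeping_time": [1.0, 2.0]}))
# [{'wakeup_sentence': 'Hi', 'sleeping_time': 1.0}, {'wakeup_sentence': 'Hi', 'sleeping_time': 2.0}]
```

The length check is what makes the "same length" requirement necessary: the i-th value of every individual instruction belongs to the i-th worker.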

stop_them_working()

Stop the workers.

wait_till_finished()

Blocks the calling thread until all instructions have been executed.

class taylorism.Worker(*args, **kwargs)

Bases: footprints.FootprintBase

Template for workers. A Worker is an object meant to perform a task according to instructions. The instructions have to be declared as footprint attributes in concrete subclasses.

Footprint:

dict(
    attr = dict(
        name = dict(
            access = 'rwx', 
            alias = set([]), 
            default = None, 
            info = 'Name of the worker.', 
            optional = True, 
            outcast = set([]), 
            remap = dict(), 
            values = set([]),
        ),
    ), 
    bind = [], 
    info = 'Not documented', 
    only = dict(), 
    priority = dict(
        level = footprints.priorities.PriorityLevel::DEFAULT,
    ),
)

bye()

Blocks the Boss until the worker has finished its job. THIS METHOD SHOULD NEVER BE CALLED BY THE OBJECT ITSELF! (It would cause a DEADLOCK if called from inside the worker’s subprocess.)

stop_working()

Makes the worker stop working.

work()

Sends the Worker to its job.

Schedulers

Contains classes for Schedulers.

From a set of instructions to be passed to Workers, the Scheduler determines, according to its own criteria, which ones can be launched simultaneously right now and which must be delayed.

A scheduler hence basically has one method: launchable(pending_instructions, workers, report).

Its parameters (constant during execution) can be set in its constructor. Other quantities, which vary during execution, must be available within workers (work being done) and report (work done).

A set of basic schedulers is given.

class taylorism.schedulers.BaseScheduler

Bases: object

Abstract class.

launchable(pending_instructions, workers, report)

Splits pending_instructions into “launchable” and “not_yet_launchable” instructions according to the scheduler’s own rules.

For that purpose and in a generic manner, the scheduler may need:

  • pending_instructions: work to do
  • workers: work being done
  • report: work done.

class taylorism.schedulers.LaxistScheduler

Bases: taylorism.schedulers.BaseScheduler

No sorting is done!

class taylorism.schedulers.MaxThreadsScheduler(max_threads=2)

Bases: taylorism.schedulers.BaseScheduler

A basic scheduler that dequeues the pending list until a maximum number of simultaneous tasks (max_threads) is reached.

class taylorism.schedulers.MaxMemoryScheduler(max_memory_percentage=0.75, total_system_memory='compute')

Bases: taylorism.schedulers.BaseScheduler

A basic scheduler that dequeues the pending list until a critical memory level is reached, based on the ‘memory’ element of the workers’ instructions (in MB) and on the total system memory.
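
The memory criterion can be sketched as a pure function. It assumes each instruction dict carries a memory value in MB (as the description states) and that the running workers’ instructions are available as dicts; the function name max_memory_split, the strict in-order dequeueing and the explicit total_memory_mb argument (instead of computing the total system memory) are assumptions of this illustration, not MaxMemoryScheduler’s code.

```python
def max_memory_split(pending, running, total_memory_mb, max_memory_percentage=0.75):
    """Dequeue pending instructions while the memory budget is not exceeded.

    pending, running: lists of instruction dicts with a 'memory' key (MB).
    Returns (launchable, not_yet_launchable).
    """
    budget = max_memory_percentage * total_memory_mb
    used = sum(instr["memory"] for instr in running)  # memory of work being done
    launchable = []
    for i, instr in enumerate(pending):
        if used + instr["memory"] > budget:
            # Stop dequeueing: this and all following instructions wait.
            return launchable, pending[i:]
        used += instr["memory"]
        launchable.append(instr)
    return launchable, []
```

With 1000 MB in total, a 750 MB budget and 300 MB already in use, pending tasks of 200, 200 and 100 MB would see the first two launched and the third delayed. In this sketch, an instruction larger than the whole budget would never be launched.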

Examples

Basic examples of how to use the module.

A more advanced example of use can be found in epygram’s epy_conv.py tool.

class taylorism.examples.Sleeper(*args, **kwargs)

Sample worker for tutorial or debugging purposes, which sleeps for a given time.

Overloading __init__() is not mandatory, but possible.

Footprint:

dict(
    attr = dict(
        name = dict(
            access = 'rwx', 
            alias = set([]), 
            default = None, 
            info = 'Name of the worker.', 
            optional = True, 
            outcast = set([]), 
            remap = dict(), 
            values = set([]),
        ), 
        sleeping_time = dict(
            access = 'rxx', 
            alias = set([]), 
            default = None, 
            info = 'Sleeping time in s.', 
            optional = False, 
            outcast = set([]), 
            remap = dict(), 
            type = float, 
            values = set([1.0, 2.0, 3.0, 5.0, 10.0, 15.0, 60.0, 30.0]),
        ), 
        wakeup_sentence = dict(
            access = 'rwx', 
            alias = set([]), 
            default = None, 
            info = 'What to say after sleep.', 
            optional = True, 
            outcast = set([]), 
            remap = dict(), 
            type = str, 
            values = set([]),
        ),
    ), 
    bind = [], 
    info = 'Sleeps.', 
    only = dict(), 
    priority = dict(
        level = footprints.priorities.PriorityLevel::DEFAULT,
    ),
)

_task()

The actual task of the Sleeper is implemented here. Returns the report to be sent back to the Boss.

taylorism.examples.sleepers_example_program(verbose=True)

Example: how to run and control the Boss.