Source code for taylorism

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Framework for parallelisation of tasks.
=======================================

The package works with three levels of subprocessing:\n
- the main program, in which the "instructions" of each task are defined,
  and which communicates with the intermediate subprocess
- the intermediate subprocess, which receives instruction sets from the main
  program and distributes them to a range of subprocesses
- the working subprocesses, which are defined by their instructions, execute
  them and then report back to the intermediate subprocess.
  
This segmentation follows the metaphor of a Boss distributing instructions to
Workers, and getting their reports when they finish.

The main program can:\n
- initiate the Boss
- give instructions to the Boss, for them to be executed (by the Workers)
- tell the Boss to send the Workers to work
- tell the Boss to stop the Workers
- inform the Boss that there will not be any more instructions to execute
- wait until all instructions have been executed
- get report (interim or final) from the Boss (composed of workers reports)

The Boss (in its subprocess loop):\n
- receives either series of instructions or "control signals" from the main
  program.
  Control signals concern:\n
  - halt/resume execution of instructions
  - request for (interim) report
  - end of instructions information
  - stop signals (letting workers finish current tasks or not) 
- collects reports from the workers, with handling of their errors: if an error
  occurs in a subprocess, all other subprocesses are stopped and the error is
  returned to the main program.
- schedules execution of instructions, asking a Scheduler with regard to:\n
  - pending instructions (work to do)
  - current workers (work being done)
  - previous reports (work already done)
  (cf. schedulers doc for more details).

The Workers are defined BY their instructions (their attributes), and their
task (what they do with their instructions).
Each worker is an independent subprocess, started by the Boss subprocess and
expected to send the report of its work back to the Boss when finished.


Use of the module
-----------------

A proper use is to define a custom worker class inheriting from Worker,
with all its necessary instructions as footprints attributes.
Then implement the task to be done in the _task() method.

The so-implemented workers will then be "hired" (i.e. generated) by the Boss as
soon as the instructions given to the Boss match those of the workers
(footprints magic!).

Cf. taylorism.examples for a simple example.


Dependencies
------------
**multiprocessing**

**footprints**

**interrupt** (side-package of **taylorism**)
"""

import multiprocessing as mpc
import uuid
import sys
import traceback
import copy
import os

import footprints
from footprints import FootprintBase, proxy as fpx

import interrupt  # because subprocesses must be killable properly

from schedulers import BaseScheduler, MaxThreadsScheduler

# The interrupt side-package should only report warnings and above.
interrupt.logger.setLevel('WARNING')
# Logger used throughout this module.
taylorism_log = footprints.loggers.getLogger(__name__)

#: timeout (in seconds) when polling for a Queue/Pipe communication
communications_timeout = 1e-2


#################
### FUNCTIONS ###
#################
def run_as_server(common_instructions, individual_instructions,
                  scheduler=None, verbose=False):
    """
    Build a Boss instance, make him hire workers, run the workers,
    and return the Boss instance.

    Be aware that the Boss MUST be told when no more instructions will be
    appended, or the subprocess will continue to live alone (until
    destruction of the Boss instance).

    Args:\n
    - *common_instructions*: to be passed to the workers
    - *individual_instructions*: to be passed to the workers
    - *scheduler*: scheduler to rule scheduling of workers/threads;
      if None (the default), a fresh MaxThreadsScheduler() is used
    - *verbose*: is the Boss verbose or not.
    """
    # A scheduler built as a default argument value would be created once at
    # import time and shared by every call: build a fresh one per call.
    if scheduler is None:
        scheduler = MaxThreadsScheduler()
    boss = Boss(verbose=verbose, scheduler=scheduler)
    boss.set_instructions(common_instructions, individual_instructions)
    boss.make_them_work()
    return boss
def batch_main(common_instructions, individual_instructions,
               scheduler=None, verbose=False):
    """
    Run execution of the instructions as a batch process, waiting for all
    instructions to be finished and finally printing the report.

    Args and kwargs are those of run_as_server() function; a *scheduler* of
    None (the default) means a fresh MaxThreadsScheduler().
    """
    # Avoid a scheduler instance shared by every call (default evaluated once).
    if scheduler is None:
        scheduler = MaxThreadsScheduler()
    boss = run_as_server(common_instructions, individual_instructions,
                         scheduler=scheduler, verbose=verbose)
    with interrupt.SignalInterruptHandler():
        try:
            boss.wait_till_finished()
            report = boss.get_report()
        except (Exception, KeyboardInterrupt):
            # ask the Boss to stop its workers immediately, wait for it to
            # exit, then propagate the original error
            boss.stop_them_working()
            boss.wait_till_finished()
            raise
        else:
            # single-argument print(...) behaves the same in Python 2 and 3
            print("Report from workers:")
            for r in report['workers_report']:
                print(r['report'])


####################
### MAIN CLASSES ###
####################
class Worker(FootprintBase):
    """
    Template for workers.
    A Worker is an object supposed to do a task, according to instructions.
    The instructions have to be added to footprint attributes in actual
    classes.
    """

    _abstract = True
    _collector = ('worker',)
    _footprint = dict(
        attr=dict(
            name=dict(
                info="Name of the worker.",
                optional=True,
                default=None,
                access='rwx'
            )
        )
    )

    def __init__(self, *args, **kwargs):
        """Constructor. See its footprint for arguments."""
        super(Worker, self).__init__(*args, **kwargs)
        if self.name is None:
            # no name given: generate a unique one (the Boss tracks its
            # workers by name)
            self.name = str(uuid.uuid4())
        self._process = mpc.Process(target=self._work_and_communicate)

    def __del__(self):
        # A Process that was never started has no pid, and join() refuses it
        # ("can only join a started process"): only clean up started ones.
        if hasattr(self, '_process') and self._process.pid is not None:
            self._process.join(1)
            if self._process.is_alive():
                self._process.terminate()

    def work(self):
        """Send the Worker to his job."""
        self._process.start()

    def bye(self):
        """
        Block the Boss until the worker has finished his job.
        THIS METHOD SHOULD NEVER BE CALLED BY THE OBJECT HIMSELF !
        (WOULD CAUSE A DEADLOCK if called from inside the worker's subprocess)
        """
        self._process.join()

    def stop_working(self):
        """Make the worker stop working."""
        self._process.terminate()

    def _work_and_communicate(self):
        """
        Send the Worker to his task, making sure he communicates with its boss.

        The report put on self._messenger (set by the Boss when hiring) is a
        dict {'name': ..., 'report': <return value of _task()>}; if _task()
        raised an Exception, 'report' is that exception and a 'traceback'
        entry (list of formatted lines) is added.
        NOTE(review): the exception travels through a multiprocessing Queue,
        hence must be picklable - confirm for custom exceptions.

        From within this method down, everything is done in the subprocess
        world !
        """
        with interrupt.SignalInterruptHandler():
            to_be_sent_back = {'name': self.name, 'report': None}
            try:
                to_be_sent_back = {'name': self.name, 'report': self._task()}
            except Exception as e:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
                to_be_sent_back = {'name': self.name, 'report': e, 'traceback': tb}
            finally:
                # always report, so that the Boss is never left waiting
                self._messenger.put(to_be_sent_back)

    def _task(self, **kwargs):
        """
        Actual task of the Worker to be implemented therein.
        Return the report to be sent back to the Boss.

        Raises NotImplementedError (a RuntimeError subclass) if not overridden.
        """
        raise NotImplementedError("this method must be implemented in Worker's inheritant class !")
class Boss(object):
    """
    Template for bosses.
    A Boss is an object supposed to order tasks to a series of workers.

    Optionally can be attributed to the Boss a *name* and a *verbose*ity
    (to report in log, the workers reports).

    Also, a *scheduler* can be assigned, to rule the ordering of tasks to
    workers. Custom schedulers can be used, they only need to inherit from
    .schedulers.BaseScheduler and implement the launchable() method.
    """

    # The values double as the messages exchanged between the main program and
    # the Boss subprocess: a message equal to one of them is a control signal.
    control_signals = {
        'HALT': 'Suspend ordering workers to work until RESUME.',
        'RESUME': 'Resume loop on pending_instructions/workers.',
        'SEND_REPORT': 'Send interim report (and continue normally).',
        'END': ('Terminate all pending work, then Stop listening. '
                'No new instructions from control will be listened, '
                'except a STOP*.'),
        'STOP': ('Halt pending work, but let workers finish their '
                 'current work, and then stop listening.'),
        'STOP_LISTENING': ('Stop listening, while workers continue '
                           'their current job.'),
        'STOP_RIGHTNOW': ('Stop workers immediately and stop '
                          'listening.'),
    }

    def __init__(self, scheduler=None, name=None, verbose=False):
        """
        Start the Boss subprocess (initially halted).

        *scheduler*: a BaseScheduler instance; None (the default) means a
        fresh MaxThreadsScheduler().
        """
        # a default instance would be built once and shared by all Bosses
        if scheduler is None:
            scheduler = MaxThreadsScheduler()
        assert isinstance(scheduler, BaseScheduler)
        self.scheduler = scheduler
        self.name = name
        self.verbose = verbose
        self.workers_messenger = mpc.Queue()
        # in = inside subprocess, out = main
        (self.control_messenger_in, self.control_messenger_out) = mpc.Pipe()
        # start halted: no worker is launched before make_them_work() (RESUME)
        self.control_messenger_out.send(self.control_signals['HALT'])
        self._process = mpc.Process(target=self._listen_and_communicate)
        self._process.start()

    def __del__(self):
        # __init__ may have failed before creating some attributes: only
        # clean up what actually exists.
        process = getattr(self, '_process', None)
        if process is not None:
            process.join(1)
            if process.is_alive():
                process.terminate()
        for attr in ('control_messenger_in', 'control_messenger_out',
                     'workers_messenger'):
            messenger = getattr(self, attr, None)
            if messenger is not None:
                messenger.close()

    def set_instructions(self, common_instructions=None, individual_instructions=None):
        """
        Set instructions to be distributed to workers.

        - *common_instructions* are a series of arguments shared by each
          worker, to be passed to the Worker factory.
        - *individual_instructions* are a series of arguments proper to each
          worker, hence all individual instructions must have the same length

        One instructions set is built per position in the individual
        instruction sequences; with no individual instructions, an empty list
        of instructions is sent. Neither argument is modified.
        """
        if common_instructions is None:
            common_instructions = {}
        if individual_instructions is None:
            individual_instructions = {}
        instructions_sets = []
        if len(individual_instructions) > 0:
            # check their length is homogeneous
            lengths = set(len(instr) for instr in individual_instructions.values())
            assert len(lengths) == 1, \
                "all *individual_instructions* must have the same length."
            indiv_instr_num = lengths.pop()
            # gather common and individual (indexing instead of pop(0): O(n),
            # and the caller's sequences are left untouched)
            for i in range(indiv_instr_num):
                instructions = copy.copy(common_instructions)
                for k, v in individual_instructions.items():
                    instructions[k] = v[i]
                instructions_sets.append(instructions)
        # send instructions to control (pickled right away by the Pipe)
        self.control_messenger_out.send(instructions_sets)

    def make_them_work(self, terminate=False, stop_listening=False):
        """
        Order the workers to work.

        If *terminate*, no other instructions could be appended later.

        If *stop_listening*, alive workers go on their jobs, but they are not
        listened to anymore; this is a bit tricky but might be used ?
        """
        self.control_messenger_out.send(self.control_signals['RESUME'])
        if stop_listening:
            self.control_messenger_out.send(self.control_signals['STOP_LISTENING'])
        if terminate:
            # END: finish pending work, then stop listening to new instructions
            self.end()

    def stop_them_working(self):
        """Stop the workers."""
        self.control_messenger_out.send(self.control_signals['STOP_RIGHTNOW'])

    def get_report(self, interim=True):
        """
        Get report of the work executed.

        If *interim*, ask for an interim report if no report is available,
        i.e. containing the work done by the time of calling.

        Return None if no report could be obtained (*interim* is False and
        nothing was received yet, or the Boss subprocess ended without one).
        Raise the exception reported by the Boss subprocess, if any.
        """
        def _getreport(timeout=0.):
            if self.control_messenger_out.poll(timeout):
                received = self.control_messenger_out.recv()
                # BaseException, not Exception: the Boss subprocess also
                # reports KeyboardInterrupt, which is not an Exception.
                if isinstance(received['workers_report'], BaseException):
                    taylorism_log.error("Error was catch in subprocesses with traceback:")
                    sys.stderr.writelines(received['traceback'])
                    raise received['workers_report']
            else:
                received = None
            return received

        # first try to get report
        report = _getreport()
        if report is None and interim:
            self.control_messenger_out.send(self.control_signals['SEND_REPORT'])
            while report is None:
                # wait a little for the answer instead of busy-spinning
                report = _getreport(communications_timeout)
                if report is None and not self._process.is_alive():
                    # last chance: a report may have been sent right before
                    # the subprocess ended
                    report = _getreport()
                    break
        return report

    def end(self):
        """
        Ends the listening process once instructions are treated.
        MUST BE CALLED (or wait_till_finished) for each Boss to avoid zombies
        processes.
        """
        self.control_messenger_out.send(self.control_signals['END'])

    def wait_till_finished(self):
        """Block the calling tree until all instructions have been executed."""
        self.end()
        self._process.join()

    # boss subprocess internal methods
    ##################################

    def _listen_and_communicate(self):
        """
        Interface routine, to catch exceptions and communicate.

        Sends exactly one final report to the main program: either
        {'workers_report': [...], 'status': 'finished'|'pending', ...} or,
        on error, {'workers_report': <exception>, 'traceback': [...]}.

        From within this method down, everything is done in the subprocess
        world !
        """
        with interrupt.SignalInterruptHandler():
            try:
                (workers_report, pending_instructions) = self._listen()
                if len(pending_instructions) == 0:
                    report = {'workers_report': workers_report,
                              'status': 'finished'}
                else:
                    report = {'workers_report': workers_report,
                              'status': 'pending',
                              'pending': pending_instructions}
            except (Exception, KeyboardInterrupt) as e:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
                report = {'workers_report': e, 'traceback': tb}
                # Workers have already been stopped by _listen() itself.
                # (Sending STOP_RIGHTNOW from here would only write into this
                # subprocess's own, no longer read, control pipe.)
            finally:
                self.control_messenger_in.send(report)

    def _listen(self):
        """
        Actual listening method, i.e. running subprocess at interface between
        main and workers.

        Infinite loop:
          - A. listen to control, for appending new instructions or control
            signals
          - B. listen to workers, to collect their reports and/or errors
          - C. assign work to workers
          - D. exit loop if there is any reason to

        Any exception escaping the loop first stops all running workers.
        Return (list of workers reports, remaining pending instructions).
        """
        workers = {}
        pending_instructions = []
        report = []

        def stop_them_working():
            # iterate over a snapshot: the dict is emptied within the loop
            for wname in list(workers.keys()):
                workers.pop(wname).stop_working()

        def hire_worker(instructions):
            w = fpx.worker(**instructions)
            if w is None:
                raise AttributeError("no adequate Worker was found with these instructions:" + str(instructions))
            w._messenger = self.workers_messenger
            workers[w.name] = w
            return w

        stop_signals = [self.control_signals[k]
                        for k in self.control_signals.keys() if 'STOP' in k]
        halt = False
        end = False
        stop = False
        try:
            while True:
                # A. listen to control
                if self.control_messenger_in.poll(communications_timeout):
                    control = self.control_messenger_in.recv()
                    if control in self.control_signals.values():
                        # received a control signal
                        if control == self.control_signals['SEND_REPORT']:
                            self.control_messenger_in.send({'workers_report': report,
                                                            'status': 'interim'})
                        elif control == self.control_signals['HALT']:
                            halt = True
                        elif control == self.control_signals['RESUME']:
                            halt = False
                        elif control in stop_signals or control == self.control_signals['END']:
                            end = True
                            if control == self.control_signals['STOP_LISTENING']:
                                break  # leave out the infinite loop
                            elif control in (self.control_signals['STOP'],
                                             self.control_signals['STOP_RIGHTNOW']):
                                stop = True
                                if control == self.control_signals['STOP_RIGHTNOW']:
                                    stop_them_working()
                    else:
                        # received new instructions
                        if not end:
                            # if an END or STOP signal has been received,
                            # new instructions are not listened to
                            if isinstance(control, list):
                                pending_instructions.extend(control)
                            elif isinstance(control, dict):
                                pending_instructions.append(control)
                # B. listen to workers
                try:
                    reported = self.workers_messenger.get(timeout=communications_timeout)
                except mpc.queues.Empty:
                    pass
                else:
                    # got a new message from workers !
                    report.append(reported)
                    if isinstance(reported['report'], Exception):
                        # worker got an exception
                        taylorism_log.error("error encountered with worker " + reported['name'] + " with traceback:")
                        sys.stderr.writelines(reported['traceback'])
                        sys.stderr.write("Instructions of guilty worker:\n")
                        guilty_instructions = [
                            repr(a) + '\n'
                            for a in sorted(workers[reported['name']].footprint_as_dict().items())
                            if a]
                        sys.stderr.writelines(guilty_instructions)
                        stop_them_working()
                        raise reported['report']
                    else:
                        # worker has finished
                        if self.verbose:
                            taylorism_log.info(str(reported['report']))
                        workers.pop(reported['name']).bye()
                # C. there is work to do and no STOP signal has been received: re-launch
                if len(pending_instructions) > 0 and not stop and not halt:
                    (launchable, not_yet_launchable) = self.scheduler.launchable(
                        pending_instructions, workers=workers, report=report)
                    for instructions in launchable:
                        w = hire_worker(instructions)
                        w.work()
                        if self.verbose:
                            taylorism_log.info(' '.join(['Worker', w.name, 'started.']))
                    pending_instructions = not_yet_launchable
                # D. should we stop now ?
                if end and (len(workers) == len(pending_instructions) == 0):
                    # END (or STOP*) received, all workers are done and no
                    # more pending instructions remain: leave the loop
                    stop = True
                if stop:
                    break
        except BaseException:
            # whatever the error (worker failure, no adequate Worker, signal),
            # no worker must outlive the Boss loop; stopping is idempotent
            stop_them_working()
            raise
        return (report, pending_instructions)