#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Framework for parallelisation of tasks.
=======================================
The package works with three levels of subprocessing:\n
- the main program, in which are defined each task "instructions",
and that communicates with the intermediate subprocess
- the intermediate subprocess, which receives instruction sets from the main
program and distributes them to a range of subprocesses
- the working subprocesses, which are defined by their instructions, execute
them and then report back to the intermediate process.
This segmentation follows the image of a Boss distributing instructions to
Workers, and getting their reports when they finish.
The main program can:\n
- initiate the Boss
- give instructions to the Boss, for them to be executed (by the Workers)
- tell the Boss to send the Workers to work
- tell the Boss to stop the Workers
- inform the Boss that there will not be any more instructions to execute
- wait until all instructions have been executed
- get report (interim or final) from the Boss (composed of workers reports)
The Boss (in its subprocess loop):\n
- receives either series of instructions or "control signals" from the main
program.
Control signals concern:\n
- halt/resume execution of instructions
- request for (interim) report
- end of instructions information
- stop signals (letting workers finish current tasks or not)
- collects reports from the workers, with handling of their errors: if an error
occurs in a subprocess, all other subprocesses are stopped and the error is
returned to the main program.
- schedules execution of instructions, asking a Scheduler with regard to:\n
- pending instructions (work to do)
- current workers (work being done)
- previous reports (work already done)
(cf. schedulers doc for more details).
The Workers are defined BY their instructions (their attributes), and their
task (what they do with their instructions).
Each worker is an independent subprocess, triggered by the Boss subprocess and
expected to send back to it the report of its work when finished.
Use of the module
-----------------
A proper use is to define a custom worker class inheriting from Worker,
with all its necessary instructions as footprints attributes.
Then implement the task to be done in the _task() method.
The workers so implemented will then be "hired" (i.e. instantiated) by the Boss
as soon as the instructions given to the Boss match those of the workers
(footprints magic!).
Cf. taylorism.examples for a simple example.
Dependencies
------------
**multiprocessing**
**footprints**
**interrupt** (side-package of **taylorism**)
"""
import multiprocessing as mpc
import uuid
import sys
import traceback
import copy
import os
import footprints
from footprints import FootprintBase, proxy as fpx
import interrupt # because subprocesses must be killable properly
from schedulers import BaseScheduler, MaxThreadsScheduler
# Keep the *interrupt* package quiet: only WARNING and above get through.
interrupt.logger.setLevel("WARNING")
# Logger used by this module (Boss and workers side alike).
taylorism_log = footprints.loggers.getLogger(__name__)
#: timeout (seconds) when polling for a Queue/Pipe communication
communications_timeout = 1e-2
#################
### FUNCTIONS ###
#################
def run_as_server(common_instructions, individual_instructions,
                  scheduler=None,
                  verbose=False):
    """
    Build a Boss instance, make him hire workers,
    run the workers, and returns the Boss instance.

    Be aware that the Boss MUST be told when no more instructions will be
    appended, or the subprocess will continue to live alone (until
    destruction of the Boss instance).

    Args:\n
    - *common_instructions*: to be passed to the workers
    - *individual_instructions*: to be passed to the workers
    - *scheduler*: scheduler to rule scheduling of workers/threads;
      a new MaxThreadsScheduler() when not given
    - *verbose*: is the Boss verbose or not.
    """
    if scheduler is None:
        # Build the default per call instead of sharing one instance created
        # once, at import time, as a default argument value.
        scheduler = MaxThreadsScheduler()
    boss = Boss(verbose=verbose, scheduler=scheduler)
    boss.set_instructions(common_instructions, individual_instructions)
    boss.make_them_work()
    return boss
def batch_main(common_instructions, individual_instructions,
               scheduler=None,
               verbose=False):
    """
    Run execution of the instructions as a batch process, waiting for all
    instructions to be finished and finally printing the report.

    Args and kwargs are those of run_as_server() function; *scheduler*
    defaults to a new MaxThreadsScheduler().
    If anything goes wrong while waiting (including a KeyboardInterrupt),
    the workers are stopped and the exception is re-raised.
    """
    if scheduler is None:
        # Fresh default per call rather than one shared import-time instance.
        scheduler = MaxThreadsScheduler()
    boss = run_as_server(common_instructions, individual_instructions,
                         scheduler=scheduler,
                         verbose=verbose)
    with interrupt.SignalInterruptHandler():
        try:
            boss.wait_till_finished()
            report = boss.get_report()
        except (Exception, KeyboardInterrupt):
            boss.stop_them_working()
            boss.wait_till_finished()
            raise
        else:
            # single-argument print() behaves the same under Python 2 and 3
            print("Report from workers:")
            for r in report['workers_report']:
                print(r['report'])
####################
### MAIN CLASSES ###
####################
class Worker(FootprintBase):
    """
    Template for workers.

    A Worker is an object supposed to do a task, according to instructions.
    The instructions have to be added to footprint attributes in actual
    classes, and the task itself implemented in the _task() method.
    """

    _abstract = True
    _collector = ('worker',)
    _footprint = dict(
        attr=dict(
            name=dict(
                info="Name of the worker.",
                optional=True,
                default=None,
                access='rwx')
        )
    )

    def __init__(self, *args, **kwargs):
        """Constructor. See its footprint for arguments."""
        super(Worker, self).__init__(*args, **kwargs)
        if self.name is None:
            # the Boss indexes its workers by name: make it unique
            self.name = str(uuid.uuid4())
        self._process = mpc.Process(target=self._work_and_communicate)

    def __del__(self):
        """Reap the subprocess, terminating it if still alive after 1 s."""
        process = getattr(self, '_process', None)
        # join() refuses a process that was never started (pid is None
        # until start()): nothing to reap in that case.
        if process is not None and process.pid is not None:
            process.join(1)
            if process.is_alive():
                process.terminate()

    def work(self):
        """Send the Worker to his job (start his subprocess)."""
        self._process.start()

    def bye(self):
        """
        Block the Boss until the worker has finished his job.

        THIS METHOD SHOULD NEVER BE CALLED BY THE OBJECT HIMSELF !
        (WOULD CAUSE A DEADLOCK if called from inside the worker's subprocess)
        """
        self._process.join()

    def stop_working(self):
        """Make the worker stop working (terminate his subprocess)."""
        self._process.terminate()

    def _work_and_communicate(self):
        """
        Send the Worker to his task, making sure he communicates with
        its boss.

        From within this method down, everything is done in the subprocess
        world !
        The message put on self._messenger (attached by the Boss) is a dict
        with keys 'name' and 'report'; when _task() raised an Exception,
        'report' is that exception and a formatted 'traceback' is added.
        """
        with interrupt.SignalInterruptHandler():
            # Sent as-is if _task() is interrupted by something that is not an
            # Exception. NOTE(review): indistinguishable from a task that
            # legitimately returned None.
            to_be_sent_back = {'name':self.name, 'report':None}
            try:
                to_be_sent_back = {'name':self.name, 'report':self._task()}
            except Exception as e:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
                to_be_sent_back = {'name':self.name, 'report':e, 'traceback':tb}
            finally:
                # Always report: the Boss only forgets a worker once it has
                # received its message.
                self._messenger.put(to_be_sent_back)

    def _task(self, **kwargs):
        """
        Actual task of the Worker to be implemented therein.

        Return the report to be sent back to the Boss.
        Raises NotImplementedError (a RuntimeError) in this abstract class.
        """
        raise NotImplementedError("this method must be implemented in Worker's inheritant class !")
class Boss(object):
    """
    Template for bosses.

    A Boss is an object supposed to order tasks to a series of workers.
    Optionally a *name* and a *verbose*ity (to log the workers reports) can
    be attributed to the Boss.
    Also, a *scheduler* can be assigned, to rule the ordering of tasks to
    workers (a new MaxThreadsScheduler() when not given).
    Custom schedulers can be used; they only need to inherit from
    .schedulers.BaseScheduler and implement the launchable() method.

    The Boss starts its own subprocess at construction time. The main program
    talks to it through a control Pipe (instructions and control signals go
    in, reports come out), and the workers report to it through a Queue.
    """

    # The values double as the tokens sent through the control pipe: they are
    # only ever compared with each other, so only their uniqueness matters.
    control_signals = {
        'HALT': 'Suspend ordering workers to work until RESUME.',
        'RESUME': 'Resume loop on pending_instructions/workers.',
        'SEND_REPORT': 'Send interim report (and continue normally).',
        'END': ('Terminate all pending work, then Stop listening. '
                'No new instructions from control will be listened, '
                'except a STOP*.'),
        'STOP': ('Halt pending work, but let workers finish their '
                 'current work, and then stop listening.'),
        'STOP_LISTENING': ('Stop listening, while workers continue '
                           'their current job.'),
        'STOP_RIGHTNOW': 'Stop workers immediately and stop listening.',
    }

    def __init__(self, scheduler=None,
                 name=None,
                 verbose=False):
        if scheduler is None:
            # fresh default per Boss instead of one shared import-time instance
            scheduler = MaxThreadsScheduler()
        assert isinstance(scheduler, BaseScheduler)
        self.scheduler = scheduler
        self.name = name
        self.verbose = verbose
        self.workers_messenger = mpc.Queue()
        # in = inside subprocess, out = main program side
        (self.control_messenger_in, self.control_messenger_out) = mpc.Pipe()
        # messages drained from the control pipe by wait_till_finished(),
        # handed over (oldest first) by get_report()
        self._received = []
        # the subprocess starts halted, until make_them_work() sends RESUME
        self.control_messenger_out.send(self.control_signals['HALT'])
        self._process = mpc.Process(target=self._listen_and_communicate)
        self._process.start()

    def __del__(self):
        if hasattr(self, '_process'):
            self._process.join(1)
            if self._process.is_alive():
                self._process.terminate()
            self.control_messenger_in.close()
            self.control_messenger_out.close()
            self.workers_messenger.close()

    def set_instructions(self, common_instructions=None, individual_instructions=None):
        """
        Set instructions to be distributed to workers.

        - *common_instructions* are a series of arguments shared by each
          worker, to be passed to the Worker factory.
        - *individual_instructions* are a series of arguments proper to each
          worker, hence all individual instructions must have the same length
          (any indexable sequence: list, tuple...).

        One instructions set is built per index of the individual
        instructions; nothing is built when there are no individual
        instructions. Raises AssertionError on inconsistent lengths.
        """
        if common_instructions is None:
            common_instructions = {}
        if individual_instructions is None:
            individual_instructions = {}
        # private copies: the sets sent never alias the caller's objects
        individual_instructions = copy.deepcopy(individual_instructions)
        instructions_sets = []
        if len(individual_instructions) > 0:
            lengths = set(len(instr) for instr in individual_instructions.values())
            assert len(lengths) == 1, \
                "all *individual_instructions* must have the same length."
            indiv_instr_num = lengths.pop()
            # gather common and individual (indexing leaves the inputs intact)
            for i in range(indiv_instr_num):
                instructions = copy.copy(common_instructions)
                for k, v in individual_instructions.items():
                    instructions[k] = v[i]
                instructions_sets.append(instructions)
        # send instructions to control
        self.control_messenger_out.send(instructions_sets)

    def make_them_work(self, terminate=False, stop_listening=False):
        """
        Order the workers to work.

        If *terminate*, no other instructions could be appended later
        (an END signal is sent, as by end()).
        If *stop_listening*, alive workers go on their jobs, but they are not
        listened to anymore. NOTE(review): when the Boss loop exits, its
        Worker objects are released and Worker.__del__ terminates any process
        still alive after 1 s, so workers may not really get to finish.
        """
        self.control_messenger_out.send(self.control_signals['RESUME'])
        if stop_listening:
            self.control_messenger_out.send(self.control_signals['STOP_LISTENING'])
        if terminate:
            self.end()

    def stop_them_working(self):
        """Stop the workers (STOP_RIGHTNOW signal)."""
        self.control_messenger_out.send(self.control_signals['STOP_RIGHTNOW'])

    def _next_message(self, timeout=0):
        """Return the next message of the subprocess (stashed first), or None
        if none arrives within *timeout* seconds."""
        if self._received:
            return self._received.pop(0)
        if self.control_messenger_out.poll(timeout):
            return self.control_messenger_out.recv()
        return None

    def get_report(self, interim=True):
        """
        Get report of the work executed.

        If *interim*, ask for an interim report if no report is available,
        i.e. containing the work done by the time of calling.
        Returns the report dict, or None if none could be obtained.
        If the subprocess reported an error (any exception, including
        KeyboardInterrupt), its traceback is written to stderr and the
        exception is re-raised here.
        """
        def _getreport(timeout=0):
            received = self._next_message(timeout)
            if received is not None and isinstance(received['workers_report'], BaseException):
                taylorism_log.error("Error was catch in subprocesses with traceback:")
                sys.stderr.writelines(received['traceback'])
                raise received['workers_report']
            return received
        # first try to get report, without waiting
        report = _getreport()
        if report is None and interim:
            self.control_messenger_out.send(self.control_signals['SEND_REPORT'])
            while report is None:
                # wait a little on each poll instead of busy-spinning
                report = _getreport(communications_timeout)
                if report is None and not self._process.is_alive():
                    # the final report may have been sent right before exiting
                    report = _getreport()
                    break
        return report

    def end(self):
        """
        Ends the listening process once instructions are treated.

        MUST BE CALLED (or wait_till_finished) for each Boss to avoid zombies
        processes.
        """
        self.control_messenger_out.send(self.control_signals['END'])

    def wait_till_finished(self):
        """
        Block the calling tree until all instructions have been executed.

        The control pipe is drained meanwhile (messages are kept for
        get_report()): a subprocess sending a large final report would
        otherwise block on a full pipe while we block in join().
        """
        self.end()
        while self._process.is_alive():
            if self.control_messenger_out.poll(communications_timeout):
                self._received.append(self.control_messenger_out.recv())
        self._process.join()

    # boss subprocess internal methods
    ##################################

    def _listen_and_communicate(self):
        """
        Interface routine, to catch exceptions and communicate.

        From within this method down, everything is done in the subprocess
        world !
        Exactly one report dict is sent back: status 'finished' or 'pending'
        (with the 'pending' instructions), or, on any exception, the
        exception itself as 'workers_report' plus its 'traceback'.
        """
        with interrupt.SignalInterruptHandler():
            try:
                (workers_report, pending_instructions) = self._listen()
            except BaseException as e:
                # _listen() has already stopped its workers; just report
                exc_type, exc_value, exc_traceback = sys.exc_info()
                tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
                report = {'workers_report':e, 'traceback':tb}
            else:
                if len(pending_instructions) == 0:
                    report = {'workers_report':workers_report,
                              'status':'finished'}
                else:
                    report = {'workers_report':workers_report,
                              'status':'pending',
                              'pending':pending_instructions}
            self.control_messenger_in.send(report)

    def _listen(self):
        """
        Actual listening method, i.e. running subprocess at interface between
        main and workers.

        Infinite loop:
          - A. listen to control, for appending new instructions or control signals
          - B. listen to workers, to collect their reports and/or errors
          - C. assign work to workers
          - D. exit loop if any reason for

        Returns (workers reports, instructions still pending).
        If anything goes wrong, the workers still running are stopped before
        the exception propagates.
        """
        signals = self.control_signals
        stop_signals = [v for (k, v) in signals.items() if 'STOP' in k]
        workers = {}
        pending_instructions = []
        report = []

        def stop_them_working():
            # iterate over a snapshot: the dict is emptied while looping
            for wname in list(workers.keys()):
                workers.pop(wname).stop_working()

        def hire_worker(instructions):
            w = fpx.worker(**instructions)
            if w is None:
                raise AttributeError("no adequate Worker was found with these instructions:" + str(instructions))
            w._messenger = self.workers_messenger
            workers[w.name] = w
            return w

        halt = False
        end = False
        stop = False
        try:
            while True:
                # A. listen to control
                if self.control_messenger_in.poll(communications_timeout):
                    control = self.control_messenger_in.recv()
                    if control in signals.values():
                        # received a control signal
                        if control == signals['SEND_REPORT']:
                            self.control_messenger_in.send({'workers_report':report, 'status':'interim'})
                        elif control == signals['HALT']:
                            halt = True
                        elif control == signals['RESUME']:
                            halt = False
                        elif control in stop_signals or control == signals['END']:
                            # NOTE: END does not clear HALT; if RESUME never
                            # comes, pending instructions are never launched.
                            end = True
                            if control == signals['STOP_LISTENING']:
                                break  # leave out the infinite loop
                            elif control in (signals['STOP'], signals['STOP_RIGHTNOW']):
                                # no more launches; exit once workers are gone
                                stop = True
                                if control == signals['STOP_RIGHTNOW']:
                                    stop_them_working()
                    elif not end:
                        # received new instructions; once an END or STOP*
                        # signal has been received, they are not listened to
                        if isinstance(control, list):
                            pending_instructions.extend(control)
                        elif isinstance(control, dict):
                            pending_instructions.append(control)
                # B. listen to workers
                try:
                    reported = self.workers_messenger.get(timeout=communications_timeout)
                except mpc.queues.Empty:
                    pass
                else:
                    # got a new message from workers !
                    report.append(reported)
                    if isinstance(reported['report'], Exception):
                        # worker got an exception
                        taylorism_log.error("error encountered with worker " + reported['name'] + " with traceback:")
                        sys.stderr.writelines(reported['traceback'])
                        # may be gone already, after a STOP_RIGHTNOW
                        guilty = workers.get(reported['name'])
                        if guilty is not None:
                            sys.stderr.write("Instructions of guilty worker:\n")
                            w = [repr(a) + '\n' for a in sorted(guilty.footprint_as_dict().items()) if a]
                            sys.stderr.writelines(w)
                        # the other workers are stopped by the handler below
                        raise reported['report']
                    else:
                        # worker has finished
                        if self.verbose:
                            taylorism_log.info(str(reported['report']))
                        # may be gone already, after a STOP_RIGHTNOW
                        finished = workers.pop(reported['name'], None)
                        if finished is not None:
                            finished.bye()
                # C. there is work to do and no STOP signal has been received: re-launch
                if len(pending_instructions) > 0 and not stop and not halt:
                    (launchable, not_yet_launchable) = self.scheduler.launchable(pending_instructions, workers=workers, report=report)
                    for instructions in launchable:
                        w = hire_worker(instructions)
                        w.work()
                        if self.verbose:
                            taylorism_log.info(' '.join(['Worker', w.name, 'started.']))
                    pending_instructions = not_yet_launchable
                # D. should we stop now ?
                # END: once all workers are done and nothing is pending;
                # STOP*: once all current workers are done (pending is kept).
                if end and len(workers) == 0 and (stop or len(pending_instructions) == 0):
                    break
        except BaseException:
            # never leave orphan workers behind
            stop_them_working()
            raise
        return (report, pending_instructions)