#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains a framework for parallel jobs.
"""
import multiprocessing as mpc
from multiprocessing import queues
import uuid
import sys
import traceback
import copy
import footprints
from footprints import FootprintBase, proxy as fpx
parallel_log = footprints.loggers.getLogger(__name__)
class Worker(FootprintBase):
    """
    Template for workers.

    A Worker is an object supposed to do a job, according to instructions.
    The instructions have to be added as footprint attributes in the actual
    (concrete) classes, and the job itself is implemented in :meth:`_task`.

    (This definition of a "worker" is only meant within the context of this
    module.)

    Life cycle, as driven by a Boss:

    - :meth:`get_new_instructions` updates the footprint attributes,
    - :meth:`get_ready` prepares a fresh subprocess,
    - :meth:`work` starts it,
    - the subprocess runs :meth:`_task` and puts a report (or the exception
      encountered) on the *messenger* queue.
    """

    _abstract = True
    _collector = ('worker',)
    _footprint = dict(
        attr=dict(
            name=dict(
                info="Name of the Worker.",
                optional=True,
                # NOTE(review): this default is evaluated once, when the
                # class is defined, so every Worker created without an
                # explicit name gets the same UUID object. Boss.hire_workers
                # always passes a name, which avoids the problem.
                default=uuid.uuid4()),
            messenger=dict(
                info="Medium in which to send up report and/or encountered errors.",
                type=queues.Queue)
        )
    )

    @property
    def ready(self):
        """
        Is the worker ready to work ?

        True when a process has been prepared (see :meth:`get_ready`) and
        that process is not currently running.
        """
        return hasattr(self, '_process') and not self._process.is_alive()

    def change_boss(self, messenger):
        """Change messenger (queue) to the Boss."""
        assert isinstance(messenger, queues.Queue)
        self._attributes['messenger'] = messenger

    def get_new_instructions(self, **instructions):
        """
        Get new instructions.

        Each keyword must be an already known attribute of the worker;
        an :class:`AttributeError` is raised on the first unknown one
        (attributes processed before it have already been updated).
        """
        for key, value in instructions.items():
            if key not in self._attributes:
                raise AttributeError("unknown instruction: " + key)
            self._attributes[key] = value

    def get_ready(self, force_to_stop_if_working=False, **kwargs):
        """
        Prepare the worker to work.

        A new subprocess object targeting the task is created; *kwargs* are
        passed on to :meth:`_task`.

        If a process had already been prepared:

        - with *force_to_stop_if_working*, it is terminated first;
        - otherwise it is left alone and False is returned, to signal that
          a previous process existed and was not stopped.

        In both cases the process object is replaced by the new one. This is
        what allows a worker that has finished a job to be sent back to work.
        """
        toreturn = True
        if hasattr(self, '_process'):
            if force_to_stop_if_working:
                self.stop_working()
            else:
                toreturn = False
        self._process = mpc.Process(target=self._work_and_communicate,
                                    kwargs=kwargs)
        return toreturn

    def work(self):
        """Send the Worker to his job (start the prepared subprocess)."""
        assert self.ready, "Worker not ready to work."
        self._process.start()

    def you_are_expected(self):
        """
        Block the Boss until the worker has finished his job.

        THIS METHOD SHOULD NEVER BE CALLED BY THE OBJECT HIMSELF !
        (WOULD CAUSE A DEADLOCK if called from inside the worker's subprocess)
        """
        if hasattr(self, '_process'):
            self._process.join()

    def stop_working(self):
        """Make the worker stop working (terminate its subprocess, if any)."""
        if hasattr(self, '_process'):
            self._process.terminate()

    def byebye_boss(self):
        """Quit the boss: close the messenger and forget about it."""
        self.messenger.close()
        self._attributes['messenger'] = None

    def _work_and_communicate(self, **kwargs):
        """
        Send the Worker to his task, making sure he communicates with
        its boss.

        The message put on the messenger is a dict with keys 'name' and
        'message' (the value returned by :meth:`_task`, or the exception
        raised), plus 'traceback' (formatted lines) in case of an exception.

        From within this method down, everything is done in the subprocess
        world !
        """
        # Default report, sent as is if the task is interrupted by something
        # that is not an Exception (the 'finally' clause still runs).
        to_be_sent_back = {'name': self.name, 'message': None}
        try:
            to_be_sent_back = {'name': self.name,
                               'message': self._task(**kwargs)}
        except Exception as e:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
            to_be_sent_back = {'name': self.name, 'message': e, 'traceback': tb}
        finally:
            self.messenger.put(to_be_sent_back)

    def _task(self, **kwargs):
        """
        Actual task of the Worker to be implemented therein.

        Return the data to be sent back to the Boss.
        """
        pass
class Boss(object):
    """
    Template for bosses.

    A Boss is an object supposed to order a job to a series of workers.
    (This definition of a "boss" is only meant within the context of this
    module.)

    Optionally can be attributed to the Boss a *name* and a *verbose*ity
    (designed to report publicly, in log, the workers reports).

    Communication channels:

    - *workers_messenger*: queue on which the workers put their reports;
    - *control_messenger_in* / *control_messenger_out*: the two ends of a
      pipe between the listening subprocess (``_in`` end) and the calling
      process (``_out`` end). Stop signals and new instructions travel one
      way, the final report travels the other way.
    """

    # Signals understood by the listening subprocess. The *values* are what
    # is actually sent through the control pipe and compared against.
    stop_signals = {
        'END': ('Terminate all pending work, then Stop listening.'
                'No new instructions from control will be listened,'
                'except a STOP*.'),
        'STOP': ('Halt pending work, but let workers finish their'
                 'current work, and then stop listening.'),
        'STOP_LISTENING': ('Stop listening, while workers continue'
                           'their current job.'),
        'STOP_RIGHTNOW': ('Stop workers immediately and stop'
                          'listening.'),
    }

    def __init__(self, name=None, verbose=False):
        """
        :param name: name of the Boss; a fresh UUID string is generated for
                     each instance when not given.
        :param verbose: if True, workers' reports are logged as they arrive.
        """
        # The default name must be generated here: a default evaluated in the
        # signature would be shared by every Boss created without a name.
        self.name = str(uuid.uuid4()) if name is None else name
        self.verbose = verbose
        self.workers_messenger = mpc.Queue()
        (self.control_messenger_in, self.control_messenger_out) = mpc.Pipe()
        self.workers = {}
        self.pending_instructions = []
        self.idle_workers = []

    @property
    def workers_number(self):
        """Number of workers currently attached to the Boss."""
        return len(self.workers)

    def hire_workers(self, number, common_instructions={}, individual_instructions={}):
        """
        Creates and attach a series of Workers to the Boss.

        - *common_instructions* are a series of arguments shared by each
          worker, to be passed to the Worker factory.
        - *individual_instructions* are a series of arguments proper to each
          worker, hence:

          - each individual instruction must be at least of length *number*
            (if longer, the remaining instructions will be distributed to the
            workers as soon as they finish individually their job)
          - all individual instructions must have the same length

        Workers hired while no instruction is pending are created with dummy
        instructions and registered as idle.
        """
        self.pending_instructions.extend(
            self._parse_instructions(common_instructions, individual_instructions))
        # Dummy instructions, for workers in excess of pending instructions.
        if self.pending_instructions:
            dummy_instructions = copy.deepcopy(self.pending_instructions[0])
        else:
            # nothing pending at all: fall back on the common instructions
            dummy_instructions = {}
        # to be sure the common instructions point to the same objects
        dummy_instructions.update(common_instructions)
        for _ in range(number):
            name = str(uuid.uuid4())
            if self.pending_instructions:
                instructions = self.pending_instructions.pop(0)
            else:
                instructions = dummy_instructions
                self.idle_workers.append(name)
            self.workers[name] = fpx.worker(name=name,
                                            messenger=self.workers_messenger,
                                            **instructions)

    def assign_pending_instructions(self, idle_only=True):
        """
        Assign pending instructions to workers.

        If *idle_only*, assign only to idle workers.
        Assignment stops as soon as no pending instruction remains.
        """
        for w in self.workers.values():
            if not self.pending_instructions:
                # keep the remaining idle workers registered as idle
                break
            if w.name in self.idle_workers:
                self.idle_workers.remove(w.name)
                w.get_new_instructions(**self.pending_instructions.pop(0))
            elif not idle_only:
                w.get_new_instructions(**self.pending_instructions.pop(0))

    def _parse_instructions(self, common_instructions, individual_instructions):
        """
        Build a set of complete instructions for each worker.

        Return a list of dicts; the i-th dict is a shallow copy of
        *common_instructions* completed with the i-th element of each of the
        *individual_instructions* (which are deep-copied: the arguments are
        left untouched). The list is empty if there are no individual
        instructions.
        """
        individual_instructions = copy.deepcopy(individual_instructions)
        instructions_sets = []
        if individual_instructions:
            lengths = set(len(instr) for instr in individual_instructions.values())
            assert len(lengths) == 1, \
                "all *individual_instructions* must have the same length."
            for i in range(lengths.pop()):
                instructions = copy.copy(common_instructions)
                for k, v in individual_instructions.items():
                    instructions[k] = v[i]
                instructions_sets.append(instructions)
        return instructions_sets

    def set_new_instructions(self, common_instructions={}, individual_instructions={}, assign=True, append=True):
        """
        Set new instructions to pending list.

        It is recommended to set new instructions either and only:

        - if no work is being done
        - if work is being done and listen()/make_them_work() have been
          called with terminate=False. In this case, the new instructions
          are appended to pending ones

        Else, indeterminate state may occur to your new instructions.

        If *assign*, assign instructions to idle workers.
        If not *append*, the instructions replace any pending instructions.
        In this case and if *assign*, the instructions are assigned to all
        workers, not only idle ones.
        """
        instructions_sets = self._parse_instructions(common_instructions,
                                                     individual_instructions)
        if hasattr(self, '_process') and self._process.is_alive():
            # the listening subprocess is in charge: hand them over to it
            self.control_messenger_out.send(instructions_sets)
        else:
            if append:
                self.pending_instructions.extend(instructions_sets)
            else:
                self.pending_instructions = instructions_sets
            if assign:
                self.assign_pending_instructions(idle_only=append)

    def attach_worker(self, worker, instructions={}):
        """
        Attach an existing worker to the Boss.

        Optionally, new instructions can be given to the worker.
        """
        worker.change_boss(messenger=self.workers_messenger)
        if instructions != {}:
            worker.get_new_instructions(**instructions)
        self.workers[worker.name] = worker

    def fire_worker(self, name=None, instructions={}):
        """
        Fire a worker, by name or instructions.

        - *name* may be a single name or a list of names;
        - with *instructions*, every worker whose attributes match all the
          given instructions is fired;
        - if none of these are given, fire an arbitrary one.
        """
        def _fire_worker_named(myname):
            w = self.workers.pop(myname, None)
            if myname in self.idle_workers:
                self.idle_workers.remove(myname)
            if w is not None:
                w.byebye_boss()

        if name:
            names = name if isinstance(name, list) else [name]
            for n in names:
                _fire_worker_named(n)
        elif instructions != {}:
            # iterate on a copy of the names: workers are removed on the way
            for worker_name in list(self.workers):
                attributes = self.workers[worker_name]._attributes
                mismatch = any(k not in attributes or v != attributes[k]
                               for k, v in instructions.items())
                if not mismatch:
                    _fire_worker_named(worker_name)
        elif self.workers_number > 0:
            _fire_worker_named(list(self.workers)[-1])

    def fire_them_all(self):
        """Empties the workers."""
        self.fire_worker(name=list(self.workers))

    def make_them_ready(self):
        """Sets the (non-idle) workers ready."""
        for w in self.workers.values():
            if w.name not in self.idle_workers:
                w.get_ready()

    def make_them_work(self,
                       listen=True,
                       terminate=True,
                       block=True,
                       get_report=True):
        """
        Tells the (non-idle) workers to work.

        If *listen*, listen after them and get back reports.
        *terminate* and *block* are passed to listen().
        If *get_report*, then call get_report(block) and return report.
        """
        for w in self.workers.values():
            if w.name not in self.idle_workers:
                w.work()
        if listen:
            self.listen(terminate=terminate, block=block)
        if get_report:
            return self.get_report(block=block)

    def listen(self, terminate=True, block=True):
        """
        Listen to both workers and new instructions.

        The listening is done in a dedicated subprocess.

        If *terminate*, program to stop listening when there are no more pending
        instructions and workers have finished.
        If *block*, block calling process until being sure that the workers
        have terminated.
        """
        self._process = mpc.Process(
            target=self._listen_and_communicate,
            kwargs={'pending_instructions': self.pending_instructions,
                    'idle_workers': self.idle_workers})
        self._process.start()
        if terminate:
            self.stop_listening(signal=self.stop_signals['END'], block=block)

    def stop_listening(self, signal=stop_signals['END'], block=True):
        """
        Stop listening, and eventually stop the workers immediately.

        *signal* has to be one of Boss.stop_signals.values().
        If *block*, halt until being sure that the workers have terminated.
        """
        assert signal in (Boss.stop_signals.values())
        self.control_messenger_out.send(signal)
        if block:
            for w in self.workers.values():
                w.you_are_expected()
            # a real list is needed: it is mutated later on
            self.idle_workers = list(self.workers)

    def _listen_and_communicate(self, **kwargs):
        """
        Interface routine, to catch exceptions and communicate.

        Whatever happens in :meth:`_listen`, a dict is sent back through the
        control pipe: either with keys 'report' and 'pending_instructions',
        or with keys 'report' (the exception) and 'traceback'.

        From within this method down, everything is done in the subprocess
        world !
        """
        try:
            (report, pending_instructions) = self._listen(**kwargs)
            to_be_sent_back = {'report': report,
                               'pending_instructions': pending_instructions}
        except Exception as e:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
            to_be_sent_back = {'report': e, 'traceback': tb}
            self.stop_them_working()
        finally:
            self.control_messenger_in.send(to_be_sent_back)

    def _send_to_work(self, worker_name, instructions):
        """Give *instructions* to the worker named *worker_name* and start him."""
        worker = self.workers[worker_name]
        worker.get_new_instructions(**instructions)
        worker.get_ready()
        worker.work()

    def _listen(self, pending_instructions, idle_workers):
        """
        Actual listening method:

        - listen to any new instructions to be dispatched to workers
        - listen to workers reports

        Return the list of reports received from the workers and the
        instructions still pending when listening stopped.
        If a worker reports an exception, all workers are stopped and the
        exception is raised.
        """
        timeout = 0.01
        end = False   # a stop signal was received: leave when all are idle
        stop = False  # no more work must be dispatched
        report = []
        while True:
            # 1. orders from the control side
            if self.control_messenger_in.poll(timeout):
                control = self.control_messenger_in.recv()
                if control in self.stop_signals.values():
                    if control == self.stop_signals['STOP_LISTENING']:
                        break
                    end = True
                    if control in (self.stop_signals['STOP'],
                                   self.stop_signals['STOP_RIGHTNOW']):
                        pending_instructions = []
                        if control == self.stop_signals['STOP_RIGHTNOW']:
                            self.stop_them_working()
                            # a real list is needed: it is mutated below
                            idle_workers = list(self.workers)
                        stop = True
                else:
                    # anything else is a list of new sets of instructions
                    pending_instructions.extend(control)
            # 2. reports from the workers
            try:
                reported = self.workers_messenger.get(block=True, timeout=timeout)
            except mpc.queues.Empty:
                # there still work to do and an idle worker, make him work
                if not stop and pending_instructions and idle_workers:
                    self._send_to_work(idle_workers.pop(0),
                                       pending_instructions.pop(0))
            else:  # got a new message from workers !
                report.append(reported)
                if isinstance(reported['message'], Exception):
                    # a Worker encountered an exception
                    parallel_log.error("error encountered with worker " +
                                       reported['name'] + " with traceback:")
                    sys.stderr.writelines(reported['traceback'])
                    sys.stderr.write("Instructions of guilty worker:\n")
                    guilty = self.workers[reported['name']]
                    sys.stderr.writelines(
                        [repr(a) + '\n'
                         for a in sorted(guilty.footprint_as_dict().items())])
                    self.stop_them_working()
                    raise reported['message']
                if self.verbose:
                    parallel_log.info(str(reported['message']))
                if not stop and pending_instructions:
                    # there is still work to do, send him back to work with
                    # new instructions
                    self._send_to_work(reported['name'],
                                       pending_instructions.pop(0))
                else:
                    idle_workers.append(reported['name'])
            # 3. leave once asked to, and everybody has finished
            if end and len(idle_workers) == self.workers_number:
                break
        return (report, pending_instructions)

    def get_report(self, block=True):
        """
        Get report of the work executed and
        updates the object from the work done in subprocess.

        If *block*, block the calling tree until a report is received (or
        the listening subprocess has died).

        Return the report, or False if none was available.
        If the listening subprocess sent back an exception, it is raised.
        """
        nothing = object()  # marks "no message", as a report may be empty

        def _getreport(timeout=0):
            if not self.control_messenger_out.poll(timeout):
                return nothing
            received = self.control_messenger_out.recv()
            report = received['report']
            if isinstance(report, Exception):
                parallel_log.error("Error was catch in subprocesses with traceback:")
                sys.stderr.writelines(received['traceback'])
                raise report
            self.pending_instructions = received['pending_instructions']
            self.idle_workers = list(self.workers)
            return report

        report = _getreport()
        if block and report is nothing and hasattr(self, '_process'):
            # poll with a short timeout rather than spinning
            while report is nothing and self._process.is_alive():
                report = _getreport(0.01)
            if report is nothing:
                # the report may have been sent right before the subprocess
                # ended: have a last look
                report = _getreport()
        return False if report is nothing else report

    def stop_them_working(self, get_report=False):
        """Stop the workers and, if *get_report*, get and return the report."""
        for w in self.workers.values():
            w.stop_working()
        if get_report:
            return self.get_report()

    def __del__(self):
        # The attributes may be missing if the construction did not complete.
        for attr in ('control_messenger_in',
                     'control_messenger_out',
                     'workers_messenger'):
            messenger = getattr(self, attr, None)
            if messenger is not None:
                messenger.close()