Source code for epygram.untied.parallel

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Contains a framework for parallel jobs.
"""

import multiprocessing as mpc
from multiprocessing import queues
import uuid
import sys
import traceback
import copy

import footprints
from footprints import FootprintBase, proxy as fpx

# Module-level logger, obtained from the footprints logging facility and
# named after this module.
parallel_log = footprints.loggers.getLogger(__name__)



class Worker(FootprintBase):
    """
    Template for workers.

    A Worker is an object supposed to do a job, according to instructions.
    The instructions have to be added as footprint attributes in actual
    (concrete) classes, and the job itself implemented in :meth:`_task`.

    Huh... keep calm guys, the validity of this definition is only assumed in
    this very context ! Don't lynch me...
    """

    _abstract = True
    _collector = ('worker',)
    _footprint = dict(
        attr=dict(
            name=dict(
                info="Name of the Worker.",
                optional=True,
                # NOTE(review): this expression is evaluated once, when the
                # class body is executed, so Workers built without an explicit
                # *name* presumably all get this same value -- callers should
                # pass a name (Boss.hire_workers does).
                default=uuid.uuid4()),
            messenger=dict(
                info="Medium in which to send up report and/or encountered errors.",
                type=queues.Queue)
        )
    )

    @property
    def ready(self):
        """
        Is the worker ready to work ?

        True only if a process has been prepared (see :meth:`get_ready`) and
        that process is not currently running.
        """
        return hasattr(self, '_process') and not self._process.is_alive()

    def change_boss(self, messenger):
        """Change messenger (queue) to the Boss."""
        assert isinstance(messenger, queues.Queue)
        self._attributes['messenger'] = messenger

    def get_new_instructions(self, **instructions):
        """
        Get new instructions.

        Each keyword must be an existing attribute of the Worker; an unknown
        one raises an AttributeError (attributes set before the faulty one
        remain modified).
        """
        for key, value in instructions.items():
            if key not in self._attributes:
                raise AttributeError("unknown instruction: " + key)
            self._attributes[key] = value

    def get_ready(self, force_to_stop_if_working=False, **kwargs):
        """
        Prepare the worker to work.

        A new (not started) process is always created, with *kwargs* to be
        passed to the task, since a process object can be started only once.

        Return False if a process had already been prepared and
        *force_to_stop_if_working* is False; True otherwise.
        If *force_to_stop_if_working*, the former process is terminated first.
        """
        fresh = True
        if hasattr(self, '_process'):
            if force_to_stop_if_working:
                self.stop_working()
            else:
                fresh = False
        self._process = mpc.Process(target=self._work_and_communicate,
                                    kwargs=kwargs)
        return fresh

    def work(self):
        """Send the Worker to his job."""
        assert self.ready, "Worker not ready to work."
        self._process.start()

    def you_are_expected(self):
        """
        Block the Boss until the worker has finished his job.

        THIS METHOD SHOULD NEVER BE CALLED BY THE OBJECT HIMSELF !
        (WOULD CAUSE A DEADLOCK if called from inside the worker's subprocess)
        """
        if hasattr(self, '_process'):
            self._process.join()

    def stop_working(self):
        """
        Make the worker stop working.

        Does nothing if no process has been prepared, or if the prepared
        process has never been started.
        """
        process = getattr(self, '_process', None)
        # A never-started process has no pid; calling terminate() on it would
        # fail, as there is no underlying system process to signal.
        if process is not None and process.pid is not None:
            process.terminate()

    def byebye_boss(self):
        """
        Quit the boss.

        The Worker only drops its reference to the messenger: the queue is
        not closed here, because it is the one shared by the Boss and all its
        other workers, and closing it would prevent them from reporting.
        """
        self._attributes['messenger'] = None

    def _work_and_communicate(self, **kwargs):
        """
        Send the Worker to his task, making sure he communicates with its boss.

        The message sent back through the messenger is a dict with keys
        'name' and 'message' (the return value of the task), or, if the task
        raised, 'message' is the exception and a 'traceback' key holds its
        formatted traceback.

        From within this method down, everything is done in the subprocess
        world !
        """
        to_be_sent_back = {'name': self.name, 'message': None}
        try:
            to_be_sent_back = {'name': self.name,
                               'message': self._task(**kwargs)}
        except Exception as e:
            tb = traceback.format_exception(*sys.exc_info())
            to_be_sent_back = {'name': self.name,
                               'message': e,
                               'traceback': tb}
        finally:
            # whatever happened, the boss must hear from us
            self.messenger.put(to_be_sent_back)

    def _task(self, **kwargs):
        """
        Actual task of the Worker to be implemented therein.

        Return the data to be sent back to the Boss.
        """
        pass
class Boss(object):
    """
    Template for bosses.

    A Boss is an object supposed to order a job to a series of workers.

    Huh... keep calm guys, the validity of this definition is only assumed in
    this very context ! Don't lynch me...

    Optionally can be attributed to the Boss a *name* and a *verbose*ity
    (designed to report publically, in log, the workers reports).
    """

    # The values are the tokens actually sent to the listening subprocess;
    # always refer to them through their key (e.g. Boss.stop_signals['END']).
    stop_signals = {
        'END': ('Terminate all pending work, then Stop listening. '
                'No new instructions from control will be listened, '
                'except a STOP*.'),
        'STOP': ('Halt pending work, but let workers finish their '
                 'current work, and then stop listening.'),
        'STOP_LISTENING': ('Stop listening, while workers continue '
                           'their current job.'),
        'STOP_RIGHTNOW': ('Stop workers immediately and stop '
                          'listening.'),
    }

    def __init__(self, name=None, verbose=False):
        """
        :param name: name of the Boss; a fresh random one is generated for
                     each instance if None.
        :param verbose: if True, workers reports are logged.
        """
        # generated here, not in the signature, so that each Boss gets its own
        self.name = str(uuid.uuid4()) if name is None else name
        self.verbose = verbose

        self.workers_messenger = mpc.Queue()
        (self.control_messenger_in,
         self.control_messenger_out) = mpc.Pipe()

        self.workers = {}
        self.pending_instructions = []
        self.idle_workers = []

    @property
    def workers_number(self):
        """Number of workers attached to the Boss."""
        return len(self.workers)

    def hire_workers(self, number, common_instructions=None,
                     individual_instructions=None):
        """
        Creates and attach a series of Workers to the Boss.

        - *common_instructions* are a series of arguments shared by each
          worker, to be passed to the Worker factory.
        - *individual_instructions* are a series of arguments proper to each
          worker, hence:

          - each individual instruction must be at least of length *number*
            (if longer, the remaining instructions will be distributed to the
            workers as soon as they finish individually their job)
          - all individual instructions must have the same length

        Workers for which no pending instructions remain are created with
        placeholder instructions and registered as idle.
        """
        common_instructions = common_instructions or {}
        individual_instructions = individual_instructions or {}
        self.pending_instructions.extend(
            self._parse_instructions(common_instructions,
                                     individual_instructions))
        # placeholder instructions for workers in excess; without any pending
        # instructions, fall back on the common ones only
        if self.pending_instructions:
            dummy_instructions = copy.deepcopy(self.pending_instructions[0])
        else:
            dummy_instructions = {}
        # to be sure the common instructions point to the same objects
        dummy_instructions.update(common_instructions)
        for _ in range(number):
            name = str(uuid.uuid4())
            if self.pending_instructions:
                instructions = self.pending_instructions.pop(0)
            else:
                instructions = dummy_instructions
                self.idle_workers.append(name)
            self.workers[name] = fpx.worker(name=name,
                                            messenger=self.workers_messenger,
                                            **instructions)

    def assign_pending_instructions(self, idle_only=True):
        """
        Assign pending instructions to workers.

        If *idle_only*, assign only to idle workers. Assignment stops as soon
        as there are no more pending instructions; workers left without
        instructions keep their idle status.
        """
        for w in self.workers.values():
            if not self.pending_instructions:
                break
            if w.name in self.idle_workers:
                self.idle_workers.remove(w.name)
                w.get_new_instructions(**self.pending_instructions.pop(0))
            elif not idle_only:
                w.get_new_instructions(**self.pending_instructions.pop(0))

    def _parse_instructions(self, common_instructions, individual_instructions):
        """
        Build a set of complete instructions for each worker.

        Return a list of dicts, the i-th one being a (shallow) copy of
        *common_instructions* completed with a deep copy of the i-th item of
        each of the *individual_instructions*. The arguments are left
        untouched. The list is empty if there are no individual instructions.
        """
        common_instructions = common_instructions or {}
        individual = copy.deepcopy(individual_instructions or {})
        if not individual:
            return []
        lengths = set(len(values) for values in individual.values())
        assert len(lengths) == 1, \
            "all *individual_instructions* must have the same length."
        instructions_sets = []
        for i in range(lengths.pop()):
            instructions = copy.copy(common_instructions)
            for key, values in individual.items():
                instructions[key] = values[i]
            instructions_sets.append(instructions)
        return instructions_sets

    def set_new_instructions(self, common_instructions=None,
                             individual_instructions=None,
                             assign=True, append=True):
        """
        Set new instructions to pending list.

        It is recommended to set new instructions either and only:

        - if no work is being done
        - if work is being done and listen()/make_them_work() have been
          called with terminate=False. In this case, the new instructions are
          appended to pending ones (they are sent to the listening
          subprocess, and *assign*/*append* are not considered).

        Else, indeterminate state may occur to your new instructions.

        If *assign*, assign instructions to idle workers.

        If not *append*, the instructions replace any pending instructions.
        In this case and if *assign*, the instructions are assigned to all
        workers, not only idle ones.
        """
        instructions_sets = self._parse_instructions(common_instructions or {},
                                                     individual_instructions or {})
        if hasattr(self, '_process') and self._process.is_alive():
            # the listening subprocess owns the pending list: hand them over
            self.control_messenger_out.send(instructions_sets)
            return
        if append:
            self.pending_instructions.extend(instructions_sets)
        else:
            self.pending_instructions = instructions_sets
        if assign:
            self.assign_pending_instructions(idle_only=append)

    def attach_worker(self, worker, instructions=None):
        """
        Attach an existing worker to the Boss.

        Optionally, new instructions can be given to the worker.
        """
        worker.change_boss(messenger=self.workers_messenger)
        if instructions:
            worker.get_new_instructions(**instructions)
        self.workers[worker.name] = worker

    def fire_worker(self, name=None, instructions=None):
        """
        Fire a worker, by name or instructions.

        - *name* may be one name or a list (tuple, set) of names;
        - else, if *instructions* are given, any worker whose attributes
          match all of them is fired;
        - if none of these are given, fire an arbitrary worker (if any).
        """
        def _fire_worker_named(myname):
            fired = self.workers.pop(myname, None)
            if myname in self.idle_workers:
                self.idle_workers.remove(myname)
            if fired is not None:
                fired.byebye_boss()

        if name:
            if isinstance(name, (list, tuple, set, frozenset)):
                for n in name:
                    _fire_worker_named(n)
            else:
                _fire_worker_named(name)
        elif instructions:
            # iterate over a snapshot, for workers are removed on the way
            for candidate in list(self.workers.keys()):
                attributes = self.workers[candidate]._attributes
                mismatch = False
                for key, value in instructions.items():
                    if key not in attributes or value != attributes[key]:
                        mismatch = True
                        break
                if not mismatch:
                    _fire_worker_named(candidate)
        elif self.workers_number > 0:
            _fire_worker_named(list(self.workers.keys())[-1])

    def fire_them_all(self):
        """Empties the workers."""
        self.fire_worker(name=list(self.workers.keys()))

    def make_them_ready(self):
        """Sets the (non-idle) workers ready."""
        for w in self.workers.values():
            if w.name not in self.idle_workers:
                w.get_ready()

    def make_them_work(self, listen=True, terminate=True, block=True,
                       get_report=True):
        """
        Tells the (non-idle) workers to work.

        If *listen*, listen after them and get back reports.
        *terminate* and *block* are passed to listen().

        If *get_report*, then call get_report(block) and return report;
        else return None.
        """
        for w in self.workers.values():
            if w.name not in self.idle_workers:
                w.work()
        if listen:
            self.listen(terminate=terminate, block=block)
        if get_report:
            return self.get_report(block=block)

    def listen(self, terminate=True, block=True):
        """
        Listen to both workers and new instructions, in a subprocess.

        If *terminate*, program to stop listening when there are no more
        pending instructions and workers have finished.

        If *block*, block calling process until being sure that the workers
        have terminated.
        """
        self._process = mpc.Process(
            target=self._listen_and_communicate,
            kwargs={'pending_instructions': self.pending_instructions,
                    'idle_workers': self.idle_workers})
        self._process.start()
        if terminate:
            self.stop_listening(signal=self.stop_signals['END'], block=block)

    def stop_listening(self, signal=stop_signals['END'], block=True):
        """
        Stop listening, and eventually stop the workers immediately.

        *signal* has to be one of Boss.stop_signals.values().

        If *block*, halt until being sure that the workers have terminated;
        all workers are then considered idle.
        """
        assert signal in Boss.stop_signals.values()
        self.control_messenger_out.send(signal)
        if block:
            for w in self.workers.values():
                w.you_are_expected()
            # a real list is needed: workers names are popped/appended later
            self.idle_workers = list(self.workers.keys())

    def _listen_and_communicate(self, **kwargs):
        """
        Interface routine, to catch exceptions and communicate.

        Sends back through the control pipe a dict with keys 'report' and
        'pending_instructions', or, if listening failed, with 'report' being
        the exception and 'traceback' its formatted traceback.

        From within this method down, everything is done in the subprocess
        world !
        """
        try:
            (report, pending_instructions) = self._listen(**kwargs)
            to_be_sent_back = {'report': report,
                               'pending_instructions': pending_instructions}
        except Exception as e:
            tb = traceback.format_exception(*sys.exc_info())
            to_be_sent_back = {'report': e,
                               'traceback': tb}
            self.stop_them_working()
        finally:
            self.control_messenger_in.send(to_be_sent_back)

    def _send_to_work(self, worker_name, instructions):
        """Give *instructions* to the worker named *worker_name* and start him."""
        worker = self.workers[worker_name]
        worker.get_new_instructions(**instructions)
        worker.get_ready()
        worker.work()

    def _listen(self, pending_instructions, idle_workers):
        """
        Actual listening method:

        - listen to any new instructions to be dispatched to workers
        - listen to workers reports

        Return the tuple (list of workers reports, remaining pending
        instructions). If a worker reports an exception, all workers are
        stopped and the exception is raised.
        """
        timeout = 0.01
        end = False   # a stop signal has been received
        stop = False  # no more work must be dispatched
        report = []
        while True:
            # 1. control: stop signals or new instructions
            if self.control_messenger_in.poll(timeout):
                control = self.control_messenger_in.recv()
                if control in self.stop_signals.values():
                    if control == self.stop_signals['STOP_LISTENING']:
                        break
                    end = True
                    if control in (self.stop_signals['STOP'],
                                   self.stop_signals['STOP_RIGHTNOW']):
                        pending_instructions = []
                        if control == self.stop_signals['STOP_RIGHTNOW']:
                            self.stop_them_working()
                            idle_workers = list(self.workers.keys())
                        stop = True
                else:
                    pending_instructions.extend(control)
            # 2. workers reports
            try:
                reported = self.workers_messenger.get(block=True,
                                                      timeout=timeout)
            except queues.Empty:
                # there still work to do and an idle worker, make him work
                if not stop and pending_instructions and idle_workers:
                    self._send_to_work(idle_workers.pop(0),
                                       pending_instructions.pop(0))
            else:
                # got a new message from workers !
                report.append(reported)
                if isinstance(reported['message'], Exception):
                    # a Worker encountered an exception
                    parallel_log.error("error encountered with worker " +
                                       reported['name'] +
                                       " with traceback:")
                    sys.stderr.writelines(reported['traceback'])
                    sys.stderr.write("Instructions of guilty worker:\n")
                    guilty = self.workers[reported['name']]
                    sys.stderr.writelines(
                        [repr(a) + '\n'
                         for a in sorted(guilty.footprint_as_dict().items())])
                    self.stop_them_working()
                    raise reported['message']
                if self.verbose:
                    parallel_log.info(str(reported['message']))
                if not stop and pending_instructions:
                    # there is still work to do, send him back to work with
                    # new instructions
                    self._send_to_work(reported['name'],
                                       pending_instructions.pop(0))
                else:
                    idle_workers.append(reported['name'])
            # 3. asked to end, and everybody has finished
            if end and len(idle_workers) == self.workers_number:
                break
        return (report, pending_instructions)

    def get_report(self, block=True):
        """
        Get report of the work executed and updates the object from the work
        done in subprocess.

        If *block*, block the calling tree until a report is received, or
        the listening subprocess has ended.

        Return the report (list of workers reports), or False if no report
        was available. If the subprocess reported an exception, it is raised.
        """
        def _getreport(timeout=0.):
            """Return (True, report) if a report could be read, else (False, None)."""
            if not self.control_messenger_out.poll(timeout):
                return (False, None)
            received = self.control_messenger_out.recv()
            report = received['report']
            if isinstance(report, Exception):
                parallel_log.error("Error was catch in subprocesses with traceback:")
                sys.stderr.writelines(received['traceback'])
                raise report
            self.pending_instructions = received['pending_instructions']
            self.idle_workers = list(self.workers.keys())
            return (True, report)

        (got_it, report) = _getreport()
        if block and not got_it and hasattr(self, '_process'):
            while not got_it and self._process.is_alive():
                # short timeout, rather than spinning on the pipe
                (got_it, report) = _getreport(0.01)
            if not got_it:
                # the report may have been sent just before the subprocess
                # ended: have a last look
                (got_it, report) = _getreport()
        return report if got_it else False

    def stop_them_working(self, get_report=False):
        """
        Stop the workers.

        If *get_report*, get and return the report (see get_report()).
        """
        for w in self.workers.values():
            w.stop_working()
        if get_report:
            return self.get_report()

    def __del__(self):
        """Close the communication channels that were actually opened."""
        for channel_name in ('control_messenger_in',
                             'control_messenger_out',
                             'workers_messenger'):
            # attributes may be missing if construction did not complete
            channel = getattr(self, channel_name, None)
            if channel is not None:
                channel.close()