diff options
Diffstat (limited to 'meta/lib/oe/utils.py')
-rw-r--r-- | meta/lib/oe/utils.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 6aed6dc993..753b577555 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py @@ -1,4 +1,6 @@ import subprocess +import multiprocessing +import traceback def read_file(filename): try: @@ -280,6 +282,74 @@ def multiprocess_exec(commands, function): return results +# For each item in items, call the function 'target' with item as the first +# argument, extraargs as the other arguments and handle any exceptions in the +# parent thread +def multiprocess_launch(target, items, d, extraargs=None): + + class ProcessLaunch(multiprocessing.Process): + def __init__(self, *args, **kwargs): + multiprocessing.Process.__init__(self, *args, **kwargs) + self._pconn, self._cconn = multiprocessing.Pipe() + self._exception = None + self._result = None + + def run(self): + try: + ret = self._target(*self._args, **self._kwargs) + self._cconn.send((None, ret)) + except Exception as e: + tb = traceback.format_exc() + self._cconn.send((e, tb)) + + def update(self): + if self._pconn.poll(): + (e, tb) = self._pconn.recv() + if e is not None: + self._exception = (e, tb) + else: + self._result = tb + + @property + def exception(self): + self.update() + return self._exception + + @property + def result(self): + self.update() + return self._result + + max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1) + launched = [] + errors = [] + results = [] + items = list(items) + while (items and not errors) or launched: + if not errors and items and len(launched) < max_process: + args = (items.pop(),) + if extraargs is not None: + args = args + extraargs + p = ProcessLaunch(target=target, args=args) + p.start() + launched.append(p) + for q in launched: + # The finished processes are joined when calling is_alive() + if not q.is_alive(): + if q.exception: + errors.append(q.exception) + if q.result: + results.append(q.result) + launched.remove(q) + # Paranoia doesn't hurt + for p in launched: + p.join() + if errors: + for (e, tb) in errors: + bb.error(str(tb)) + bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above") + return results + def squashspaces(string): import re return re.sub("\s+", " ", string).strip() |