aboutsummaryrefslogtreecommitdiffstats
path: root/meta/lib/oe/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oe/utils.py')
-rw-r--r--meta/lib/oe/utils.py70
1 files changed, 70 insertions, 0 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc993..753b577555 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
import subprocess
+import multiprocessing
+import traceback
def read_file(filename):
try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
return results
+# For each item in items, call the function 'target' with item as the first
+# argument, extraargs as the other arguments and handle any exceptions in the
+# parent thread
+def multiprocess_launch(target, items, d, extraargs=None):
+
+ class ProcessLaunch(multiprocessing.Process):
+ def __init__(self, *args, **kwargs):
+ multiprocessing.Process.__init__(self, *args, **kwargs)
+ self._pconn, self._cconn = multiprocessing.Pipe()
+ self._exception = None
+ self._result = None
+
+ def run(self):
+ try:
+ ret = self._target(*self._args, **self._kwargs)
+ self._cconn.send((None, ret))
+ except Exception as e:
+ tb = traceback.format_exc()
+ self._cconn.send((e, tb))
+
+ def update(self):
+ if self._pconn.poll():
+ (e, tb) = self._pconn.recv()
+ if e is not None:
+ self._exception = (e, tb)
+ else:
+ self._result = tb
+
+ @property
+ def exception(self):
+ self.update()
+ return self._exception
+
+ @property
+ def result(self):
+ self.update()
+ return self._result
+
+ max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
+ launched = []
+ errors = []
+ results = []
+ items = list(items)
+ while (items and not errors) or launched:
+ if not errors and items and len(launched) < max_process:
+ args = (items.pop(),)
+ if extraargs is not None:
+ args = args + extraargs
+ p = ProcessLaunch(target=target, args=args)
+ p.start()
+ launched.append(p)
+ for q in launched:
+ # The finished processes are joined when calling is_alive()
+ if not q.is_alive():
+ if q.exception:
+ errors.append(q.exception)
+ if q.result:
+ results.append(q.result)
+ launched.remove(q)
+ # Paranoia doesn't hurt
+ for p in launched:
+ p.join()
+ if errors:
+ for (e, tb) in errors:
+ bb.error(str(tb))
+ bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
+ return results
+
def squashspaces(string):
import re
return re.sub("\s+", " ", string).strip()