-rwxr-xr-x  bin/bitbake-worker   | 358
-rw-r--r--  lib/bb/cache.py      |   6
-rw-r--r--  lib/bb/cookerdata.py |  18
-rw-r--r--  lib/bb/event.py      |  13
-rw-r--r--  lib/bb/runqueue.py   | 284
-rw-r--r--  lib/bb/siggen.py     |   3
6 files changed, 492 insertions(+), 190 deletions(-)
diff --git a/bin/bitbake-worker b/bin/bitbake-worker
new file mode 100755
index 000000000..8edf8dd65
--- /dev/null
+++ b/bin/bitbake-worker
@@ -0,0 +1,358 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import warnings
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
+from bb import fetch2
+import logging
+import bb
+import select
+import errno
+import signal
+
+# Users shouldn't be running this code directly
+if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
+ print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
+ sys.exit(1)
+
+logger = logging.getLogger("BitBake")
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+ bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
+
+
+worker_pipe = sys.stdout.fileno()
+bb.utils.nonblockingfd(worker_pipe)
+
+handler = bb.event.LogHandler()
+logger.addHandler(handler)
+
+if 0:
+ # Code to write out a log file of all events passing through the worker
+ logfilename = "/tmp/workerlogfile"
+ format_str = "%(levelname)s: %(message)s"
+ conlogformat = bb.msg.BBLogFormatter(format_str)
+ consolelog = logging.FileHandler(logfilename)
+ bb.msg.addDefaultlogFilter(consolelog)
+ consolelog.setFormatter(conlogformat)
+ logger.addHandler(consolelog)
+
+worker_queue = ""
+
+def worker_fire(event, d):
+ data = "<event>" + pickle.dumps(event) + "</event>"
+ worker_fire_prepickled(data)
+
+def worker_fire_prepickled(event):
+ global worker_queue
+
+ worker_queue = worker_queue + event
+ worker_flush()
+
+def worker_flush():
+ global worker_queue, worker_pipe
+
+ if not worker_queue:
+ return
+
+ try:
+ written = os.write(worker_pipe, worker_queue)
+ worker_queue = worker_queue[written:]
+ except (IOError, OSError) as e:
+ if e.errno != errno.EAGAIN:
+ raise
+
+def worker_child_fire(event, d):
+ global worker_pipe
+
+ data = "<event>" + pickle.dumps(event) + "</event>"
+ worker_pipe.write(data)
+
+bb.event.worker_fire = worker_fire
+
+lf = None
+#lf = open("/tmp/workercommandlog", "w+")
+def workerlog_write(msg):
+ if lf:
+ lf.write(msg)
+ lf.flush()
+
+def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
+ # We need to setup the environment BEFORE the fork, since
+ # a fork() or exec*() activates PSEUDO...
+
+ envbackup = {}
+ fakeenv = {}
+ umask = None
+
+ taskdep = workerdata["taskdeps"][fn]
+ if 'umask' in taskdep and taskname in taskdep['umask']:
+ # umask might come in as a number or text string..
+ try:
+ umask = int(taskdep['umask'][taskname],8)
+ except TypeError:
+ umask = taskdep['umask'][taskname]
+
+ if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
+ envvars = (workerdata["fakerootenv"][fn] or "").split()
+ for key, value in (var.split('=') for var in envvars):
+ envbackup[key] = os.environ.get(key)
+ os.environ[key] = value
+ fakeenv[key] = value
+
+ fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+ for p in fakedirs:
+ bb.utils.mkdirhier(p)
+ logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
+ (fn, taskname, ', '.join(fakedirs)))
+ else:
+ envvars = (workerdata["fakerootnoenv"][fn] or "").split()
+ for key, value in (var.split('=') for var in envvars):
+ envbackup[key] = os.environ.get(key)
+ os.environ[key] = value
+ fakeenv[key] = value
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ try:
+ pipein, pipeout = os.pipe()
+ pipein = os.fdopen(pipein, 'rb', 4096)
+ pipeout = os.fdopen(pipeout, 'wb', 0)
+ pid = os.fork()
+ except OSError as e:
+ bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
+
+ if pid == 0:
+ global worker_pipe
+ pipein.close()
+
+        # Save out the PID so that the events we fire can include it
+ bb.event.worker_pid = os.getpid()
+ bb.event.worker_fire = worker_child_fire
+ worker_pipe = pipeout
+
+ # Make the child the process group leader
+ os.setpgid(0, 0)
+ # No stdin
+ newsi = os.open(os.devnull, os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
+
+ if umask:
+ os.umask(umask)
+
+ data.setVar("BB_WORKERCONTEXT", "1")
+ bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
+ ret = 0
+ try:
+ the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
+ the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
+ for h in workerdata["hashes"]:
+ the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
+ for h in workerdata["hash_deps"]:
+ the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
+
+ # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
+ # successfully. We also need to unset anything from the environment which shouldn't be there
+ exports = bb.data.exported_vars(the_data)
+ bb.utils.empty_environment()
+ for e, v in exports:
+ os.environ[e] = v
+ for e in fakeenv:
+ os.environ[e] = fakeenv[e]
+ the_data.setVar(e, fakeenv[e])
+ the_data.setVarFlag(e, 'export', "1")
+
+ if quieterrors:
+ the_data.setVarFlag(taskname, "quieterrors", "1")
+
+ except Exception as exc:
+ if not quieterrors:
+ logger.critical(str(exc))
+ os._exit(1)
+ try:
+ if not cfg.dry_run:
+ ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+ os._exit(ret)
+ except:
+ os._exit(1)
+ else:
+ for key, value in envbackup.iteritems():
+ if value is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = value
+
+ return pid, pipein, pipeout
+
+class runQueueWorkerPipe():
+ """
+ Abstraction for a pipe between a worker thread and the worker server
+ """
+ def __init__(self, pipein, pipeout):
+ self.input = pipein
+ if pipeout:
+ pipeout.close()
+ bb.utils.nonblockingfd(self.input)
+ self.queue = ""
+
+ def read(self):
+ start = len(self.queue)
+ try:
+ self.queue = self.queue + self.input.read(102400)
+ except (OSError, IOError) as e:
+ if e.errno != errno.EAGAIN:
+ raise
+
+ end = len(self.queue)
+ index = self.queue.find("</event>")
+ while index != -1:
+ worker_fire_prepickled(self.queue[:index+8])
+ self.queue = self.queue[index+8:]
+ index = self.queue.find("</event>")
+ return (end > start)
+
+ def close(self):
+ while self.read():
+ continue
+ if len(self.queue) > 0:
+ print("Warning, worker child left partial message: %s" % self.queue)
+ self.input.close()
+
+normalexit = False
+
+class BitbakeWorker(object):
+ def __init__(self, din):
+ self.input = din
+ bb.utils.nonblockingfd(self.input)
+ self.queue = ""
+ self.cookercfg = None
+ self.databuilder = None
+ self.data = None
+ self.build_pids = {}
+ self.build_pipes = {}
+
+ def serve(self):
+ while True:
+ (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
+ if self.input in ready or len(self.queue):
+ start = len(self.queue)
+ try:
+ self.queue = self.queue + self.input.read()
+ except (OSError, IOError):
+ pass
+ end = len(self.queue)
+ self.handle_item("cookerconfig", self.handle_cookercfg)
+ self.handle_item("workerdata", self.handle_workerdata)
+ self.handle_item("runtask", self.handle_runtask)
+ self.handle_item("finishnow", self.handle_finishnow)
+ self.handle_item("ping", self.handle_ping)
+ self.handle_item("quit", self.handle_quit)
+
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+ if len(self.build_pids):
+ self.process_waitpid()
+ worker_flush()
+
+
+ def handle_item(self, item, func):
+ if self.queue.startswith("<" + item + ">"):
+ index = self.queue.find("</" + item + ">")
+ while index != -1:
+ func(self.queue[(len(item) + 2):index])
+ self.queue = self.queue[(index + len(item) + 3):]
+ index = self.queue.find("</" + item + ">")
+
+ def handle_cookercfg(self, data):
+ self.cookercfg = pickle.loads(data)
+ self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
+ self.databuilder.parseBaseConfiguration()
+ self.data = self.databuilder.data
+
+ def handle_workerdata(self, data):
+ self.workerdata = pickle.loads(data)
+ bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
+ bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
+ bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
+ bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
+
+ def handle_ping(self, _):
+ workerlog_write("Handling ping\n")
+
+ logger.warn("Pong from bitbake-worker!")
+
+ def handle_quit(self, data):
+ workerlog_write("Handling quit\n")
+
+ global normalexit
+ normalexit = True
+ sys.exit(0)
+
+ def handle_runtask(self, data):
+ fn, task, taskname, quieterrors, appends = pickle.loads(data)
+ workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+ pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)
+
+ self.build_pids[pid] = task
+ self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
+
+ def process_waitpid(self):
+ """
+        Return None if there are no processes awaiting result collection; otherwise
+ collect the process exit codes and close the information pipe.
+ """
+ try:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ if pid == 0 or os.WIFSTOPPED(status):
+ return None
+ except OSError:
+ return None
+
+ workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
+
+ if os.WIFEXITED(status):
+ status = os.WEXITSTATUS(status)
+ elif os.WIFSIGNALED(status):
+ # Per shell conventions for $?, when a process exits due to
+ # a signal, we return an exit code of 128 + SIGNUM
+ status = 128 + os.WTERMSIG(status)
+
+ task = self.build_pids[pid]
+ del self.build_pids[pid]
+
+ self.build_pipes[pid].close()
+ del self.build_pipes[pid]
+
+ worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
+
+ def handle_finishnow(self, _):
+ if self.build_pids:
+ logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
+ for k, v in self.build_pids.iteritems():
+ try:
+ os.kill(-k, signal.SIGTERM)
+ os.waitpid(-1, 0)
+ except:
+ pass
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+
+try:
+ worker = BitbakeWorker(sys.stdin)
+ worker.serve()
+except BaseException as e:
+ if not normalexit:
+ import traceback
+ sys.stderr.write(traceback.format_exc())
+ sys.stderr.write(str(e))
+while len(worker_queue):
+ worker_flush()
+workerlog_write("exiting")
+sys.exit(0)
+
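The worker above and the server exchange pickled messages over plain pipes, framed by XML-like tags (<event>...</event>, <exitcode>...</exitcode>): the reader accumulates bytes from a non-blocking fd into a string buffer and slices complete frames off the front. A minimal sketch of that framing logic, following the patch's Python 2 string handling (the helper name and example data are illustrative, not part of the patch):

    import pickle

    def extract_messages(buf, tag):
        # Slice complete <tag>payload</tag> frames off the front of buf.
        # Returns (payloads, remainder); the remainder is any trailing
        # partial frame, kept for the next read.
        start_tag, end_tag = "<%s>" % tag, "</%s>" % tag
        payloads = []
        index = buf.find(end_tag)
        while index != -1 and buf.startswith(start_tag):
            payloads.append(pickle.loads(buf[len(start_tag):index]))
            buf = buf[index + len(end_tag):]
            index = buf.find(end_tag)
        return payloads, buf

    # Two complete frames followed by a truncated one, as a short read
    # from a non-blocking pipe might deliver them:
    stream = ("<event>" + pickle.dumps("hello") + "</event>"
              "<event>" + pickle.dumps("world") + "</event>"
              "<event>" + pickle.dumps("partial")[:3])
    msgs, rest = extract_messages(stream, "event")
    assert msgs == ["hello", "world"] and rest.startswith("<event>")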
diff --git a/lib/bb/cache.py b/lib/bb/cache.py
index fb0f40c60..b99fa99cf 100644
--- a/lib/bb/cache.py
+++ b/lib/bb/cache.py
@@ -724,7 +724,6 @@ class CacheData(object):
for info in info_array:
info.add_cacheData(self, fn)
-
class MultiProcessCache(object):
"""
BitBake multi-process cache implementation
@@ -746,13 +745,18 @@ class MultiProcessCache(object):
self.cachefile = os.path.join(cachedir, self.__class__.cache_file_name)
logger.debug(1, "Using cache in '%s'", self.cachefile)
+ glf = bb.utils.lockfile(self.cachefile + ".lock")
+
try:
with open(self.cachefile, "rb") as f:
p = pickle.Unpickler(f)
data, version = p.load()
except:
+ bb.utils.unlockfile(glf)
return
+ bb.utils.unlockfile(glf)
+
if version != self.__class__.CACHE_VERSION:
return
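The cache.py hunk serializes access to the shared cache file by holding a lock file for the duration of the unpickling. Roughly the same acquire/read/release shape using fcntl directly (bb.utils.lockfile's implementation is not part of this patch, so the flock-based helper below is an assumption):

    import fcntl
    import pickle

    def locked_load(cachefile):
        # Stand-in for bb.utils.lockfile()/unlockfile(): hold an
        # exclusive lock on a sidecar ".lock" file while reading.
        lockf = open(cachefile + ".lock", "a+")
        fcntl.flock(lockf, fcntl.LOCK_EX)
        try:
            with open(cachefile, "rb") as f:
                return pickle.load(f)
        finally:
            fcntl.flock(lockf, fcntl.LOCK_UN)
            lockf.close()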
diff --git a/lib/bb/cookerdata.py b/lib/bb/cookerdata.py
index 149878f40..1bed455d1 100644
--- a/lib/bb/cookerdata.py
+++ b/lib/bb/cookerdata.py
@@ -25,7 +25,9 @@
import os, sys
from functools import wraps
import logging
+import bb
from bb import data
+import bb.parse
logger = logging.getLogger("BitBake")
parselog = logging.getLogger("BitBake.Parsing")
@@ -139,6 +141,20 @@ class CookerConfiguration(object):
def setServerRegIdleCallback(self, srcb):
self.server_register_idlecallback = srcb
+ def __getstate__(self):
+ state = {}
+ for key in self.__dict__.keys():
+ if key == "server_register_idlecallback":
+ state[key] = None
+ else:
+ state[key] = getattr(self, key)
+ return state
+
+    def __setstate__(self, state):
+ for k in state:
+ setattr(self, k, state[k])
+
+
def catch_parse_error(func):
"""Exception handling bits for our parsing"""
@wraps(func)
@@ -146,6 +162,8 @@ def catch_parse_error(func):
try:
return func(fn, *args)
except (IOError, bb.parse.ParseError, bb.data_smart.ExpansionError) as exc:
+ import traceback
+            parselog.critical(traceback.format_exc())
parselog.critical("Unable to parse %s: %s" % (fn, exc))
sys.exit(1)
return wrapped
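CookerConfiguration now has to cross a process boundary via pickle, and server_register_idlecallback holds a live callback that pickle cannot serialize, so __getstate__ nulls it out and __setstate__ restores everything else. The same pattern in miniature (class and attribute names here are illustrative):

    import pickle

    class Config(object):
        def __init__(self, callback):
            self.level = 3
            self.callback = callback  # unpicklable, e.g. a file object

        def __getstate__(self):
            # Drop the live callback; every other attribute round-trips.
            state = dict(self.__dict__)
            state["callback"] = None
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)

    cfg = Config(callback=open("/dev/null"))
    clone = pickle.loads(pickle.dumps(cfg))
    assert clone.level == 3 and clone.callback is None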
diff --git a/lib/bb/event.py b/lib/bb/event.py
index 2826e3554..d73067fcf 100644
--- a/lib/bb/event.py
+++ b/lib/bb/event.py
@@ -33,11 +33,12 @@ import atexit
import traceback
import bb.utils
import bb.compat
+import bb.exceptions
# This is the pid for which we should generate the event. This is set when
# the runqueue forks off.
worker_pid = 0
-worker_pipe = None
+worker_fire = None
logger = logging.getLogger('BitBake.Event')
@@ -150,20 +151,12 @@ def fire(event, d):
# don't have a datastore so the datastore context isn't a problem.
fire_class_handlers(event, d)
- if worker_pid != 0:
+ if worker_fire:
worker_fire(event, d)
else:
fire_ui_handlers(event, d)
-def worker_fire(event, d):
- data = "<event>" + pickle.dumps(event) + "</event>"
- worker_pipe.write(data)
-
def fire_from_worker(event, d):
- if not event.startswith("<event>") or not event.endswith("</event>"):
- print("Error, not an event %s" % event)
- return
- event = pickle.loads(event[7:-8])
fire_ui_handlers(event, d)
noop = lambda _: None
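With this hunk, fire() dispatches through a pluggable worker_fire hook rather than testing worker_pid, and fire_from_worker() receives an already-unpickled event because the runqueue pipe now does the de-framing. The dispatch in miniature (the handler bodies are stand-ins, not BitBake's):

    worker_fire = None  # a worker process installs its own sender here

    def fire_ui_handlers(event, d):
        print("UI handlers saw: %r" % (event,))  # stand-in delivery

    def fire(event, d):
        # In a worker, forward events upstream; in the server,
        # deliver them straight to the UI handlers.
        if worker_fire:
            worker_fire(event, d)
        else:
            fire_ui_handlers(event, d)

    fire("TaskStarted", None)  # no hook installed -> handled locally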
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 090d1b56a..dd6e071c3 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -28,10 +28,17 @@ import sys
import signal
import stat
import fcntl
+import errno
import logging
import bb
from bb import msg, data, event
from bb import monitordisk
+import subprocess
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
@@ -938,6 +945,10 @@ class RunQueue:
raise
except:
logger.error("An uncaught exception occured in runqueue, please see the failure below:")
+ try:
+ self.rqexe.teardown()
+ except:
+ pass
self.state = runQueueComplete
raise
@@ -979,38 +990,41 @@ class RunQueueExecute:
self.runq_buildable = []
self.runq_running = []
self.runq_complete = []
- self.build_pids = {}
- self.build_pipes = {}
+
self.build_stamps = {}
self.failed_fnids = []
self.stampcache = {}
- def runqueue_process_waitpid(self):
- """
- Return none is there are no processes awaiting result collection, otherwise
- collect the process exit codes and close the information pipe.
- """
- pid, status = os.waitpid(-1, os.WNOHANG)
- if pid == 0 or os.WIFSTOPPED(status):
- return None
-
- if os.WIFEXITED(status):
- status = os.WEXITSTATUS(status)
- elif os.WIFSIGNALED(status):
- # Per shell conventions for $?, when a process exits due to
- # a signal, we return an exit code of 128 + SIGNUM
- status = 128 + os.WTERMSIG(status)
-
- task = self.build_pids[pid]
- del self.build_pids[pid]
-
- self.build_pipes[pid].close()
- del self.build_pipes[pid]
+ logger.debug(1, "Starting bitbake-worker")
+ self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ bb.utils.nonblockingfd(self.worker.stdout)
+ self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
+
+ workerdata = {
+ "taskdeps" : self.rqdata.dataCache.task_deps,
+ "fakerootenv" : self.rqdata.dataCache.fakerootenv,
+ "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
+ "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
+ "hashes" : self.rqdata.hashes,
+ "hash_deps" : self.rqdata.hash_deps,
+ "sigchecksums" : bb.parse.siggen.file_checksum_values,
+ "runq_hash" : self.rqdata.runq_hash,
+ "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
+ "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
+ "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
+ "logdefaultdomain" : bb.msg.loggerDefaultDomains,
+ }
+
+ self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
+ self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
+ self.worker.stdin.flush()
+
+ def runqueue_process_waitpid(self, task, status):
# self.build_stamps[task] may not exist when using a shared work directory.
- if pid in self.build_stamps:
- del self.build_stamps[pid]
+ if task in self.build_stamps:
+ del self.build_stamps[task]
if status != 0:
self.task_fail(task, status)
@@ -1019,16 +1033,11 @@ class RunQueueExecute:
return True
def finish_now(self):
- if self.stats.active:
- logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
- for k, v in self.build_pids.iteritems():
- try:
- os.kill(-k, signal.SIGTERM)
- os.waitpid(-1, 0)
- except:
- pass
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
+
+ self.worker.stdin.write("<finishnow></finishnow>")
+ self.worker.stdin.flush()
+
+ self.teardown()
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
@@ -1040,14 +1049,13 @@ class RunQueueExecute:
def finish(self):
self.rq.state = runQueueCleanUp
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
- self.runqueue_process_waitpid()
+ self.workerpipe.read()
return
+ self.teardown()
+
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
return
@@ -1055,115 +1063,6 @@ class RunQueueExecute:
self.rq.state = runQueueComplete
return
- def fork_off_task(self, fn, task, taskname, quieterrors=False):
- # We need to setup the environment BEFORE the fork, since
- # a fork() or exec*() activates PSEUDO...
-
- envbackup = {}
- fakeenv = {}
- umask = None
-
- taskdep = self.rqdata.dataCache.task_deps[fn]
- if 'umask' in taskdep and taskname in taskdep['umask']:
- # umask might come in as a number or text string..
- try:
- umask = int(taskdep['umask'][taskname],8)
- except TypeError:
- umask = taskdep['umask'][taskname]
-
- if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
- envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
-
- fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
- for p in fakedirs:
- bb.utils.mkdirhier(p)
-
- logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
- (fn, taskname, ', '.join(fakedirs)))
- else:
- envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
-
- sys.stdout.flush()
- sys.stderr.flush()
- try:
- pipein, pipeout = os.pipe()
- pipein = os.fdopen(pipein, 'rb', 4096)
- pipeout = os.fdopen(pipeout, 'wb', 0)
- pid = os.fork()
- except OSError as e:
- bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
-
- if pid == 0:
- pipein.close()
-
- # Save out the PID so that the event can include it the
- # events
- bb.event.worker_pid = os.getpid()
- bb.event.worker_pipe = pipeout
-
- self.rq.state = runQueueChildProcess
- # Make the child the process group leader
- os.setpgid(0, 0)
- # No stdin
- newsi = os.open(os.devnull, os.O_RDWR)
- os.dup2(newsi, sys.stdin.fileno())
-
- if umask:
- os.umask(umask)
-
- self.cooker.data.setVar("BB_WORKERCONTEXT", "1")
- bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
- ret = 0
- try:
- the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
- the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
- for h in self.rqdata.hashes:
- the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
- for h in self.rqdata.hash_deps:
- the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
-
- # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
- # successfully. We also need to unset anything from the environment which shouldn't be there
- exports = bb.data.exported_vars(the_data)
- bb.utils.empty_environment()
- for e, v in exports:
- os.environ[e] = v
- for e in fakeenv:
- os.environ[e] = fakeenv[e]
- the_data.setVar(e, fakeenv[e])
- the_data.setVarFlag(e, 'export', "1")
-
- if quieterrors:
- the_data.setVarFlag(taskname, "quieterrors", "1")
-
- except Exception as exc:
- if not quieterrors:
- logger.critical(str(exc))
- os._exit(1)
- try:
- if not self.cooker.configuration.dry_run:
- profile = self.cooker.configuration.profile
- ret = bb.build.exec_task(fn, taskname, the_data, profile)
- os._exit(ret)
- except:
- os._exit(1)
- else:
- for key, value in envbackup.iteritems():
- if value is None:
- del os.environ[key]
- else:
- os.environ[key] = value
-
- return pid, pipein, pipeout
-
def check_dependencies(self, task, taskdeps, setscene = False):
if not self.rq.depvalidate:
return False
@@ -1184,6 +1083,16 @@ class RunQueueExecute:
valid = bb.utils.better_eval(call, locs)
return valid
+ def teardown(self):
+ logger.debug(1, "Teardown for bitbake-worker")
+ self.worker.stdin.write("<quit></quit>")
+ self.worker.stdin.flush()
+ while self.worker.returncode is None:
+ self.workerpipe.read()
+ self.worker.poll()
+ while self.workerpipe.read():
+ continue
+
class RunQueueExecuteDummy(RunQueueExecute):
def __init__(self, rq):
self.rq = rq
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute):
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in schedulers)))
-
def get_schedulers(self):
schedulers = set(obj for obj in globals().values()
if type(obj) is type and
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute):
Run the tasks in a queue prepared by rqdata.prepare()
"""
+ self.workerpipe.read()
+
+
if self.stats.total == 0:
# nothing to do
self.rq.state = runQueueCleanUp
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute):
startevent = runQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
+ self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+ self.worker.stdin.flush()
- self.build_pids[pid] = task
- self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
- self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
+ self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
return True
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
- if self.runqueue_process_waitpid() is None:
- return 0.5
- return True
+ self.workerpipe.read()
+ return 0.5
+
+ self.teardown()
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
if self.runq_complete[task] == 0:
logger.error("Task %s never completed!", task)
self.rq.state = runQueueComplete
+
return True
class RunQueueExecuteScenequeue(RunQueueExecute):
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
# If we don't have any setscene functions, skip this step
if len(self.rqdata.runq_setscene) == 0:
rq.scenequeue_covered = set()
+ self.teardown()
rq.state = runQueueRunInit
return
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
Run the tasks in a queue prepared by prepare_runqueue
"""
+ self.workerpipe.read()
+
task = None
if self.stats.active < self.number_tasks:
# Find the next setscene to run
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+ self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+ self.worker.stdin.flush()
- self.build_pids[pid] = task
- self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
return True
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
- if self.runqueue_process_waitpid() is None:
- return 0.5
- return True
+ self.workerpipe.read()
+ return 0.5
# Convert scenequeue_covered task numbers into full taskgraph ids
oldcovered = self.scenequeue_covered
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
self.rq.state = runQueueRunInit
+ self.teardown()
return True
- def fork_off_task(self, fn, task, taskname):
- return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
+ def runqueue_process_waitpid(self, task, status):
+ task = self.rq.rqdata.runq_setscene.index(task)
+
+ RunQueueExecute.runqueue_process_waitpid(self, task, status)
class TaskFailure(Exception):
"""
@@ -1828,25 +1738,43 @@ class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
"""
- def __init__(self, pipein, pipeout, d):
+ def __init__(self, pipein, pipeout, d, rq):
self.input = pipein
- pipeout.close()
+ if pipeout:
+ pipeout.close()
bb.utils.nonblockingfd(self.input)
self.queue = ""
self.d = d
+ self.rq = rq
+
+ def setrunqueue(self, rq):
+ self.rq = rq
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + self.input.read(102400)
- except (OSError, IOError):
- pass
+ except (OSError, IOError) as e:
+ if e.errno != errno.EAGAIN:
+ raise
end = len(self.queue)
- index = self.queue.find("</event>")
- while index != -1:
- bb.event.fire_from_worker(self.queue[:index+8], self.d)
- self.queue = self.queue[index+8:]
+ found = True
+ while found and len(self.queue):
+ found = False
index = self.queue.find("</event>")
+ while index != -1 and self.queue.startswith("<event>"):
+ event = pickle.loads(self.queue[7:index])
+ bb.event.fire_from_worker(event, self.d)
+ found = True
+ self.queue = self.queue[index+8:]
+ index = self.queue.find("</event>")
+ index = self.queue.find("</exitcode>")
+ while index != -1 and self.queue.startswith("<exitcode>"):
+ task, status = pickle.loads(self.queue[10:index])
+ self.rq.runqueue_process_waitpid(task, status)
+ found = True
+ self.queue = self.queue[index+11:]
+ index = self.queue.find("</exitcode>")
return (end > start)
def close(self):
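On the server side, RunQueueExecute now spawns one long-lived bitbake-worker subprocess and drives it entirely over stdin, sending <cookerconfig>, <workerdata>, <runtask>, <finishnow> and <quit> frames. A condensed sketch of the sending side (the `cat` subprocess is a hypothetical stand-in for bitbake-worker, just to show the framing survives the pipe):

    import pickle
    import subprocess

    def send_command(worker, tag, payload):
        # Frame a pickled command the way runqueue does.
        worker.stdin.write("<%s>%s</%s>" % (tag, pickle.dumps(payload), tag))
        worker.stdin.flush()

    worker = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)
    send_command(worker, "runtask",
                 ("recipe.bb", 7, "do_compile", False, []))
    worker.stdin.close()
    print(worker.stdout.read()[:9])  # -> "<runtask>"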
diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 1ff2ecc48..fb8b67850 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -201,9 +201,10 @@ class SignatureGeneratorBasic(SignatureGenerator):
#d.setVar("BB_TASKHASH_task-%s" % task, taskhash[task])
return h
- def set_taskdata(self, hashes, deps):
+ def set_taskdata(self, hashes, deps, checksums):
self.runtaskdeps = deps
self.taskhash = hashes
+ self.file_checksum_values = checksums
def dump_sigtask(self, fn, task, stampbase, runtime):
k = fn + "." + task