author:    Richard Purdie <richard.purdie@linuxfoundation.org>  2013-06-07 18:11:09 +0100
committer: Richard Purdie <richard.purdie@linuxfoundation.org>  2013-06-12 17:48:51 +0100
commit:    b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c (patch)
tree:      ce5edd4913e340a5b8f0dbbfb66ed8487c8b4c07 /lib
parent:    ad8bf10d873abb94d987860a3f6d06b134fb8a99 (diff)
runqueue: Split runqueue to use bitbake-worker
This is a pretty fundamental change to the way bitbake operates. It splits out the task execution part of runqueue into a completely separately exec'd process called bitbake-worker. This means that the separate process has to build its own datastore and that configuration needs to be passed from the cooker over to the bitbake-worker process.

Known issues:

* Hob is broken with this patch since it writes to the configuration and that configuration isn't preserved in bitbake-worker.
* We create a worker for setscene, then a new worker for the main task execution. This is wasteful but shouldn't be hard to fix.
* We probably send too much data over to bitbake-worker; we need to see if we can streamline it.

These issues will be followed up in subsequent patches. This patch lays the groundwork for the removal of the double bitbake execution for pseudo, which will come in a follow-on patch.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
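For orientation before the diff: the cooker and bitbake-worker talk over the worker's stdin/stdout using pickled payloads framed by XML-ish tags, as the runqueue hunks below show. A minimal sketch of that framing idea, with illustrative names rather than the actual bitbake-worker API:

    import pickle

    def frame(tag, payload):
        # Wrap a pickled payload in <tag>...</tag> markers so the reader
        # can find message boundaries in a continuous byte stream.
        # (The patch targets Python 2 str pipes; bytes keep this sketch
        # runnable on Python 3 as well.)
        return b"<" + tag + b">" + pickle.dumps(payload) + b"</" + tag + b">"

    # e.g. what a task submission frame could look like:
    msg = frame(b"runtask", ("recipe.bb", 42, "do_compile", False, []))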
Diffstat (limited to 'lib')
 lib/bb/cache.py      |   6
 lib/bb/cookerdata.py |  18
 lib/bb/event.py      |  13
 lib/bb/runqueue.py   | 284
 lib/bb/siggen.py     |   3
 5 files changed, 134 insertions(+), 190 deletions(-)
diff --git a/lib/bb/cache.py b/lib/bb/cache.py
index fb0f40c60..b99fa99cf 100644
--- a/lib/bb/cache.py
+++ b/lib/bb/cache.py
@@ -724,7 +724,6 @@ class CacheData(object):
for info in info_array:
info.add_cacheData(self, fn)
-
class MultiProcessCache(object):
"""
BitBake multi-process cache implementation
@@ -746,13 +745,18 @@ class MultiProcessCache(object):
self.cachefile = os.path.join(cachedir, self.__class__.cache_file_name)
logger.debug(1, "Using cache in '%s'", self.cachefile)
+ glf = bb.utils.lockfile(self.cachefile + ".lock")
+
try:
with open(self.cachefile, "rb") as f:
p = pickle.Unpickler(f)
data, version = p.load()
except:
+ bb.utils.unlockfile(glf)
return
+ bb.utils.unlockfile(glf)
+
if version != self.__class__.CACHE_VERSION:
return
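The hunk above serialises cache reads behind a lock file and must remember to release it on the early-return path. A standalone sketch of the same pattern using fcntl directly; bb.utils.lockfile is assumed to provide similar semantics, so treat this as an illustration rather than BitBake's implementation:

    import fcntl
    import pickle

    def load_cache(cachefile):
        # Hold an exclusive lock while reading so a concurrent writer
        # cannot hand us a half-written pickle.
        lock = open(cachefile + ".lock", "a+")
        fcntl.lockf(lock, fcntl.LOCK_EX)
        try:
            with open(cachefile, "rb") as f:
                data, version = pickle.load(f)
        except (IOError, OSError, EOFError, ValueError, pickle.UnpicklingError):
            return None
        finally:
            # Runs on every exit path, including the early return above;
            # the patch instead unlocks explicitly before returning.
            fcntl.lockf(lock, fcntl.LOCK_UN)
            lock.close()
        return data, version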
diff --git a/lib/bb/cookerdata.py b/lib/bb/cookerdata.py
index 149878f40..1bed455d1 100644
--- a/lib/bb/cookerdata.py
+++ b/lib/bb/cookerdata.py
@@ -25,7 +25,9 @@
import os, sys
from functools import wraps
import logging
+import bb
from bb import data
+import bb.parse
logger = logging.getLogger("BitBake")
parselog = logging.getLogger("BitBake.Parsing")
@@ -139,6 +141,20 @@ class CookerConfiguration(object):
def setServerRegIdleCallback(self, srcb):
self.server_register_idlecallback = srcb
+ def __getstate__(self):
+ state = {}
+ for key in self.__dict__.keys():
+ if key == "server_register_idlecallback":
+ state[key] = None
+ else:
+ state[key] = getattr(self, key)
+ return state
+
+ def __setstate__(self,state):
+ for k in state:
+ setattr(self, k, state[k])
+
+
def catch_parse_error(func):
"""Exception handling bits for our parsing"""
@wraps(func)
@@ -146,6 +162,8 @@ def catch_parse_error(func):
try:
return func(fn, *args)
except (IOError, bb.parse.ParseError, bb.data_smart.ExpansionError) as exc:
+ import traceback
+ parselog.critical( traceback.format_exc())
parselog.critical("Unable to parse %s: %s" % (fn, exc))
sys.exit(1)
return wrapped
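The new __getstate__/__setstate__ pair exists so CookerConfiguration can cross the pickle boundary into bitbake-worker: the one unpicklable attribute (the server idle callback) is nulled out and everything else round-trips. A self-contained illustration of the idiom with a hypothetical class:

    import pickle

    class Config(object):
        def __init__(self):
            self.jobs = 4
            self.callback = lambda: None   # live function reference: unpicklable

        def __getstate__(self):
            # Copy the instance dict but drop the live callback.
            state = dict(self.__dict__)
            state["callback"] = None
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)

    restored = pickle.loads(pickle.dumps(Config()))
    assert restored.jobs == 4 and restored.callback is None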
diff --git a/lib/bb/event.py b/lib/bb/event.py
index 2826e3554..d73067fcf 100644
--- a/lib/bb/event.py
+++ b/lib/bb/event.py
@@ -33,11 +33,12 @@ import atexit
import traceback
import bb.utils
import bb.compat
+import bb.exceptions
# This is the pid for which we should generate the event. This is set when
# the runqueue forks off.
worker_pid = 0
-worker_pipe = None
+worker_fire = None
logger = logging.getLogger('BitBake.Event')
@@ -150,20 +151,12 @@ def fire(event, d):
# don't have a datastore so the datastore context isn't a problem.
fire_class_handlers(event, d)
- if worker_pid != 0:
+ if worker_fire:
worker_fire(event, d)
else:
fire_ui_handlers(event, d)
-def worker_fire(event, d):
- data = "<event>" + pickle.dumps(event) + "</event>"
- worker_pipe.write(data)
-
def fire_from_worker(event, d):
- if not event.startswith("<event>") or not event.endswith("</event>"):
- print("Error, not an event %s" % event)
- return
- event = pickle.loads(event[7:-8])
fire_ui_handlers(event, d)
noop = lambda _: None
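After this change, event.py no longer tests worker_pid and pickles inline; the worker process installs its own sender via the worker_fire hook, and fire_from_worker receives an already-unpickled event (runQueuePipe does the parsing). A simplified sketch of the resulting dispatch shape:

    # Simplified shape of the dispatch in bb/event.py after this patch.
    worker_fire = None   # bitbake-worker installs its own sender here

    def fire_ui_handlers(event, d):
        print("delivering %s to UI handlers" % event)   # stand-in

    def fire(event, d):
        if worker_fire:
            # In the worker: forward the event up the pipe; the cooker
            # runs the UI handlers on its side of the connection.
            worker_fire(event, d)
        else:
            # In the cooker/server: deliver to UI handlers directly.
            fire_ui_handlers(event, d)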
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 090d1b56a..dd6e071c3 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -28,10 +28,17 @@ import sys
import signal
import stat
import fcntl
+import errno
import logging
import bb
from bb import msg, data, event
from bb import monitordisk
+import subprocess
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
@@ -938,6 +945,10 @@ class RunQueue:
raise
except:
logger.error("An uncaught exception occured in runqueue, please see the failure below:")
+ try:
+ self.rqexe.teardown()
+ except:
+ pass
self.state = runQueueComplete
raise
@@ -979,38 +990,41 @@ class RunQueueExecute:
self.runq_buildable = []
self.runq_running = []
self.runq_complete = []
- self.build_pids = {}
- self.build_pipes = {}
+
self.build_stamps = {}
self.failed_fnids = []
self.stampcache = {}
- def runqueue_process_waitpid(self):
- """
- Return none is there are no processes awaiting result collection, otherwise
- collect the process exit codes and close the information pipe.
- """
- pid, status = os.waitpid(-1, os.WNOHANG)
- if pid == 0 or os.WIFSTOPPED(status):
- return None
-
- if os.WIFEXITED(status):
- status = os.WEXITSTATUS(status)
- elif os.WIFSIGNALED(status):
- # Per shell conventions for $?, when a process exits due to
- # a signal, we return an exit code of 128 + SIGNUM
- status = 128 + os.WTERMSIG(status)
-
- task = self.build_pids[pid]
- del self.build_pids[pid]
-
- self.build_pipes[pid].close()
- del self.build_pipes[pid]
+ logger.debug(1, "Starting bitbake-worker")
+ self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ bb.utils.nonblockingfd(self.worker.stdout)
+ self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
+
+ workerdata = {
+ "taskdeps" : self.rqdata.dataCache.task_deps,
+ "fakerootenv" : self.rqdata.dataCache.fakerootenv,
+ "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
+ "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
+ "hashes" : self.rqdata.hashes,
+ "hash_deps" : self.rqdata.hash_deps,
+ "sigchecksums" : bb.parse.siggen.file_checksum_values,
+ "runq_hash" : self.rqdata.runq_hash,
+ "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
+ "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
+ "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
+ "logdefaultdomain" : bb.msg.loggerDefaultDomains,
+ }
+
+ self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
+ self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
+ self.worker.stdin.flush()
+
+ def runqueue_process_waitpid(self, task, status):
# self.build_stamps[pid] may not exist when use shared work directory.
- if pid in self.build_stamps:
- del self.build_stamps[pid]
+ if task in self.build_stamps:
+ del self.build_stamps[task]
if status != 0:
self.task_fail(task, status)
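The executor now spawns the worker once and performs a one-shot handshake: first the pickled cooker configuration, then the per-build workerdata table shown above. A stripped-down sketch of that launch-and-handshake sequence, with a stand-in command and payloads:

    import pickle
    import subprocess

    def start_worker(cmd, config, workerdata):
        # Pipes on both ends: commands go down the worker's stdin,
        # framed events and exit codes come back up its stdout.
        worker = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE)
        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(config)
                           + b"</cookerconfig>")
        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata)
                           + b"</workerdata>")
        worker.stdin.flush()
        return worker

    # e.g. start_worker(["bitbake-worker", "decafbad"], cfg, wdata)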
@@ -1019,16 +1033,11 @@ class RunQueueExecute:
return True
def finish_now(self):
- if self.stats.active:
- logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
- for k, v in self.build_pids.iteritems():
- try:
- os.kill(-k, signal.SIGTERM)
- os.waitpid(-1, 0)
- except:
- pass
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
+
+ self.worker.stdin.write("<finishnow></finishnow>")
+ self.worker.stdin.flush()
+
+ self.teardown()
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
@@ -1040,14 +1049,13 @@ class RunQueueExecute:
def finish(self):
self.rq.state = runQueueCleanUp
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
- self.runqueue_process_waitpid()
+ self.workerpipe.read()
return
+ self.teardown()
+
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
return
@@ -1055,115 +1063,6 @@ class RunQueueExecute:
self.rq.state = runQueueComplete
return
- def fork_off_task(self, fn, task, taskname, quieterrors=False):
- # We need to setup the environment BEFORE the fork, since
- # a fork() or exec*() activates PSEUDO...
-
- envbackup = {}
- fakeenv = {}
- umask = None
-
- taskdep = self.rqdata.dataCache.task_deps[fn]
- if 'umask' in taskdep and taskname in taskdep['umask']:
- # umask might come in as a number or text string..
- try:
- umask = int(taskdep['umask'][taskname],8)
- except TypeError:
- umask = taskdep['umask'][taskname]
-
- if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
- envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
-
- fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
- for p in fakedirs:
- bb.utils.mkdirhier(p)
-
- logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
- (fn, taskname, ', '.join(fakedirs)))
- else:
- envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
-
- sys.stdout.flush()
- sys.stderr.flush()
- try:
- pipein, pipeout = os.pipe()
- pipein = os.fdopen(pipein, 'rb', 4096)
- pipeout = os.fdopen(pipeout, 'wb', 0)
- pid = os.fork()
- except OSError as e:
- bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
-
- if pid == 0:
- pipein.close()
-
- # Save out the PID so that the event can include it the
- # events
- bb.event.worker_pid = os.getpid()
- bb.event.worker_pipe = pipeout
-
- self.rq.state = runQueueChildProcess
- # Make the child the process group leader
- os.setpgid(0, 0)
- # No stdin
- newsi = os.open(os.devnull, os.O_RDWR)
- os.dup2(newsi, sys.stdin.fileno())
-
- if umask:
- os.umask(umask)
-
- self.cooker.data.setVar("BB_WORKERCONTEXT", "1")
- bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
- ret = 0
- try:
- the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
- the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
- for h in self.rqdata.hashes:
- the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
- for h in self.rqdata.hash_deps:
- the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
-
- # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
- # successfully. We also need to unset anything from the environment which shouldn't be there
- exports = bb.data.exported_vars(the_data)
- bb.utils.empty_environment()
- for e, v in exports:
- os.environ[e] = v
- for e in fakeenv:
- os.environ[e] = fakeenv[e]
- the_data.setVar(e, fakeenv[e])
- the_data.setVarFlag(e, 'export', "1")
-
- if quieterrors:
- the_data.setVarFlag(taskname, "quieterrors", "1")
-
- except Exception as exc:
- if not quieterrors:
- logger.critical(str(exc))
- os._exit(1)
- try:
- if not self.cooker.configuration.dry_run:
- profile = self.cooker.configuration.profile
- ret = bb.build.exec_task(fn, taskname, the_data, profile)
- os._exit(ret)
- except:
- os._exit(1)
- else:
- for key, value in envbackup.iteritems():
- if value is None:
- del os.environ[key]
- else:
- os.environ[key] = value
-
- return pid, pipein, pipeout
-
def check_dependencies(self, task, taskdeps, setscene = False):
if not self.rq.depvalidate:
return False
@@ -1184,6 +1083,16 @@ class RunQueueExecute:
valid = bb.utils.better_eval(call, locs)
return valid
+ def teardown(self):
+ logger.debug(1, "Teardown for bitbake-worker")
+ self.worker.stdin.write("<quit></quit>")
+ self.worker.stdin.flush()
+ while self.worker.returncode is None:
+ self.workerpipe.read()
+ self.worker.poll()
+ while self.workerpipe.read():
+ continue
+
class RunQueueExecuteDummy(RunQueueExecute):
def __init__(self, rq):
self.rq = rq
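teardown() above asks the worker to quit and then keeps servicing its output until the process has actually exited, so no buffered events are lost. The same loop in isolation, against a hypothetical worker/pipe pair matching the patch's objects:

    def teardown(worker, workerpipe):
        # Request shutdown, then drain until the process is gone.
        worker.stdin.write(b"<quit></quit>")
        worker.stdin.flush()
        while worker.returncode is None:
            workerpipe.read()   # dispatch any frames still in flight
            worker.poll()       # sets worker.returncode once it exits
        while workerpipe.read():
            continue            # consume anything that arrived pre-exit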
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute):
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in schedulers)))
-
def get_schedulers(self):
schedulers = set(obj for obj in globals().values()
if type(obj) is type and
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute):
Run the tasks in a queue prepared by rqdata.prepare()
"""
+ self.workerpipe.read()
+
+
if self.stats.total == 0:
# nothing to do
self.rq.state = runQueueCleanUp
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute):
startevent = runQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
+ self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+ self.worker.stdin.flush()
- self.build_pids[pid] = task
- self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
- self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
+ self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
return True
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
- if self.runqueue_process_waitpid() is None:
- return 0.5
- return True
+ self.workerpipe.read()
+ return 0.5
+
+ self.teardown()
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
if self.runq_complete[task] == 0:
logger.error("Task %s never completed!", task)
self.rq.state = runQueueComplete
+
return True
class RunQueueExecuteScenequeue(RunQueueExecute):
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
# If we don't have any setscene functions, skip this step
if len(self.rqdata.runq_setscene) == 0:
rq.scenequeue_covered = set()
+ self.teardown()
rq.state = runQueueRunInit
return
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
Run the tasks in a queue prepared by prepare_runqueue
"""
+ self.workerpipe.read()
+
task = None
if self.stats.active < self.number_tasks:
# Find the next setscene to run
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
bb.event.fire(startevent, self.cfgData)
- pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+ self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+ self.worker.stdin.flush()
- self.build_pids[pid] = task
- self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
return True
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
-
if self.stats.active > 0:
- if self.runqueue_process_waitpid() is None:
- return 0.5
- return True
+ self.workerpipe.read()
+ return 0.5
# Convert scenequeue_covered task numbers into full taskgraph ids
oldcovered = self.scenequeue_covered
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
self.rq.state = runQueueRunInit
+ self.teardown()
return True
- def fork_off_task(self, fn, task, taskname):
- return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
+ def runqueue_process_waitpid(self, task, status):
+ task = self.rq.rqdata.runq_setscene.index(task)
+
+ RunQueueExecute.runqueue_process_waitpid(self, task, status)
class TaskFailure(Exception):
"""
@@ -1828,25 +1738,43 @@ class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
"""
- def __init__(self, pipein, pipeout, d):
+ def __init__(self, pipein, pipeout, d, rq):
self.input = pipein
- pipeout.close()
+ if pipeout:
+ pipeout.close()
bb.utils.nonblockingfd(self.input)
self.queue = ""
self.d = d
+ self.rq = rq
+
+ def setrunqueue(self, rq):
+ self.rq = rq
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + self.input.read(102400)
- except (OSError, IOError):
- pass
+ except (OSError, IOError) as e:
+ if e.errno != errno.EAGAIN:
+ raise
end = len(self.queue)
- index = self.queue.find("</event>")
- while index != -1:
- bb.event.fire_from_worker(self.queue[:index+8], self.d)
- self.queue = self.queue[index+8:]
+ found = True
+ while found and len(self.queue):
+ found = False
index = self.queue.find("</event>")
+ while index != -1 and self.queue.startswith("<event>"):
+ event = pickle.loads(self.queue[7:index])
+ bb.event.fire_from_worker(event, self.d)
+ found = True
+ self.queue = self.queue[index+8:]
+ index = self.queue.find("</event>")
+ index = self.queue.find("</exitcode>")
+ while index != -1 and self.queue.startswith("<exitcode>"):
+ task, status = pickle.loads(self.queue[10:index])
+ self.rq.runqueue_process_waitpid(task, status)
+ found = True
+ self.queue = self.queue[index+11:]
+ index = self.queue.find("</exitcode>")
return (end > start)
def close(self):
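runQueuePipe.read() above scans its buffer for both <event> and <exitcode> frames and keeps looping while complete frames remain, since a single read can deliver several messages. A generic, self-contained sketch of that incremental parser, with callbacks standing in for BitBake's event and waitpid handlers:

    import pickle

    def parse_frames(buf, handlers):
        # Strip complete <tag>payload</tag> frames off the front of the
        # buffer, dispatching each payload to its handler; return the
        # unconsumed remainder for the caller to extend on the next read.
        found = True
        while found and buf:
            found = False
            for tag, handler in handlers.items():
                start, end = b"<" + tag + b">", b"</" + tag + b">"
                idx = buf.find(end)
                while idx != -1 and buf.startswith(start):
                    handler(pickle.loads(buf[len(start):idx]))
                    buf = buf[idx + len(end):]
                    found = True
                    idx = buf.find(end)
        return buf

    # usage: queue = parse_frames(queue + pipe.read(102400),
    #                             {b"event": on_event, b"exitcode": on_exit})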
diff --git a/lib/bb/siggen.py b/lib/bb/siggen.py
index 1ff2ecc48..fb8b67850 100644
--- a/lib/bb/siggen.py
+++ b/lib/bb/siggen.py
@@ -201,9 +201,10 @@ class SignatureGeneratorBasic(SignatureGenerator):
#d.setVar("BB_TASKHASH_task-%s" % task, taskhash[task])
return h
- def set_taskdata(self, hashes, deps):
+ def set_taskdata(self, hashes, deps, checksums):
self.runtaskdeps = deps
self.taskhash = hashes
+ self.file_checksum_values = checksums
def dump_sigtask(self, fn, task, stampbase, runtime):
k = fn + "." + task