Diffstat (limited to 'bin/bitbake-worker')
-rwxr-xr-x  bin/bitbake-worker  149
1 file changed, 105 insertions(+), 44 deletions(-)
diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index 7765b9368..e8073f2ac 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -1,11 +1,14 @@
 #!/usr/bin/env python3
 #
+# Copyright BitBake Contributors
+#
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
 import os
 import sys
 import warnings
+warnings.simplefilter("default")
 sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
 from bb import fetch2
 import logging
@@ -16,11 +19,12 @@ import signal
 import pickle
 import traceback
 import queue
+import shlex
+import subprocess
 from multiprocessing import Lock
 from threading import Thread
 
-if sys.getfilesystemencoding() != "utf-8":
-    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
+bb.utils.check_system_locale()
 
 # Users shouldn't be running this code directly
 if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
@@ -87,19 +91,19 @@ def worker_fire_prepickled(event):
 worker_thread_exit = False
 
 def worker_flush(worker_queue):
-    worker_queue_int = b""
+    worker_queue_int = bytearray()
     global worker_pipe, worker_thread_exit
     while True:
        try:
-            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
+            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
-                    worker_queue_int = worker_queue_int + worker_queue.get()
+                    worker_queue_int.extend(worker_queue.get())
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
@@ -117,11 +121,10 @@ def worker_child_fire(event, d):
 
     data = b"<event>" + pickle.dumps(event) + b"</event>"
     try:
-        worker_pipe_lock.acquire()
-        while(len(data)):
-            written = worker_pipe.write(data)
-            data = data[written:]
-        worker_pipe_lock.release()
+        with bb.utils.lock_timeout(worker_pipe_lock):
+            while(len(data)):
+                written = worker_pipe.write(data)
+                data = data[written:]
     except IOError:
         sigterm_handler(None, None)
         raise
@@ -140,15 +143,29 @@ def sigterm_handler(signum, frame):
     os.killpg(0, signal.SIGTERM)
     sys.exit()
 
-def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
+def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
+
+    fn = runtask['fn']
+    task = runtask['task']
+    taskname = runtask['taskname']
+    taskhash = runtask['taskhash']
+    unihash = runtask['unihash']
+    appends = runtask['appends']
+    layername = runtask['layername']
+    taskdepdata = runtask['taskdepdata']
+    quieterrors = runtask['quieterrors']
     # We need to setup the environment BEFORE the fork, since
     # a fork() or exec*() activates PSEUDO...
 
     envbackup = {}
+    fakeroot = False
     fakeenv = {}
     umask = None
 
-    taskdep = workerdata["taskdeps"][fn]
+    uid = os.getuid()
+    gid = os.getgid()
+
+    taskdep = runtask['taskdep']
     if 'umask' in taskdep and taskname in taskdep['umask']:
         umask = taskdep['umask'][taskname]
     elif workerdata["umask"]:
@@ -160,24 +177,25 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
         except TypeError:
             pass
 
-    dry_run = cfg.dry_run or dry_run_exec
+    dry_run = cfg.dry_run or runtask['dry_run']
 
     # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
     if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
-        envvars = (workerdata["fakerootenv"][fn] or "").split()
-        for key, value in (var.split('=') for var in envvars):
+        fakeroot = True
+        envvars = (runtask['fakerootenv'] or "").split()
+        for key, value in (var.split('=',1) for var in envvars):
             envbackup[key] = os.environ.get(key)
             os.environ[key] = value
             fakeenv[key] = value
 
-        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+        fakedirs = (runtask['fakerootdirs'] or "").split()
         for p in fakedirs:
             bb.utils.mkdirhier(p)
         logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                         (fn, taskname, ', '.join(fakedirs)))
     else:
-        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
-        for key, value in (var.split('=') for var in envvars):
+        envvars = (runtask['fakerootnoenv'] or "").split()
+        for key, value in (var.split('=',1) for var in envvars):
             envbackup[key] = os.environ.get(key)
             os.environ[key] = value
             fakeenv[key] = value
@@ -219,19 +237,21 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
             # Let SIGHUP exit as SIGTERM
             signal.signal(signal.SIGHUP, sigterm_handler)
 
-            # No stdin
-            newsi = os.open(os.devnull, os.O_RDWR)
-            os.dup2(newsi, sys.stdin.fileno())
+            # No stdin & stdout
+            # stdout is used as a status report channel and must not be used by child processes.
+            dumbio = os.open(os.devnull, os.O_RDWR)
+            os.dup2(dumbio, sys.stdin.fileno())
+            os.dup2(dumbio, sys.stdout.fileno())
 
-            if umask:
+            if umask is not None:
                 os.umask(umask)
 
             try:
-                bb_cache = bb.cache.NoCache(databuilder)
                 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                 the_data = databuilder.mcdata[mc]
                 the_data.setVar("BB_WORKERCONTEXT", "1")
                 the_data.setVar("BB_TASKDEPDATA", taskdepdata)
+                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                 if cfg.limited_deps:
                     the_data.setVar("BB_LIMITEDDEPS", "1")
                 the_data.setVar("BUILDNAME", workerdata["buildname"])
@@ -245,12 +265,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
                 bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                 ret = 0
 
-                the_data = bb_cache.loadDataFull(fn, appends)
+                the_data = databuilder.parseRecipe(fn, appends, layername)
                 the_data.setVar('BB_TASKHASH', taskhash)
                 the_data.setVar('BB_UNIHASH', unihash)
+                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)
 
                 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
 
+                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
+                    if bb.utils.is_local_uid(uid):
+                        logger.debug("Attempting to disable network for %s" % taskname)
+                        bb.utils.disable_network(uid, gid)
+                    else:
+                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
+
                 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                 # successfully. We also need to unset anything from the environment which shouldn't be there
                 exports = bb.data.exported_vars(the_data)
@@ -279,10 +307,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
                 if not quieterrors:
                     logger.critical(traceback.format_exc())
                 os._exit(1)
+
+            sys.stdout.flush()
+            sys.stderr.flush()
+
             try:
                 if dry_run:
                     return 0
-                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+                try:
+                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+                finally:
+                    if fakeroot:
+                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
+                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
+                return ret
             except:
                 os._exit(1)
         if not profiling:
@@ -314,12 +352,12 @@ class runQueueWorkerPipe():
         if pipeout:
             pipeout.close()
         bb.utils.nonblockingfd(self.input)
-        self.queue = b""
+        self.queue = bytearray()
 
     def read(self):
         start = len(self.queue)
         try:
-            self.queue = self.queue + (self.input.read(102400) or b"")
+            self.queue.extend(self.input.read(102400) or b"")
         except (OSError, IOError) as e:
             if e.errno != errno.EAGAIN:
                 raise
@@ -347,7 +385,7 @@ class BitbakeWorker(object):
     def __init__(self, din):
         self.input = din
         bb.utils.nonblockingfd(self.input)
-        self.queue = b""
+        self.queue = bytearray()
         self.cookercfg = None
         self.databuilder = None
         self.data = None
@@ -381,7 +419,7 @@ class BitbakeWorker(object):
                     if len(r) == 0:
                         # EOF on pipe, server must have terminated
                         self.sigterm_exception(signal.SIGTERM, None)
-                    self.queue = self.queue + r
+                    self.queue.extend(r)
                 except (OSError, IOError):
                     pass
             if len(self.queue):
@@ -401,19 +439,35 @@ class BitbakeWorker(object):
             while self.process_waitpid():
                 continue
 
-
     def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                func(self.queue[(len(item) + 2):index])
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
+        opening_tag = b"<" + item + b">"
+        if not self.queue.startswith(opening_tag):
+            return
+
+        tag_len = len(opening_tag)
+        if len(self.queue) < tag_len + 4:
+            # we need to receive more data
+            return
+        header = self.queue[tag_len:tag_len + 4]
+        payload_len = int.from_bytes(header, 'big')
+        # closing tag has length (tag_len + 1)
+        if len(self.queue) < tag_len * 2 + 1 + payload_len:
+            # we need to receive more data
+            return
+
+        index = self.queue.find(b"</" + item + b">")
+        if index != -1:
+            try:
+                func(self.queue[(tag_len + 4):index])
+            except pickle.UnpicklingError:
+                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+                raise
+            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
 
     def handle_cookercfg(self, data):
         self.cookercfg = pickle.loads(data)
         self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
-        self.databuilder.parseBaseConfiguration()
+        self.databuilder.parseBaseConfiguration(worker=True)
         self.data = self.databuilder.data
 
     def handle_extraconfigdata(self, data):
@@ -428,6 +482,7 @@ class BitbakeWorker(object):
         for mc in self.databuilder.mcdata:
             self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
             self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
+            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")
 
     def handle_newtaskhashes(self, data):
         self.workerdata["newhashes"] = pickle.loads(data)
@@ -445,11 +500,15 @@ class BitbakeWorker(object):
         sys.exit(0)
 
     def handle_runtask(self, data):
-        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
-        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+        runtask = pickle.loads(data)
 
-        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
+        fn = runtask['fn']
+        task = runtask['task']
+        taskname = runtask['taskname']
+        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
 
         self.build_pids[pid] = task
         self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
 
@@ -513,9 +572,11 @@ except BaseException as e:
     import traceback
     sys.stderr.write(traceback.format_exc())
     sys.stderr.write(str(e))
+finally:
+    worker_thread_exit = True
+    worker_thread.join()
 
-worker_thread_exit = True
-worker_thread.join()
-
-workerlog_write("exitting")
+workerlog_write("exiting")
+if not normalexit:
+    sys.exit(1)
 sys.exit(0)
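
Note on the new message framing: the rewritten handle_item() above requires a 4-byte big-endian payload-length header between a message's opening tag and its pickled payload, and it returns early (rather than dispatching a partial message) until the whole frame has arrived. The sketch below shows what a compatible sender has to produce. It is a minimal illustration, not code from this commit: the frame_message() helper and the sample runtask values are hypothetical, the real sender lives on the server side (presumably in lib/bb/runqueue.py), and the full runtask dict also carries taskhash, unihash, appends, layername, taskdepdata, taskdep, the fakeroot* settings, quieterrors and dry_run, as read by fork_off_task() above.

import pickle

def frame_message(tag: bytes, payload: bytes) -> bytes:
    # <tag> + 4-byte big-endian payload length + payload + </tag>,
    # mirroring payload_len = int.from_bytes(header, 'big') in handle_item().
    return (b"<" + tag + b">"
            + len(payload).to_bytes(4, 'big')
            + payload
            + b"</" + tag + b">")

# Hypothetical runtask values; the real dict is assembled by the server.
runtask = {
    'fn': '/srv/work/meta/recipes-example/example_1.0.bb',
    'task': 4,
    'taskname': 'do_compile',
    'quieterrors': False,
    # ... plus the remaining keys fork_off_task() expects ...
}
message = frame_message(b"runtask", pickle.dumps(runtask))
# "message" is the byte sequence a worker's handle_item(b"runtask", ...)
# would consume from its input queue.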