#!/usr/bin/env python3 # # SPDX-License-Identifier: GPL-2.0-only # import os import sys import warnings sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno import signal import pickle import traceback import queue import shlex import subprocess from multiprocessing import Lock from threading import Thread if sys.getfilesystemencoding() != "utf-8": sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") # Users shouldn't be running this code directly if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") sys.exit(1) profiling = False if sys.argv[1].startswith("decafbadbad"): profiling = True try: import cProfile as profile except: import profile # Unbuffer stdout to avoid log truncation in the event # of an unorderly exit as well as to provide timely # updates to log files for use with tail try: if sys.stdout.name == '': import fcntl fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) fl |= os.O_SYNC fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) except: pass logger = logging.getLogger("BitBake") worker_pipe = sys.stdout.fileno() bb.utils.nonblockingfd(worker_pipe) # Need to guard against multiprocessing being used in child processes # and multiple processes trying to write to the parent at the same time worker_pipe_lock = None handler = bb.event.LogHandler() logger.addHandler(handler) if 0: # Code to write out a log file of all events passing through the worker logfilename = "/tmp/workerlogfile" format_str = "%(levelname)s: %(message)s" conlogformat = bb.msg.BBLogFormatter(format_str) consolelog = logging.FileHandler(logfilename) consolelog.setFormatter(conlogformat) logger.addHandler(consolelog) worker_queue = queue.Queue() def worker_fire(event, d): data = b"" + pickle.dumps(event) + b"" worker_fire_prepickled(data) def worker_fire_prepickled(event): global worker_queue worker_queue.put(event) # # We can end up with write contention with the cooker, it can be trying to send commands # and we can be trying to send event data back. Therefore use a separate thread for writing # back data to cooker. # worker_thread_exit = False def worker_flush(worker_queue): worker_queue_int = b"" global worker_pipe, worker_thread_exit while True: try: worker_queue_int = worker_queue_int + worker_queue.get(True, 1) except queue.Empty: pass while (worker_queue_int or not worker_queue.empty()): try: (_, ready, _) = select.select([], [worker_pipe], [], 1) if not worker_queue.empty(): worker_queue_int = worker_queue_int + worker_queue.get() written = os.write(worker_pipe, worker_queue_int) worker_queue_int = worker_queue_int[written:] except (IOError, OSError) as e: if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: raise if worker_thread_exit and worker_queue.empty() and not worker_queue_int: return worker_thread = Thread(target=worker_flush, args=(worker_queue,)) worker_thread.start() def worker_child_fire(event, d): global worker_pipe global worker_pipe_lock data = b"" + pickle.dumps(event) + b"" try: worker_pipe_lock.acquire() while(len(data)): written = worker_pipe.write(data) data = data[written:] worker_pipe_lock.release() except IOError: sigterm_handler(None, None) raise bb.event.worker_fire = worker_fire lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): if lf: lf.write(msg) lf.flush() def sigterm_handler(signum, frame): signal.signal(signal.SIGTERM, signal.SIG_DFL) os.killpg(0, signal.SIGTERM) sys.exit() def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): # We need to setup the environment BEFORE the fork, since # a fork() or exec*() activates PSEUDO... envbackup = {} fakeroot = False fakeenv = {} umask = None taskdep = workerdata["taskdeps"][fn] if 'umask' in taskdep and taskname in taskdep['umask']: umask = taskdep['umask'][taskname] elif workerdata["umask"]: umask = workerdata["umask"] if umask: # umask might come in as a number or text string.. try: umask = int(umask, 8) except TypeError: pass dry_run = cfg.dry_run or dry_run_exec # We can't use the fakeroot environment in a dry run as it possibly hasn't been built if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: fakeroot = True envvars = (workerdata["fakerootenv"][fn] or "").split() for key, value in (var.split('=') for var in envvars): envbackup[key] = os.environ.get(key) os.environ[key] = value fakeenv[key] = value fakedirs = (workerdata["fakerootdirs"][fn] or "").split() for p in fakedirs: bb.utils.mkdirhier(p) logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % (fn, taskname, ', '.join(fakedirs))) else: envvars = (workerdata["fakerootnoenv"][fn] or "").split() for key, value in (var.split('=') for var in envvars): envbackup[key] = os.environ.get(key) os.environ[key] = value fakeenv[key] = value sys.stdout.flush() sys.stderr.flush() try: pipein, pipeout = os.pipe() pipein = os.fdopen(pipein, 'rb', 4096) pipeout = os.fdopen(pipeout, 'wb', 0) pid = os.fork() except OSError as e: logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) sys.exit(1) if pid == 0: def child(): global worker_pipe global worker_pipe_lock pipein.close() bb.utils.signal_on_parent_exit("SIGTERM") # Save out the PID so that the event can include it the # events bb.event.worker_pid = os.getpid() bb.event.worker_fire = worker_child_fire worker_pipe = pipeout worker_pipe_lock = Lock() # Make the child the process group leader and ensure no # child process will be controlled by the current terminal # This ensures signals sent to the controlling terminal like Ctrl+C # don't stop the child processes. os.setsid() signal.signal(signal.SIGTERM, sigterm_handler) # Let SIGHUP exit as SIGTERM signal.signal(signal.SIGHUP, sigterm_handler) # No stdin newsi = os.open(os.devnull, os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) if umask: os.umask(umask) try: bb_cache = bb.cache.NoCache(databuilder) (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) the_data = databuilder.mcdata[mc] the_data.setVar("BB_WORKERCONTEXT", "1") the_data.setVar("BB_TASKDEPDATA", taskdepdata) if cfg.limited_deps: the_data.setVar("BB_LIMITEDDEPS", "1") the_data.setVar("BUILDNAME", workerdata["buildname"]) the_data.setVar("DATE", workerdata["date"]) the_data.setVar("TIME", workerdata["time"]) for varname, value in extraconfigdata.items(): the_data.setVar(varname, value) bb.parse.siggen.set_taskdata(workerdata["sigdata"]) if "newhashes" in workerdata: bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) ret = 0 the_data = bb_cache.loadDataFull(fn, appends) the_data.setVar('BB_TASKHASH', taskhash) the_data.setVar('BB_UNIHASH', unihash) bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) # exported_vars() returns a generator which *cannot* be passed to os.environ.update() # successfully. We also need to unset anything from the environment which shouldn't be there exports = bb.data.exported_vars(the_data) bb.utils.empty_environment() for e, v in exports: os.environ[e] = v for e in fakeenv: os.environ[e] = fakeenv[e] the_data.setVar(e, fakeenv[e]) the_data.setVarFlag(e, 'export', "1") task_exports = the_data.getVarFlag(taskname, 'exports') if task_exports: for e in task_exports.split(): the_data.setVarFlag(e, 'export', '1') v = the_data.getVar(e) if v is not None: os.environ[e] = v if quieterrors: the_data.setVarFlag(taskname, "quieterrors", "1") except Exception: if not quieterrors: logger.critical(traceback.format_exc()) os._exit(1) try: if dry_run: return 0 try: ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) finally: if fakeroot: fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD")) subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE) return ret except: os._exit(1) if not profiling: os._exit(child()) else: profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) prof = profile.Profile() try: ret = profile.Profile.runcall(prof, child) finally: prof.dump_stats(profname) bb.utils.process_profilelog(profname) os._exit(ret) else: for key, value in iter(envbackup.items()): if value is None: del os.environ[key] else: os.environ[key] = value return pid, pipein, pipeout class runQueueWorkerPipe(): """ Abstraction for a pipe between a worker thread and the worker server """ def __init__(self, pipein, pipeout): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) self.queue = b"" def read(self): start = len(self.queue) try: self.queue = self.queue + (self.input.read(102400) or b"") except (OSError, IOError) as e: if e.errno != errno.EAGAIN: raise end = len(self.queue) index = self.queue.find(b"") while index != -1: msg = self.queue[:index+8] assert msg.startswith(b"") and msg.count(b"") == 1 worker_fire_prepickled(msg) self.queue = self.queue[index+8:] index = self.queue.find(b"") return (end > start) def close(self): while self.read(): continue if len(self.queue) > 0: print("Warning, worker child left partial message: %s" % self.queue) self.input.close() normalexit = False class BitbakeWorker(object): def __init__(self, din): self.input = din bb.utils.nonblockingfd(self.input) self.queue = b"" self.cookercfg = None self.databuilder = None self.data = None self.extraconfigdata = None self.build_pids = {} self.build_pipes = {} signal.signal(signal.SIGTERM, self.sigterm_exception) # Let SIGHUP exit as SIGTERM signal.signal(signal.SIGHUP, self.sigterm_exception) if "beef" in sys.argv[1]: bb.utils.set_process_name("Worker (Fakeroot)") else: bb.utils.set_process_name("Worker") def sigterm_exception(self, signum, stackframe): if signum == signal.SIGTERM: bb.warn("Worker received SIGTERM, shutting down...") elif signum == signal.SIGHUP: bb.warn("Worker received SIGHUP, shutting down...") self.handle_finishnow(None) signal.signal(signal.SIGTERM, signal.SIG_DFL) os.kill(os.getpid(), signal.SIGTERM) def serve(self): while True: (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) if self.input in ready: try: r = self.input.read() if len(r) == 0: # EOF on pipe, server must have terminated self.sigterm_exception(signal.SIGTERM, None) self.queue = self.queue + r except (OSError, IOError): pass if len(self.queue): self.handle_item(b"cookerconfig", self.handle_cookercfg) self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) self.handle_item(b"workerdata", self.handle_workerdata) self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) self.handle_item(b"runtask", self.handle_runtask) self.handle_item(b"finishnow", self.handle_finishnow) self.handle_item(b"ping", self.handle_ping) self.handle_item(b"quit", self.handle_quit) for pipe in self.build_pipes: if self.build_pipes[pipe].input in ready: self.build_pipes[pipe].read() if len(self.build_pids): while self.process_waitpid(): continue def handle_item(self, item, func): if self.queue.startswith(b"<" + item + b">"): index = self.queue.find(b"") while index != -1: func(self.queue[(len(item) + 2):index]) self.queue = self.queue[(index + len(item) + 3):] index = self.queue.find(b"") def handle_cookercfg(self, data): self.cookercfg = pickle.loads(data) self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) self.databuilder.parseBaseConfiguration() self.data = self.databuilder.data def handle_extraconfigdata(self, data): self.extraconfigdata = pickle.loads(data) def handle_workerdata(self, data): self.workerdata = pickle.loads(data) bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] for mc in self.databuilder.mcdata: self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) def handle_newtaskhashes(self, data): self.workerdata["newhashes"] = pickle.loads(data) def handle_ping(self, _): workerlog_write("Handling ping\n") logger.warning("Pong from bitbake-worker!") def handle_quit(self, data): workerlog_write("Handling quit\n") global normalexit normalexit = True sys.exit(0) def handle_runtask(self, data): fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) self.build_pids[pid] = task self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) def process_waitpid(self): """ Return none is there are no processes awaiting result collection, otherwise collect the process exit codes and close the information pipe. """ try: pid, status = os.waitpid(-1, os.WNOHANG) if pid == 0 or os.WIFSTOPPED(status): return False except OSError: return False workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) if os.WIFEXITED(status): status = os.WEXITSTATUS(status) elif os.WIFSIGNALED(status): # Per shell conventions for $?, when a process exits due to # a signal, we return an exit code of 128 + SIGNUM status = 128 + os.WTERMSIG(status) task = self.build_pids[pid] del self.build_pids[pid] self.build_pipes[pid].close() del self.build_pipes[pid] worker_fire_prepickled(b"" + pickle.dumps((task, status)) + b"") return True def handle_finishnow(self, _): if self.build_pids: logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) for k, v in iter(self.build_pids.items()): try: os.kill(-k, signal.SIGTERM) os.waitpid(-1, 0) except: pass for pipe in self.build_pipes: self.build_pipes[pipe].read() try: worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) if not profiling: worker.serve() else: profname = "profile-worker.log" prof = profile.Profile() try: profile.Profile.runcall(prof, worker.serve) finally: prof.dump_stats(profname) bb.utils.process_profilelog(profname) except BaseException as e: if not normalexit: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) finally: worker_thread_exit = True worker_thread.join() workerlog_write("exiting") if not normalexit: sys.exit(1) sys.exit(0)