Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r-- | lib/bb/runqueue.py | 395
1 file changed, 259 insertions, 136 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index ce711b625..bc7e18175 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -157,7 +157,7 @@ class RunQueueScheduler(object):
             (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
             self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
             if tid in self.rq.runq_buildable:
-                self.buildable.append(tid)
+                self.buildable.add(tid)

         self.rev_prio_map = None
         self.is_pressure_usable()
@@ -198,16 +198,38 @@ class RunQueueScheduler(object):
                 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
-                exceeds_cpu_pressure = self.rq.max_cpu_pressure and (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) > self.rq.max_cpu_pressure
-                exceeds_io_pressure = self.rq.max_io_pressure and (float(curr_io_pressure) - float(self.prev_io_pressure)) > self.rq.max_io_pressure
-                exceeds_memory_pressure = self.rq.max_memory_pressure and (float(curr_memory_pressure) - float(self.prev_memory_pressure)) > self.rq.max_memory_pressure
                 now = time.time()
-                if now - self.prev_pressure_time > 1.0:
+                tdiff = now - self.prev_pressure_time
+                psi_accumulation_interval = 1.0
+                cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
+                io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
+                memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
+                exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
+                exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
+                exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure
+
+                if tdiff > psi_accumulation_interval:
                     self.prev_cpu_pressure = curr_cpu_pressure
                     self.prev_io_pressure = curr_io_pressure
                     self.prev_memory_pressure = curr_memory_pressure
                     self.prev_pressure_time = now
+
+            pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
+            pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure)
+            if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
+                bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
+            self.pressure_state = pressure_state
             return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
+        elif self.rq.max_loadfactor:
+            limit = False
+            loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
+            # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
+            if loadfactor > self.rq.max_loadfactor:
+                limit = True
+            if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
+                bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
+            self.loadfactor_limit = limit
+            return limit
         return False

     def next_buildable_task(self):
@@ -258,11 +280,11 @@ class RunQueueScheduler(object):
             best = None
             bestprio = None
             for tid in buildable:
-                taskname = taskname_from_tid(tid)
-                if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
-                    continue
                 prio = self.rev_prio_map[tid]
                 if bestprio is None or bestprio > prio:
+                    taskname = taskname_from_tid(tid)
+                    if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
+                        continue
                     stamp = self.stamps[tid]
                     if stamp in self.rq.build_stamps.values():
                         continue
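The pressure rework above changes the check from a raw counter delta to a rate: PSI "total" counters accumulate stall time, so the difference between two reads has to be divided by the elapsed wall-clock time before it is compared against BB_PRESSURE_MAX_*. A minimal standalone sketch of that calculation (hypothetical helper, not part of runqueue.py):

    import time

    def pressure_exceeded(curr_total, prev_total, prev_time, threshold):
        # curr_total/prev_total: cumulative stall values read from the
        # "total=..." field of /proc/pressure/{cpu,io,memory} (microseconds)
        tdiff = time.time() - prev_time
        rate = (float(curr_total) - float(prev_total)) / tdiff
        return bool(threshold) and rate > threshold
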
@@ -655,6 +677,7 @@ class RunQueueData:

         self.init_progress_reporter.start()
         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Step A - Work out a list of tasks to run
         #
@@ -803,6 +826,7 @@ class RunQueueData:
         #self.dump_data()

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Resolve recursive 'recrdeptask' dependencies (Part B)
         #
@@ -899,6 +923,7 @@ class RunQueueData:
                 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         #self.dump_data()
@@ -980,6 +1005,7 @@ class RunQueueData:
                 mark_active(tid, 1)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Step C - Prune all inactive tasks
         #
@@ -988,25 +1014,32 @@ class RunQueueData:
         # Handle --runall
         if self.cooker.configuration.runall:
             # re-run the mark_active and then drop unused tasks from new list
-            reduced_tasklist = set(self.runtaskentries.keys())
-            for tid in list(self.runtaskentries.keys()):
-                if tid not in runq_build:
-                    reduced_tasklist.remove(tid)
-            runq_build = {}
-            for task in self.cooker.configuration.runall:
-                if not task.startswith("do_"):
-                    task = "do_{0}".format(task)
-                runall_tids = set()
-                for tid in reduced_tasklist:
-                    wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
-                    if wanttid in self.runtaskentries:
-                        runall_tids.add(wanttid)
+            runall_tids = set()
+            added = True
+            while added:
+                reduced_tasklist = set(self.runtaskentries.keys())
+                for tid in list(self.runtaskentries.keys()):
+                    if tid not in runq_build:
+                        reduced_tasklist.remove(tid)
+                runq_build = {}

-                for tid in list(runall_tids):
-                    mark_active(tid, 1)
-                    if self.cooker.configuration.force:
-                        invalidate_task(tid, False)
+                orig = runall_tids
+                runall_tids = set()
+                for task in self.cooker.configuration.runall:
+                    if not task.startswith("do_"):
+                        task = "do_{0}".format(task)
+                    for tid in reduced_tasklist:
+                        wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
+                        if wanttid in self.runtaskentries:
+                            runall_tids.add(wanttid)
+
+                for tid in list(runall_tids):
+                    mark_active(tid, 1)
+                    self.target_tids.append(tid)
+                    if self.cooker.configuration.force:
+                        invalidate_task(tid, False)
+                added = runall_tids - orig

         delcount = set()
         for tid in list(self.runtaskentries.keys()):
@@ -1019,6 +1052,7 @@ class RunQueueData:
                 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Handle runonly
         if self.cooker.configuration.runonly:
@@ -1059,6 +1093,7 @@ class RunQueueData:

         logger.verbose("Assign Weightings")
         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Generate a list of reverse dependencies to ease future calculations
         for tid in self.runtaskentries:
@@ -1066,6 +1101,7 @@ class RunQueueData:
                 self.runtaskentries[dep].revdeps.add(tid)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)
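The --runall handling above becomes a fixpoint loop: activating a task pulls its whole dependency tree into the build, which can expose further recipes whose do_<task> must also be activated, so the scan repeats until an iteration adds nothing new. A hypothetical standalone sketch of the same pattern, with deps as a plain dict of tid -> set(tids):

    def runall_fixpoint(taskname, deps, initial):
        active = set(initial)
        added = True
        while added:
            # activate <fn>:<taskname> for every recipe already in the graph
            fns = {tid.rsplit(":", 1)[0] for tid in active}
            wanted = {fn + ":" + taskname for fn in fns} & deps.keys()
            added = wanted - active
            for tid in added:
                stack = [tid]
                while stack:            # equivalent of mark_active()
                    t = stack.pop()
                    if t not in active:
                        active.add(t)
                        stack.extend(deps.get(t, ()))
        return active
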

         # Identify tasks at the end of dependency chains
         # Error on circular dependency loops (length two)
@@ -1082,12 +1118,14 @@ class RunQueueData:

         logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Calculate task weights
         # Check of higher length circular dependencies
         self.runq_weight = self.calculate_task_weights(endpoints)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Sanity Check - Check for multiple tasks building the same provider
         for mc in self.dataCaches:
@@ -1188,6 +1226,7 @@ class RunQueueData:

         self.init_progress_reporter.next_stage()
         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Iterate over the task list looking for tasks with a 'setscene' function
         self.runq_setscene_tids = set()
@@ -1200,6 +1239,7 @@ class RunQueueData:
                 self.runq_setscene_tids.add(tid)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Invalidate task if force mode active
         if self.cooker.configuration.force:
@@ -1216,6 +1256,7 @@ class RunQueueData:
                     invalidate_task(fn + ":" + st, True)

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         # Create and print to the logs a virtual/xxxx -> PN (fn) table
         for mc in taskData:
@@ -1228,6 +1269,7 @@ class RunQueueData:
             bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])

         self.init_progress_reporter.next_stage()
+        bb.event.check_for_interrupts(self.cooker.data)

         bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
@@ -1240,6 +1282,7 @@ class RunQueueData:
                     dealtwith.add(tid)
                     todeal.remove(tid)
                     self.prepare_task_hash(tid)
+                bb.event.check_for_interrupts(self.cooker.data)

         bb.parse.siggen.writeout_file_checksum_cache()
@@ -1292,24 +1335,36 @@ class RunQueue:
         self.worker = {}
         self.fakeworker = {}

+    @staticmethod
+    def send_pickled_data(worker, data, name):
+        msg = bytearray()
+        msg.extend(b"<" + name.encode() + b">")
+        pickled_data = pickle.dumps(data)
+        msg.extend(len(pickled_data).to_bytes(4, 'big'))
+        msg.extend(pickled_data)
+        msg.extend(b"</" + name.encode() + b">")
+        worker.stdin.write(msg)
+
     def _start_worker(self, mc, fakeroot = False, rqexec = None):
         logger.debug("Starting bitbake-worker")
         magic = "decafbad"
         if self.cooker.configuration.profile:
             magic = "decafbadbad"
         fakerootlogs = None
+
+        workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
         if fakeroot:
             magic = magic + "beef"
             mcdata = self.cooker.databuilder.mcdata[mc]
             fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
             fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
             env = os.environ.copy()
-            for key, value in (var.split('=') for var in fakerootenv):
+            for key, value in (var.split('=',1) for var in fakerootenv):
                 env[key] = value
-            worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
+            worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
         else:
-            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+            worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
         bb.utils.nonblockingfd(worker.stdout)
         workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
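send_pickled_data frames each message with its tag plus a 4-byte big-endian payload length, so the reader no longer has to scan binary pickle data for a closing tag that could equally occur inside the payload. A minimal sketch of both sides of this framing (simplified, not the actual worker protocol code):

    import pickle

    # Sender side: tag, 4-byte big-endian length, payload, closing tag.
    def frame(data, name):
        payload = pickle.dumps(data)
        return (b"<" + name.encode() + b">"
                + len(payload).to_bytes(4, "big")
                + payload
                + b"</" + name.encode() + b">")

    # Receiver side: the length prefix says exactly where the payload ends,
    # so no searching through pickle bytes for the end tag is needed.
    def unframe(buf, name):
        tag = b"<" + name.encode() + b">"
        start = buf.index(tag) + len(tag)
        size = int.from_bytes(buf[start:start + 4], "big")
        return pickle.loads(bytes(buf[start + 4:start + 4 + size]))
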
@@ -1327,9 +1382,9 @@ class RunQueue:
             "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
         }

-        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
-        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
-        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
         worker.stdin.flush()

         return RunQueueWorker(worker, workerpipe)
@@ -1339,7 +1394,7 @@ class RunQueue:
             return
         logger.debug("Teardown for bitbake-worker")
         try:
-            worker.process.stdin.write(b"<quit></quit>")
+            RunQueue.send_pickled_data(worker.process, b"", "quit")
             worker.process.stdin.flush()
             worker.process.stdin.close()
         except IOError:
@@ -1351,12 +1406,12 @@ class RunQueue:
                 continue
             worker.pipe.close()

-    def start_worker(self):
+    def start_worker(self, rqexec):
         if self.worker:
             self.teardown_workers()
         self.teardown = False
         for mc in self.rqdata.dataCaches:
-            self.worker[mc] = self._start_worker(mc)
+            self.worker[mc] = self._start_worker(mc, False, rqexec)

     def start_fakeworker(self, rqexec, mc):
         if not mc in self.fakeworker:
@@ -1483,6 +1538,7 @@ class RunQueue:
         """

         retval = True
+        bb.event.check_for_interrupts(self.cooker.data)

         if self.state is runQueuePrepare:
             # NOTE: if you add, remove or significantly refactor the stages of this
@@ -1515,6 +1571,9 @@ class RunQueue:
                                                      ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

+                self.rqdata.init_progress_reporter.next_stage()
+                self.rqexe = RunQueueExecute(self)
+
                 dump = self.cooker.configuration.dump_signatures
                 if dump:
                     self.rqdata.init_progress_reporter.finish()
@@ -1526,10 +1585,8 @@ class RunQueue:
                     self.state = runQueueComplete

             if self.state is runQueueSceneInit:
-                self.rqdata.init_progress_reporter.next_stage()
-                self.start_worker()
-                self.rqdata.init_progress_reporter.next_stage()
-                self.rqexe = RunQueueExecute(self)
+                self.start_worker(self.rqexe)
+                self.rqdata.init_progress_reporter.finish()

                 # If we don't have any setscene functions, skip execution
                 if not self.rqdata.runq_setscene_tids:
@@ -1644,6 +1701,17 @@ class RunQueue:
         return

     def print_diffscenetasks(self):
+        def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
+            invalidtasks = []
+            for t in taskdepends[task].depends:
+                if t not in valid and t not in visited_invalid:
+                    invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
+                visited_invalid.add(t)
+
+            direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
+            if not direct_invalid and task not in noexec:
+                invalidtasks = [task]
+            return invalidtasks

         noexec = []
         tocheck = set()
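get_root_invalid_tasks reports only the deepest tasks whose signatures differ, rather than every task above them in the dependency tree, so printdiff output points at where the differences actually start. A simplified standalone sketch over a hypothetical graph (depends maps task -> set of dependencies):

    def root_invalid(task, depends, valid, visited=None):
        visited = set() if visited is None else visited
        roots = []
        bad_deps = [d for d in depends[task] if d not in valid]
        for d in bad_deps:
            if d not in visited:
                visited.add(d)
                roots.extend(root_invalid(d, depends, valid, visited))
        if not bad_deps:
            # no invalid dependency below us: this task is a root cause
            roots = [task]
        return roots
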
@@ -1677,46 +1745,49 @@ class RunQueue:
                 valid_new.add(dep)

         invalidtasks = set()
-        for tid in self.rqdata.runtaskentries:
-            if tid not in valid_new and tid not in noexec:
-                invalidtasks.add(tid)

-        found = set()
-        processed = set()
-        for tid in invalidtasks:
+        toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
+        for tid in toptasks:
             toprocess = set([tid])
             while toprocess:
                 next = set()
+                visited_invalid = set()
                 for t in toprocess:
-                    for dep in self.rqdata.runtaskentries[t].depends:
-                        if dep in invalidtasks:
-                            found.add(tid)
-                        if dep not in processed:
-                            processed.add(dep)
+                    if t not in valid_new and t not in noexec:
+                        invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
+                        continue
+                    if t in self.rqdata.runq_setscene_tids:
+                        for dep in self.rqexe.sqdata.sq_deps[t]:
                             next.add(dep)
+                        continue
+
+                    for dep in self.rqdata.runtaskentries[t].depends:
+                        next.add(dep)
+
                 toprocess = next
-                if tid in found:
-                    toprocess = set()

         tasklist = []
-        for tid in invalidtasks.difference(found):
+        for tid in invalidtasks:
             tasklist.append(tid)

         if tasklist:
             bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

-        return invalidtasks.difference(found)
+        return invalidtasks

     def write_diffscenetasks(self, invalidtasks):
+        bb.siggen.check_siggen_version(bb.siggen)

         # Define recursion callback
         def recursecb(key, hash1, hash2):
             hashes = [hash1, hash2]
+            bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
             hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
+            bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))

             recout = []
             if len(hashfiles) == 2:
-                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
+                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
                 recout.extend(list('    ' + l for l in out2))
             else:
                 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
@@ -1727,20 +1798,25 @@ class RunQueue:
         for tid in invalidtasks:
             (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
             pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
-            h = self.rqdata.runtaskentries[tid].hash
+            h = self.rqdata.runtaskentries[tid].unihash
+            bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
             matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
+            bb.debug(1, "Found hashfiles:\n{}".format(matches))
             match = None
-            for m in matches:
-                if h in m:
-                    match = m
+            for m in matches.values():
+                if h in m['path']:
+                    match = m['path']
             if match is None:
-                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
+                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
             matches = {k : v for k, v in iter(matches.items()) if h not in k}
+            matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']}
+            if matches_local:
+                matches = matches_local
             if matches:
-                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
+                latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
                 prevh = __find_sha256__.search(latestmatch).group(0)
                 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
-                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
+                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, most recent matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))


 class RunQueueExecute:
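find_siginfo now returns a dictionary of entries carrying 'path', 'time' and 'sstate' fields, and the code above prefers the most recent local (non-sstate) match when choosing what to diff against. A sketch of that selection, assuming the same entry shape:

    def latest_local_match(matches, current_hash):
        # drop the entry for the hash we already have
        others = {k: v for k, v in matches.items() if current_hash not in k}
        local = {k: v for k, v in others.items() if not v['sstate']}
        pool = local or others           # prefer local stamps over sstate ones
        if not pool:
            return None
        newest = max(pool, key=lambda k: pool[k]['time'])
        return pool[newest]['path']
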
@@ -1756,6 +1832,7 @@ class RunQueueExecute:
         self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
         self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
         self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
+        self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")

         self.sq_buildable = set()
         self.sq_running = set()
@@ -1773,6 +1850,8 @@ class RunQueueExecute:
         self.build_stamps2 = []
         self.failed_tids = []
         self.sq_deferred = {}
+        self.sq_needed_harddeps = set()
+        self.sq_harddep_deferred = set()

         self.stampcache = {}
@@ -1782,11 +1861,6 @@ class RunQueueExecute:

         self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))

-        for mc in rq.worker:
-            rq.worker[mc].pipe.setrunqueueexec(self)
-        for mc in rq.fakeworker:
-            rq.fakeworker[mc].pipe.setrunqueueexec(self)
-
         if self.number_tasks <= 0:
             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@@ -1812,6 +1886,11 @@ class RunQueueExecute:
                 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
             if self.max_memory_pressure > upper_limit:
                 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
+
+        if self.max_loadfactor:
+            self.max_loadfactor = float(self.max_loadfactor)
+            if self.max_loadfactor <= 0:
+                bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))

         # List of setscene tasks which we've covered
         self.scenequeue_covered = set()
@@ -1822,11 +1901,6 @@ class RunQueueExecute:
         self.tasks_notcovered = set()
         self.scenequeue_notneeded = set()

-        # We can't skip specified target tasks which aren't setscene tasks
-        self.cantskip = set(self.rqdata.target_tids)
-        self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
-        self.cantskip.intersection_update(self.rqdata.runtaskentries)
-
         schedulers = self.get_schedulers()
         for scheduler in schedulers:
             if self.scheduler == scheduler.name:
@@ -1839,7 +1913,25 @@ class RunQueueExecute:

         #if self.rqdata.runq_setscene_tids:
         self.sqdata = SQData()
-        build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
+        build_scenequeue_data(self.sqdata, self.rqdata, self)
+
+        update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
+
+        # Compute a list of 'stale' sstate tasks where the current hash does not match the one
+        # in any stamp files. Pass the list out to metadata as an event.
+        found = {}
+        for tid in self.rqdata.runq_setscene_tids:
+            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+            stamps = bb.build.find_stale_stamps(taskname, taskfn)
+            if stamps:
+                if mc not in found:
+                    found[mc] = {}
+                found[mc][tid] = stamps
+        for mc in found:
+            event = bb.event.StaleSetSceneTasks(found[mc])
+            bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
+
+        self.build_taskdepdata_cache()

     def runqueue_process_waitpid(self, task, status, fakerootlog=None):
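BB_LOADFACTOR_MAX, introduced above, throttles task startup on the 1-minute load average normalised by CPU count, so a value of 1.5 means "hold off above 1.5 runnable threads per core". A minimal sketch of the check (hypothetical helper, mirroring the validation and comparison in the hunks above):

    import os

    def over_loadfactor(max_loadfactor):
        if not max_loadfactor:
            return False                 # unset: no load-based limiting
        max_loadfactor = float(max_loadfactor)
        if max_loadfactor <= 0:
            raise ValueError("BB_LOADFACTOR_MAX must be greater than zero")
        return os.getloadavg()[0] / os.cpu_count() > max_loadfactor
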
@@ -1865,14 +1957,14 @@ class RunQueueExecute:
     def finish_now(self):
         for mc in self.rq.worker:
             try:
-                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
                 self.rq.worker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
                 pass
         for mc in self.rq.fakeworker:
             try:
-                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
                 self.rq.fakeworker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
@@ -1941,8 +2033,7 @@ class RunQueueExecute:
             try:
                 module = __import__(modname, fromlist=(name,))
             except ImportError as exc:
-                logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
-                raise SystemExit(1)
+                bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
             else:
                 schedulers.add(getattr(module, name))
         return schedulers
@@ -1972,11 +2063,19 @@ class RunQueueExecute:
                 self.setbuildable(revdep)
                 logger.debug("Marking task %s as buildable", revdep)

-        for t in self.sq_deferred.copy():
+        found = None
+        for t in sorted(self.sq_deferred.copy()):
             if self.sq_deferred[t] == task:
-                logger.debug2("Deferred task %s now buildable" % t)
-                del self.sq_deferred[t]
-                update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+                # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
+                # We shouldn't allow all to run at once as it is prone to races.
+                if not found:
+                    bb.debug(1, "Deferred task %s now buildable" % t)
+                    del self.sq_deferred[t]
+                    update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+                    found = t
+                else:
+                    bb.debug(1, "Deferring %s after %s" % (t, found))
+                    self.sq_deferred[t] = found

     def task_complete(self, task):
         self.stats.taskCompleted()
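The deferred-task handling above releases exactly one deferred setscene task when the task it was waiting on completes, re-pointing the remaining ones at the released task; releasing them all at once was prone to races over the same sstate objects. A simplified sketch of that chain (hypothetical function, sq_deferred maps waiting tid -> tid being waited on):

    def release_one(sq_deferred, completed):
        released = None
        for t in sorted(sq_deferred):
            if sq_deferred[t] == completed:
                if released is None:
                    del sq_deferred[t]         # this one may run now
                    released = t
                else:
                    sq_deferred[t] = released  # queue behind the released task
        return released
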
@@ -2081,8 +2180,11 @@ class RunQueueExecute:
         if not self.sqdone and self.can_start_task():
             # Find the next setscene to run
             for nexttask in self.sorted_setscene_tids:
-                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
-                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
+                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
+                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
+                            nexttask not in self.sq_needed_harddeps and \
+                            self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
+                            self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
                         if nexttask not in self.rqdata.target_tids:
                             logger.debug2("Skipping setscene for task %s" % nexttask)
                             self.sq_task_skip(nexttask)
@@ -2090,6 +2192,19 @@ class RunQueueExecute:
                             if nexttask in self.sq_deferred:
                                 del self.sq_deferred[nexttask]
                             return True
+                    if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+                        logger.debug2("Deferring %s due to hard dependencies" % nexttask)
+                        updated = False
+                        for dep in self.sqdata.sq_harddeps_rev[nexttask]:
+                            if dep not in self.sq_needed_harddeps:
+                                logger.debug2("Enabling task %s as it is a hard dependency" % dep)
+                                self.sq_buildable.add(dep)
+                                self.sq_needed_harddeps.add(dep)
+                                updated = True
+                        self.sq_harddep_deferred.add(nexttask)
+                        if updated:
+                            return True
+                        continue
                     # If covered tasks are running, need to wait for them to complete
                     for t in self.sqdata.sq_covered_tasks[nexttask]:
                         if t in self.runq_running and t not in self.runq_complete:
@@ -2139,6 +2254,7 @@ class RunQueueExecute:
             bb.event.fire(startevent, self.cfgData)

             taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
+            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
             runtask = {
                 'fn' : taskfn,
                 'task' : task,
@@ -2147,6 +2263,7 @@ class RunQueueExecute:
                 'unihash' : self.rqdata.get_task_unihash(task),
                 'quieterrors' : True,
                 'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
+                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
                 'taskdepdata' : self.sq_build_taskdepdata(task),
                 'dry_run' : False,
                 'taskdep': taskdep,
@@ -2158,10 +2275,10 @@ class RunQueueExecute:
             if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                 if not mc in self.rq.fakeworker:
                     self.rq.start_fakeworker(self, mc)
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                 self.rq.fakeworker[mc].process.stdin.flush()
             else:
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                 self.rq.worker[mc].process.stdin.flush()

             self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
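The scheduling loop above defers a setscene task until everything in its reverse hard-dependency set (sq_harddeps_rev) has been processed, force-enabling those hard dependencies first so they can complete. A condensed sketch of the gate (hypothetical function over simplified state sets):

    def gate_harddeps(tid, harddeps_rev, covered, notcovered, buildable, needed, deferred):
        pending = harddeps_rev.get(tid, set()) - (covered | notcovered)
        if not pending:
            return False                  # all hard deps processed: safe to run
        for dep in pending:
            if dep not in needed:
                buildable.add(dep)        # make the hard dependency runnable
                needed.add(dep)
        deferred.add(tid)                 # re-check once the deps complete
        return True
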
"runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2232,6 +2349,7 @@ class RunQueueExecute: bb.event.fire(startevent, self.cfgData) taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + realfn = bb.cache.virtualfn2realfn(taskfn)[0] runtask = { 'fn' : taskfn, 'task' : task, @@ -2240,6 +2358,7 @@ class RunQueueExecute: 'unihash' : self.rqdata.get_task_unihash(task), 'quieterrors' : False, 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), + 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 'taskdepdata' : self.build_taskdepdata(task), 'dry_run' : self.rqdata.setscene_enforce, 'taskdep': taskdep, @@ -2257,10 +2376,10 @@ class RunQueueExecute: self.rq.state = runQueueFailed self.stats.taskFailed() return True - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2314,6 +2433,22 @@ class RunQueueExecute: ret.add(dep) return ret + # Build the individual cache entries in advance once to save time + def build_taskdepdata_cache(self): + taskdepdata_cache = {} + for task in self.rqdata.runtaskentries: + (mc, fn, taskname, taskfn) = split_tid_mcfn(task) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + deps = self.rqdata.runtaskentries[task].depends + provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] + taskhash = self.rqdata.runtaskentries[task].hash + unihash = self.rqdata.runtaskentries[task].unihash + deps = self.filtermcdeps(task, mc, deps) + hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn] + taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn] + + self.taskdepdata_cache = taskdepdata_cache + # We filter out multiconfig dependencies from taskdepdata we pass to the tasks # as most code can't handle them def build_taskdepdata(self, task): @@ -2325,15 +2460,9 @@ class RunQueueExecute: while next: additional = [] for revdep in next: - (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - deps = self.rqdata.runtaskentries[revdep].depends - provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] - taskhash = self.rqdata.runtaskentries[revdep].hash - unihash = self.rqdata.runtaskentries[revdep].unihash - deps = self.filtermcdeps(task, mc, deps) - taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] - for revdep2 in deps: + self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash + taskdepdata[revdep] = self.taskdepdata_cache[revdep] + for revdep2 in self.taskdepdata_cache[revdep][3]: if revdep2 not in taskdepdata: additional.append(revdep2) next = additional @@ -2347,7 +2476,7 @@ class RunQueueExecute: return notcovered = set(self.scenequeue_notcovered) - notcovered |= self.cantskip + notcovered |= self.sqdata.cantskip for tid in self.scenequeue_notcovered: notcovered |= self.sqdata.sq_covered_tasks[tid] notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) @@ -2461,9 +2590,9 @@ class RunQueueExecute: if changed: for mc in 
@@ -2461,9 +2590,9 @@ class RunQueueExecute:

         if changed:
             for mc in self.rq.worker:
-                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
             for mc in self.rq.fakeworker:
-                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")

             hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
@@ -2533,8 +2662,8 @@ class RunQueueExecute:
         update_tasks2 = []
         for tid in update_tasks:
             harddepfail = False
-            for t in self.sqdata.sq_harddeps:
-                if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
+            for t in self.sqdata.sq_harddeps_rev[tid]:
+                if t in self.scenequeue_notcovered:
                     harddepfail = True
                     break
             if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
@@ -2566,12 +2695,14 @@ class RunQueueExecute:

         if changed:
             self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
+            self.sq_needed_harddeps = set()
+            self.sq_harddep_deferred = set()
             self.holdoff_need_update = True

     def scenequeue_updatecounters(self, task, fail=False):

-        for dep in sorted(self.sqdata.sq_deps[task]):
-            if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
+        if fail and task in self.sqdata.sq_harddeps:
+            for dep in sorted(self.sqdata.sq_harddeps[task]):
                 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
                     # dependency could be already processed, e.g. noexec setscene task
                     continue
@@ -2581,6 +2712,7 @@ class RunQueueExecute:
                 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                 self.sq_task_failoutright(dep)
                 continue
+        for dep in sorted(self.sqdata.sq_deps[task]):
             if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                 if dep not in self.sq_buildable:
                     self.sq_buildable.add(dep)
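sq_harddeps and the new sq_harddeps_rev are the two directions of the same relation, kept in step so that both "who depends on me" and "whom do I depend on" are single lookups instead of a scan over every key. A minimal sketch of maintaining the pair (hypothetical helpers):

    def add_harddep(harddeps, harddeps_rev, deptid, tid):
        # tid has a hard setscene dependency on deptid
        harddeps.setdefault(deptid, set()).add(tid)       # deptid -> dependents
        harddeps_rev.setdefault(tid, set()).add(deptid)   # tid -> prerequisites

    def ready(tid, harddeps_rev, processed):
        # runnable once every hard prerequisite is covered or notcovered
        return harddeps_rev.get(tid, set()).issubset(processed)
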
@@ -2599,6 +2731,13 @@ class RunQueueExecute:
                         new.add(dep)
             next = new

+        # If this task was one which other setscene tasks have a hard dependency upon, we need
+        # to walk through the hard dependencies and allow execution of those which have completed dependencies.
+        if task in self.sqdata.sq_harddeps:
+            for dep in self.sq_harddep_deferred.copy():
+                if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+                    self.sq_harddep_deferred.remove(dep)
+
         self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
         self.holdoff_need_update = True
@@ -2672,7 +2811,8 @@ class RunQueueExecute:
                 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
                 taskhash = self.rqdata.runtaskentries[revdep].hash
                 unihash = self.rqdata.runtaskentries[revdep].unihash
-                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
+                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
+                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
                 for revdep2 in deps:
                     if revdep2 not in taskdepdata:
                         additional.append(revdep2)
@@ -2716,6 +2856,7 @@ class SQData(object):
         self.sq_revdeps = {}
         # Injected inter-setscene task dependencies
         self.sq_harddeps = {}
+        self.sq_harddeps_rev = {}
         # Cache of stamp files so duplicates can't run in parallel
         self.stamps = {}
         # Setscene tasks directly depended upon by the build
@@ -2725,12 +2866,17 @@ class SQData(object):
         # A list of normal tasks a setscene task covers
         self.sq_covered_tasks = {}

-def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
+def build_scenequeue_data(sqdata, rqdata, sqrq):

     sq_revdeps = {}
     sq_revdeps_squash = {}
     sq_collated_deps = {}

+    # We can't skip specified target tasks which aren't setscene tasks
+    sqdata.cantskip = set(rqdata.target_tids)
+    sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
+    sqdata.cantskip.intersection_update(rqdata.runtaskentries)
+
     # We need to construct a dependency graph for the setscene functions. Intermediate
     # dependencies between the setscene tasks only complicate the code. This code
     # therefore aims to collapse the huge runqueue dependency tree into a smaller one
@@ -2799,7 +2945,7 @@ def build_scenequeue_data(sqdata, rqdata, sqrq):
     for tid in rqdata.runtaskentries:
         if not rqdata.runtaskentries[tid].revdeps:
             sqdata.unskippable.add(tid)
-    sqdata.unskippable |= sqrq.cantskip
+    sqdata.unskippable |= sqdata.cantskip
     while new:
         new = False
         orig = sqdata.unskippable.copy()
@@ -2838,6 +2984,7 @@ def build_scenequeue_data(sqdata, rqdata, sqrq):
         idepends = rqdata.taskData[mc].taskentries[realtid].idepends
         sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)

+        sqdata.sq_harddeps_rev[tid] = set()
         for (depname, idependtask) in idepends:

             if depname not in rqdata.taskData[mc].build_targets:
@@ -2850,20 +2997,15 @@ def build_scenequeue_data(sqdata, rqdata, sqrq):
             if deptid not in rqdata.runtaskentries:
                 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

+            logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))
+
             if not deptid in sqdata.sq_harddeps:
                 sqdata.sq_harddeps[deptid] = set()
             sqdata.sq_harddeps[deptid].add(tid)
-
-            sq_revdeps_squash[tid].add(deptid)
-            # Have to zero this to avoid circular dependencies
-            sq_revdeps_squash[deptid] = set()
+            sqdata.sq_harddeps_rev[tid].add(deptid)

     rqdata.init_progress_reporter.next_stage()

-    for task in sqdata.sq_harddeps:
-        for dep in sqdata.sq_harddeps[task]:
-            sq_revdeps_squash[dep].add(task)
-
-    rqdata.init_progress_reporter.next_stage()

     #for tid in sq_revdeps_squash:
@@ -2890,7 +3032,7 @@ def build_scenequeue_data(sqdata, rqdata, sqrq):
         if not sqdata.sq_revdeps[tid]:
             sqrq.sq_buildable.add(tid)

-    rqdata.init_progress_reporter.finish()
+    rqdata.init_progress_reporter.next_stage()

     sqdata.noexec = set()
     sqdata.stamppresent = set()
@@ -2907,23 +3049,7 @@ def build_scenequeue_data(sqdata, rqdata, sqrq):
             sqdata.hashes[h] = tid
         else:
             sqrq.sq_deferred[tid] = sqdata.hashes[h]
-            bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))
-
-    update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)
-
-    # Compute a list of 'stale' sstate tasks where the current hash does not match the one
-    # in any stamp files. Pass the list out to metadata as an event.
-    found = {}
-    for tid in rqdata.runq_setscene_tids:
-        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
-        stamps = bb.build.find_stale_stamps(taskname, taskfn)
-        if stamps:
-            if mc not in found:
-                found[mc] = {}
-            found[mc][tid] = stamps
-    for mc in found:
-        event = bb.event.StaleSetSceneTasks(found[mc])
-        bb.event.fire(event, cooker.databuilder.mcdata[mc])
+            bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))

 def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
@@ -3119,15 +3245,12 @@ class runQueuePipe():
         if pipeout:
             pipeout.close()
         bb.utils.nonblockingfd(self.input)
-        self.queue = b""
+        self.queue = bytearray()
         self.d = d
         self.rq = rq
         self.rqexec = rqexec
         self.fakerootlogs = fakerootlogs

-    def setrunqueueexec(self, rqexec):
-        self.rqexec = rqexec
-
     def read(self):
         for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
             for worker in workers.values():
@@ -3138,7 +3261,7 @@ class runQueuePipe():

         start = len(self.queue)
         try:
-            self.queue = self.queue + (self.input.read(102400) or b"")
+            self.queue.extend(self.input.read(102400) or b"")
         except (OSError, IOError) as e:
             if e.errno != errno.EAGAIN:
                 raise
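Switching the pipe buffer to a bytearray avoids re-copying the whole accumulated buffer on every 100KB read: extend() and slice deletion mutate in place, where bytes concatenation allocates a new object each time. A small sketch of the pattern (not the actual runQueuePipe code):

    queue = bytearray()

    def feed(chunk: bytes):
        queue.extend(chunk or b"")   # in-place append, no full copy

    def consume(n: int) -> bytes:
        data = bytes(queue[:n])
        del queue[:n]                # in-place removal of consumed bytes
        return data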