diff options
-rwxr-xr-x | bitbake/bin/bitbake-worker | 6 | ||||
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 165 |
2 files changed, 161 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index f63f060c57..3e502d5ca9 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker @@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha the_data.setVar(varname, value) bb.parse.siggen.set_taskdata(workerdata["sigdata"]) + if "newhashes" in workerdata: + bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) ret = 0 the_data = bb_cache.loadDataFull(fn, appends) @@ -377,6 +379,7 @@ class BitbakeWorker(object): self.handle_item(b"cookerconfig", self.handle_cookercfg) self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) self.handle_item(b"workerdata", self.handle_workerdata) + self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) self.handle_item(b"runtask", self.handle_runtask) self.handle_item(b"finishnow", self.handle_finishnow) self.handle_item(b"ping", self.handle_ping) @@ -416,6 +419,9 @@ class BitbakeWorker(object): for mc in self.databuilder.mcdata: self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) + def handle_newtaskhashes(self, data): + self.workerdata["newhashes"] = pickle.loads(data) + def handle_ping(self, _): workerlog_write("Handling ping\n") diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 519561c231..11b98f698d 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -149,7 +149,7 @@ class RunQueueScheduler(object): Return the id of the first task we find that is buildable """ self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] - buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] + buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks] if not buildable: return None @@ -206,6 +206,9 @@ class RunQueueScheduler(object): def newbuildable(self, task): self.buildable.append(task) + def removebuildable(self, task): + self.buildable.remove(task) + def describe_task(self, taskid): result = 'ID %s' % taskid if self.rev_prio_map: @@ -1719,6 +1722,8 @@ class RunQueueExecute: self.sq_running = set() self.sq_live = set() + self.changed_setscene = set() + self.runq_buildable = set() self.runq_running = set() self.runq_complete = set() @@ -1730,6 +1735,7 @@ class RunQueueExecute: self.stampcache = {} + self.holdoff_tasks = set() self.sqdone = False self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) @@ -1925,6 +1931,7 @@ class RunQueueExecute: """ self.rq.read_workers() + self.process_possible_migrations() task = None if not self.sqdone and self.can_start_task(): @@ -2007,7 +2014,7 @@ class RunQueueExecute: if self.can_start_task(): return True - if not self.sq_live and not self.sqdone and not self.sq_deferred: + if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: logger.info("Setscene tasks completed") logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) @@ -2167,6 +2174,131 @@ class RunQueueExecute: #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata + def updated_taskhash(self, tid, unihash): + changed = set() + if unihash != self.rqdata.runtaskentries[tid].unihash: + logger.info("Task %s unihash changed to %s" % (tid, unihash)) + self.rqdata.runtaskentries[tid].unihash = unihash + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash) + + # Work out all tasks which depend on this one + total = set() + next = set(self.rqdata.runtaskentries[tid].revdeps) + while next: + current = next.copy() + total = total |next + next = set() + for ntid in current: + next |= self.rqdata.runtaskentries[ntid].revdeps + next.difference_update(total) + + # Now iterate those tasks in dependency order to regenerate their taskhash/unihash + done = set() + next = set(self.rqdata.runtaskentries[tid].revdeps) + while next: + current = next.copy() + next = set() + for tid in current: + if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): + continue + procdep = [] + for dep in self.rqdata.runtaskentries[tid].depends: + procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + orighash = self.rqdata.runtaskentries[tid].hash + self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc]) + origuni = self.rqdata.runtaskentries[tid].unihash + self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) + logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) + next |= self.rqdata.runtaskentries[tid].revdeps + changed.add(tid) + total.remove(tid) + next.intersection_update(total) + + if changed: + for mc in self.rq.worker: + self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + for mc in self.rq.fakeworker: + self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + + logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) + + for tid in changed: + if tid not in self.rqdata.runq_setscene_tids: + continue + valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) + if not valid: + continue + self.changed_setscene.add(tid) + + if changed: + self.update_holdofftasks() + + def update_holdofftasks(self): + self.holdoff_tasks = set(self.changed_setscene) + + for tid in self.rqdata.runq_setscene_tids: + if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: + self.holdoff_tasks.add(tid) + + for tid in self.holdoff_tasks.copy(): + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep not in self.runq_complete: + self.holdoff_tasks.add(dep) + logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) + + def process_possible_migrations(self): + changes = False + for tid in self.changed_setscene.copy(): + if tid in self.runq_running: + self.changed_setscene.remove(tid) + continue + + valid = True + # Check no tasks this covers are running + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep in self.runq_running and dep not in self.runq_complete: + logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) + valid = False + break + if not valid: + continue + + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep not in self.runq_complete: + if dep in self.tasks_scenequeue_done: + self.tasks_scenequeue_done.remove(dep) + if dep in self.tasks_notcovered: + self.tasks_notcovered.remove(dep) + + if tid in self.sq_buildable: + self.sq_buildable.remove(tid) + if tid in self.sq_running: + self.sq_running.remove(tid) + if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): + if tid not in self.sq_buildable: + self.sq_buildable.add(tid) + + if tid in self.sqdata.outrightfail: + self.sqdata.outrightfail.remove(tid) + if tid in self.scenequeue_notcovered: + self.scenequeue_notcovered.remove(tid) + + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) + + if tid in self.build_stamps: + del self.build_stamps[tid] + + logger.info("Setscene task %s now valid and being rerun" % tid) + self.sqdone = False + self.changed_setscene.remove(tid) + changes = True + + if changes: + self.update_holdofftasks() + def scenequeue_process_notcovered(self, task): if len(self.rqdata.runtaskentries[task].depends) == 0: self.setbuildable(task) @@ -2194,7 +2326,7 @@ class RunQueueExecute: for deptask in self.rqdata.runtaskentries[t].revdeps: if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: continue - if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done): + if deptask in self.sqdata.unskippable: new.add(deptask) self.tasks_scenequeue_done.add(deptask) self.tasks_notcovered.add(deptask) @@ -2254,8 +2386,9 @@ class RunQueueExecute: self.tasks_covered.update(covered) self.coveredtopocess.remove(task) for tid in covered: - if len(self.rqdata.runtaskentries[tid].depends) == 0: + if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): self.setbuildable(tid) + self.update_holdofftasks() def sq_task_completeoutright(self, task): """ @@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): rqdata.init_progress_reporter.next_stage() - # Build a list of setscene tasks which are "unskippable" - # These are direct endpoints referenced by the build + # Build a list of tasks which are "unskippable" + # These are direct endpoints referenced by the build upto and including setscene tasks # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon new = True for tid in rqdata.runtaskentries: @@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): sqdata.unskippable.add(tid) while new: new = False - for tid in sqdata.unskippable.copy(): + orig = sqdata.unskippable.copy() + for tid in orig: if tid in rqdata.runq_setscene_tids: continue - sqdata.unskippable.remove(tid) if len(rqdata.runtaskentries[tid].depends) == 0: # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable sqrq.tasks_notcovered.add(tid) sqrq.tasks_scenequeue_done.add(tid) sqrq.setbuildable(tid) - sqrq.scenequeue_process_unskippable(tid) + sqrq.scenequeue_process_unskippable(tid) sqdata.unskippable |= rqdata.runtaskentries[tid].depends - new = True + if sqdata.unskippable != orig: + new = True rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) @@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent): runQueueEvent.__init__(self, task, stats, rq) self.reason = reason +class taskUniHashUpdate(bb.event.Event): + """ + Base runQueue event class + """ + def __init__(self, task, unihash): + self.taskid = task + self.unihash = unihash + bb.event.Event.__init__(self) + class runQueuePipe(): """ Abstraction for a pipe between a worker thread and the server @@ -2752,6 +2895,8 @@ class runQueuePipe(): except ValueError as e: bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) bb.event.fire_from_worker(event, self.d) + if isinstance(event, taskUniHashUpdate): + self.rqexec.updated_taskhash(event.taskid, event.unihash) found = True self.queue = self.queue[index+8:] index = self.queue.find(b"</event>") |