From 1bf5be46f92f125193638cf41ff207d68f592259 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 14 Aug 2019 11:48:25 +0100 Subject: runqueue: Fix event timing race The event from the task notifying of hash equivalency should only be processed when the task completes. This can otherwise result in a race where a dependent task may run before the original task completes causing various failures. To make this work reliably, the code had to be restructured quite a bit. Signed-off-by: Richard Purdie --- lib/bb/runqueue.py | 141 +++++++++++++++++++++++++++-------------------------- 1 file changed, 72 insertions(+), 69 deletions(-) (limited to 'lib/bb/runqueue.py') diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index eb8e34276..a04703c87 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -1696,7 +1696,8 @@ class RunQueueExecute: self.sq_running = set() self.sq_live = set() - self.changed_setscene = set() + self.updated_taskhash_queue = [] + self.pending_migrations = set() self.runq_buildable = set() self.runq_running = set() @@ -1910,8 +1911,8 @@ class RunQueueExecute: if self.sq_deferred: logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) err = True - if self.changed_setscene: - logger.error("Scenequeue had unprocessed changed entries: %s" % pprint.pformat(self.changed_setscene)) + if self.updated_taskhash_queue: + logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) err = True if self.holdoff_tasks: logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) @@ -2023,7 +2024,7 @@ class RunQueueExecute: if self.can_start_task(): return True - if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: + if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: logger.info("Setscene tasks 
completed") err = self.summarise_scenequeue_errors() @@ -2177,45 +2178,66 @@ class RunQueueExecute: #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata - def updated_taskhash(self, tid, unihash): + def update_holdofftasks(self): + self.holdoff_tasks = set() + + for tid in self.rqdata.runq_setscene_tids: + if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: + self.holdoff_tasks.add(tid) + + for tid in self.holdoff_tasks.copy(): + for dep in self.sqdata.sq_covered_tasks[tid]: + if dep not in self.runq_complete: + self.holdoff_tasks.add(dep) + logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) + + + def process_possible_migrations(self): + changed = set() - if unihash != self.rqdata.runtaskentries[tid].unihash: - logger.info("Task %s unihash changed to %s" % (tid, unihash)) - self.rqdata.runtaskentries[tid].unihash = unihash - bb.parse.siggen.set_unihash(tid, unihash) - - # Work out all tasks which depend on this one - total = set() - next = set(self.rqdata.runtaskentries[tid].revdeps) - while next: - current = next.copy() - total = total |next - next = set() - for ntid in current: - next |= self.rqdata.runtaskentries[ntid].revdeps - next.difference_update(total) - - # Now iterate those tasks in dependency order to regenerate their taskhash/unihash - done = set() - next = set(self.rqdata.runtaskentries[tid].revdeps) - while next: - current = next.copy() - next = set() - for tid in current: - if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): - continue - procdep = [] - for dep in self.rqdata.runtaskentries[tid].depends: - procdep.append(dep) - orighash = self.rqdata.runtaskentries[tid].hash - self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)]) - origuni = self.rqdata.runtaskentries[tid].unihash - self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) - logger.debug(1, 
"Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) - next |= self.rqdata.runtaskentries[tid].revdeps - changed.add(tid) - total.remove(tid) - next.intersection_update(total) + for tid, unihash in self.updated_taskhash_queue.copy(): + if tid in self.runq_running and tid not in self.runq_complete: + continue + + self.updated_taskhash_queue.remove((tid, unihash)) + + if unihash != self.rqdata.runtaskentries[tid].unihash: + logger.info("Task %s unihash changed to %s" % (tid, unihash)) + self.rqdata.runtaskentries[tid].unihash = unihash + bb.parse.siggen.set_unihash(tid, unihash) + + # Work out all tasks which depend on this one + total = set() + next = set(self.rqdata.runtaskentries[tid].revdeps) + while next: + current = next.copy() + total = total |next + next = set() + for ntid in current: + next |= self.rqdata.runtaskentries[ntid].revdeps + next.difference_update(total) + + # Now iterate those tasks in dependency order to regenerate their taskhash/unihash + done = set() + next = set(self.rqdata.runtaskentries[tid].revdeps) + while next: + current = next.copy() + next = set() + for tid in current: + if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): + continue + procdep = [] + for dep in self.rqdata.runtaskentries[tid].depends: + procdep.append(dep) + orighash = self.rqdata.runtaskentries[tid].hash + self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)]) + origuni = self.rqdata.runtaskentries[tid].unihash + self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) + logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) + next |= self.rqdata.runtaskentries[tid].revdeps + changed.add(tid) + total.remove(tid) + next.intersection_update(total) if changed: for mc in 
self.rq.worker: @@ -2223,7 +2245,7 @@ class RunQueueExecute: for mc in self.rq.fakeworker: self.rq.fakeworker[mc].process.stdin.write(b"" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"") - logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) + logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) for tid in changed: if tid not in self.rqdata.runq_setscene_tids: @@ -2231,31 +2253,12 @@ class RunQueueExecute: valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) if not valid: continue - self.changed_setscene.add(tid) - - if changed: - self.update_holdofftasks() - - def update_holdofftasks(self): - self.holdoff_tasks = set() - - for tid in self.rqdata.runq_setscene_tids: - if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: - self.holdoff_tasks.add(tid) - - for tid in self.holdoff_tasks.copy(): - for dep in self.sqdata.sq_covered_tasks[tid]: - if dep not in self.runq_complete: - self.holdoff_tasks.add(dep) - logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) - - def process_possible_migrations(self): - changes = False - for tid in self.changed_setscene.copy(): if tid in self.runq_running: - self.changed_setscene.remove(tid) continue + if tid not in self.pending_migrations: + self.pending_migrations.add(tid) + for tid in self.pending_migrations.copy(): valid = True # Check no tasks this covers are running for dep in self.sqdata.sq_covered_tasks[tid]: @@ -2266,6 +2269,8 @@ class RunQueueExecute: if not valid: continue + self.pending_migrations.remove(tid) + if tid in self.tasks_scenequeue_done: self.tasks_scenequeue_done.remove(tid) for dep in self.sqdata.sq_covered_tasks[tid]: @@ -2296,10 +2301,8 @@ class RunQueueExecute: logger.info("Setscene task %s now valid and being rerun" % tid) self.sqdone = False - self.changed_setscene.remove(tid) - changes = True - if changes: + if changed: self.update_holdofftasks() def scenequeue_updatecounters(self, task, 
fail=False): @@ -2854,7 +2857,7 @@ class runQueuePipe(): bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) bb.event.fire_from_worker(event, self.d) if isinstance(event, taskUniHashUpdate): - self.rqexec.updated_taskhash(event.taskid, event.unihash) + self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) found = True self.queue = self.queue[index+8:] index = self.queue.find(b"") -- cgit 1.2.3-korg