aboutsummaryrefslogtreecommitdiffstats
path: root/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-14 11:48:25 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-14 14:45:59 +0100
commit1bf5be46f92f125193638cf41ff207d68f592259 (patch)
treebf71c780ea6cf6988bf34ef9b7b81bb8b4820066 /lib/bb/runqueue.py
parent030b9f2b3ce6ed40e79304eb0ffee6c6613f43be (diff)
downloadbitbake-1bf5be46f92f125193638cf41ff207d68f592259.tar.gz
runqueue: Fix event timing race
The event from the task notifiing of hash equivalency should only be processed when the task completes. This can otherwise result in a race where a dependent task may run before the original task completes causing various failures. To make this work reliably, the code had to be restructured quite a bit. Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r--lib/bb/runqueue.py141
1 files changed, 72 insertions, 69 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index eb8e34276..a04703c87 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1696,7 +1696,8 @@ class RunQueueExecute:
self.sq_running = set()
self.sq_live = set()
- self.changed_setscene = set()
+ self.updated_taskhash_queue = []
+ self.pending_migrations = set()
self.runq_buildable = set()
self.runq_running = set()
@@ -1910,8 +1911,8 @@ class RunQueueExecute:
if self.sq_deferred:
logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
err = True
- if self.changed_setscene:
- logger.error("Scenequeue had unprocessed changed entries: %s" % pprint.pformat(self.changed_setscene))
+ if self.updated_taskhash_queue:
+ logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
err = True
if self.holdoff_tasks:
logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
@@ -2023,7 +2024,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
- if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
+ if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
logger.info("Setscene tasks completed")
err = self.summarise_scenequeue_errors()
@@ -2177,45 +2178,66 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
- def updated_taskhash(self, tid, unihash):
+ def update_holdofftasks(self):
+ self.holdoff_tasks = set()
+
+ for tid in self.rqdata.runq_setscene_tids:
+ if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
+ self.holdoff_tasks.add(tid)
+
+ for tid in self.holdoff_tasks.copy():
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ self.holdoff_tasks.add(dep)
+ logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
+
+
+ def process_possible_migrations(self):
+
changed = set()
- if unihash != self.rqdata.runtaskentries[tid].unihash:
- logger.info("Task %s unihash changed to %s" % (tid, unihash))
- self.rqdata.runtaskentries[tid].unihash = unihash
- bb.parse.siggen.set_unihash(tid, unihash)
-
- # Work out all tasks which depend on this one
- total = set()
- next = set(self.rqdata.runtaskentries[tid].revdeps)
- while next:
- current = next.copy()
- total = total |next
- next = set()
- for ntid in current:
- next |= self.rqdata.runtaskentries[ntid].revdeps
- next.difference_update(total)
-
- # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
- done = set()
- next = set(self.rqdata.runtaskentries[tid].revdeps)
- while next:
- current = next.copy()
- next = set()
- for tid in current:
- if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
- continue
- procdep = []
- for dep in self.rqdata.runtaskentries[tid].depends:
- procdep.append(dep)
- orighash = self.rqdata.runtaskentries[tid].hash
- self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)])
- origuni = self.rqdata.runtaskentries[tid].unihash
- self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
- logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
- next |= self.rqdata.runtaskentries[tid].revdeps
- changed.add(tid)
- total.remove(tid)
- next.intersection_update(total)
+ for tid, unihash in self.updated_taskhash_queue.copy():
+ if tid in self.runq_running and tid not in self.runq_complete:
+ continue
+
+ self.updated_taskhash_queue.remove((tid, unihash))
+
+ if unihash != self.rqdata.runtaskentries[tid].unihash:
+ logger.info("Task %s unihash changed to %s" % (tid, unihash))
+ self.rqdata.runtaskentries[tid].unihash = unihash
+ bb.parse.siggen.set_unihash(tid, unihash)
+
+ # Work out all tasks which depend on this one
+ total = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ total = total |next
+ next = set()
+ for ntid in current:
+ next |= self.rqdata.runtaskentries[ntid].revdeps
+ next.difference_update(total)
+
+ # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
+ done = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ next = set()
+ for tid in current:
+ if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
+ continue
+ procdep = []
+ for dep in self.rqdata.runtaskentries[tid].depends:
+ procdep.append(dep)
+ orighash = self.rqdata.runtaskentries[tid].hash
+ self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)])
+ origuni = self.rqdata.runtaskentries[tid].unihash
+ self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
+ logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
+ next |= self.rqdata.runtaskentries[tid].revdeps
+ changed.add(tid)
+ total.remove(tid)
+ next.intersection_update(total)
if changed:
for mc in self.rq.worker:
@@ -2223,7 +2245,7 @@ class RunQueueExecute:
for mc in self.rq.fakeworker:
self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
- logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
+ logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
for tid in changed:
if tid not in self.rqdata.runq_setscene_tids:
@@ -2231,31 +2253,12 @@ class RunQueueExecute:
valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
if not valid:
continue
- self.changed_setscene.add(tid)
-
- if changed:
- self.update_holdofftasks()
-
- def update_holdofftasks(self):
- self.holdoff_tasks = set()
-
- for tid in self.rqdata.runq_setscene_tids:
- if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
- self.holdoff_tasks.add(tid)
-
- for tid in self.holdoff_tasks.copy():
- for dep in self.sqdata.sq_covered_tasks[tid]:
- if dep not in self.runq_complete:
- self.holdoff_tasks.add(dep)
- logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
-
- def process_possible_migrations(self):
- changes = False
- for tid in self.changed_setscene.copy():
if tid in self.runq_running:
- self.changed_setscene.remove(tid)
continue
+ if tid not in self.pending_migrations:
+ self.pending_migrations.add(tid)
+ for tid in self.pending_migrations.copy():
valid = True
# Check no tasks this covers are running
for dep in self.sqdata.sq_covered_tasks[tid]:
@@ -2266,6 +2269,8 @@ class RunQueueExecute:
if not valid:
continue
+ self.pending_migrations.remove(tid)
+
if tid in self.tasks_scenequeue_done:
self.tasks_scenequeue_done.remove(tid)
for dep in self.sqdata.sq_covered_tasks[tid]:
@@ -2296,10 +2301,8 @@ class RunQueueExecute:
logger.info("Setscene task %s now valid and being rerun" % tid)
self.sqdone = False
- self.changed_setscene.remove(tid)
- changes = True
- if changes:
+ if changed:
self.update_holdofftasks()
def scenequeue_updatecounters(self, task, fail=False):
@@ -2854,7 +2857,7 @@ class runQueuePipe():
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
bb.event.fire_from_worker(event, self.d)
if isinstance(event, taskUniHashUpdate):
- self.rqexec.updated_taskhash(event.taskid, event.unihash)
+ self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
found = True
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")