aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/bitbake-worker6
-rw-r--r--lib/bb/runqueue.py165
2 files changed, 161 insertions, 10 deletions
diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index f63f060c5..3e502d5ca 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
the_data.setVar(varname, value)
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
+ if "newhashes" in workerdata:
+ bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
the_data = bb_cache.loadDataFull(fn, appends)
@@ -377,6 +379,7 @@ class BitbakeWorker(object):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
self.handle_item(b"workerdata", self.handle_workerdata)
+ self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
@@ -416,6 +419,9 @@ class BitbakeWorker(object):
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
+ def handle_newtaskhashes(self, data):
+ self.workerdata["newhashes"] = pickle.loads(data)
+
def handle_ping(self, _):
workerlog_write("Handling ping\n")
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 519561c23..11b98f698 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -149,7 +149,7 @@ class RunQueueScheduler(object):
Return the id of the first task we find that is buildable
"""
self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
- buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
+ buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks]
if not buildable:
return None
@@ -206,6 +206,9 @@ class RunQueueScheduler(object):
def newbuildable(self, task):
self.buildable.append(task)
+ def removebuildable(self, task):
+ self.buildable.remove(task)
+
def describe_task(self, taskid):
result = 'ID %s' % taskid
if self.rev_prio_map:
@@ -1719,6 +1722,8 @@ class RunQueueExecute:
self.sq_running = set()
self.sq_live = set()
+ self.changed_setscene = set()
+
self.runq_buildable = set()
self.runq_running = set()
self.runq_complete = set()
@@ -1730,6 +1735,7 @@ class RunQueueExecute:
self.stampcache = {}
+ self.holdoff_tasks = set()
self.sqdone = False
self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
@@ -1925,6 +1931,7 @@ class RunQueueExecute:
"""
self.rq.read_workers()
+ self.process_possible_migrations()
task = None
if not self.sqdone and self.can_start_task():
@@ -2007,7 +2014,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
- if not self.sq_live and not self.sqdone and not self.sq_deferred:
+ if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
logger.info("Setscene tasks completed")
logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
@@ -2167,6 +2174,131 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
+ def updated_taskhash(self, tid, unihash):
+ changed = set()
+ if unihash != self.rqdata.runtaskentries[tid].unihash:
+ logger.info("Task %s unihash changed to %s" % (tid, unihash))
+ self.rqdata.runtaskentries[tid].unihash = unihash
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash)
+
+ # Work out all tasks which depend on this one
+ total = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ total = total |next
+ next = set()
+ for ntid in current:
+ next |= self.rqdata.runtaskentries[ntid].revdeps
+ next.difference_update(total)
+
+ # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
+ done = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ next = set()
+ for tid in current:
+ if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
+ continue
+ procdep = []
+ for dep in self.rqdata.runtaskentries[tid].depends:
+ procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ orighash = self.rqdata.runtaskentries[tid].hash
+ self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc])
+ origuni = self.rqdata.runtaskentries[tid].unihash
+ self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname)
+ logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
+ next |= self.rqdata.runtaskentries[tid].revdeps
+ changed.add(tid)
+ total.remove(tid)
+ next.intersection_update(total)
+
+ if changed:
+ for mc in self.rq.worker:
+ self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ for mc in self.rq.fakeworker:
+ self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+
+ logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
+
+ for tid in changed:
+ if tid not in self.rqdata.runq_setscene_tids:
+ continue
+ valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
+ if not valid:
+ continue
+ self.changed_setscene.add(tid)
+
+ if changed:
+ self.update_holdofftasks()
+
+ def update_holdofftasks(self):
+ self.holdoff_tasks = set(self.changed_setscene)
+
+ for tid in self.rqdata.runq_setscene_tids:
+ if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
+ self.holdoff_tasks.add(tid)
+
+ for tid in self.holdoff_tasks.copy():
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ self.holdoff_tasks.add(dep)
+ logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
+
+ def process_possible_migrations(self):
+ changes = False
+ for tid in self.changed_setscene.copy():
+ if tid in self.runq_running:
+ self.changed_setscene.remove(tid)
+ continue
+
+ valid = True
+ # Check no tasks this covers are running
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep in self.runq_running and dep not in self.runq_complete:
+ logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid))
+ valid = False
+ break
+ if not valid:
+ continue
+
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ if dep in self.tasks_scenequeue_done:
+ self.tasks_scenequeue_done.remove(dep)
+ if dep in self.tasks_notcovered:
+ self.tasks_notcovered.remove(dep)
+
+ if tid in self.sq_buildable:
+ self.sq_buildable.remove(tid)
+ if tid in self.sq_running:
+ self.sq_running.remove(tid)
+ if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ if tid not in self.sq_buildable:
+ self.sq_buildable.add(tid)
+
+ if tid in self.sqdata.outrightfail:
+ self.sqdata.outrightfail.remove(tid)
+ if tid in self.scenequeue_notcovered:
+ self.scenequeue_notcovered.remove(tid)
+
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
+
+ if tid in self.build_stamps:
+ del self.build_stamps[tid]
+
+ logger.info("Setscene task %s now valid and being rerun" % tid)
+ self.sqdone = False
+ self.changed_setscene.remove(tid)
+ changes = True
+
+ if changes:
+ self.update_holdofftasks()
+
def scenequeue_process_notcovered(self, task):
if len(self.rqdata.runtaskentries[task].depends) == 0:
self.setbuildable(task)
@@ -2194,7 +2326,7 @@ class RunQueueExecute:
for deptask in self.rqdata.runtaskentries[t].revdeps:
if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
continue
- if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
+ if deptask in self.sqdata.unskippable:
new.add(deptask)
self.tasks_scenequeue_done.add(deptask)
self.tasks_notcovered.add(deptask)
@@ -2254,8 +2386,9 @@ class RunQueueExecute:
self.tasks_covered.update(covered)
self.coveredtopocess.remove(task)
for tid in covered:
- if len(self.rqdata.runtaskentries[tid].depends) == 0:
+ if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
self.setbuildable(tid)
+ self.update_holdofftasks()
def sq_task_completeoutright(self, task):
"""
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
rqdata.init_progress_reporter.next_stage()
- # Build a list of setscene tasks which are "unskippable"
- # These are direct endpoints referenced by the build
+ # Build a list of tasks which are "unskippable"
+ # These are direct endpoints referenced by the build upto and including setscene tasks
# Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
new = True
for tid in rqdata.runtaskentries:
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.unskippable.add(tid)
while new:
new = False
- for tid in sqdata.unskippable.copy():
+ orig = sqdata.unskippable.copy()
+ for tid in orig:
if tid in rqdata.runq_setscene_tids:
continue
- sqdata.unskippable.remove(tid)
if len(rqdata.runtaskentries[tid].depends) == 0:
# These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
sqrq.tasks_notcovered.add(tid)
sqrq.tasks_scenequeue_done.add(tid)
sqrq.setbuildable(tid)
- sqrq.scenequeue_process_unskippable(tid)
+ sqrq.scenequeue_process_unskippable(tid)
sqdata.unskippable |= rqdata.runtaskentries[tid].depends
- new = True
+ if sqdata.unskippable != orig:
+ new = True
rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent):
runQueueEvent.__init__(self, task, stats, rq)
self.reason = reason
+class taskUniHashUpdate(bb.event.Event):
+ """
+ Base runQueue event class
+ """
+ def __init__(self, task, unihash):
+ self.taskid = task
+ self.unihash = unihash
+ bb.event.Event.__init__(self)
+
class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
@@ -2752,6 +2895,8 @@ class runQueuePipe():
except ValueError as e:
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
bb.event.fire_from_worker(event, self.d)
+ if isinstance(event, taskUniHashUpdate):
+ self.rqexec.updated_taskhash(event.taskid, event.unihash)
found = True
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")