summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2019-07-23 22:51:15 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-06 11:21:31 +0100
commit7df31ff36892c2f9c65326b06b4c7093b1462f54 (patch)
tree7992a608c29625c40fc1701537bf185dd93f8aec
parent40eb5b344b4de5310a89e36024b826fc99484747 (diff)
downloadopenembedded-core-contrib-7df31ff36892c2f9c65326b06b4c7093b1462f54.tar.gz
bitbake: runqueue: Enable dynamic task adjustment to hash equivalency
There is a compelling usecase for tasks being able to notify runqueue that their "unihash" has changed. When this is recieved, the hashes of all subsequent tasks should be recomputed and their new hashes checked against existing setscene validity. Any newly available setscene tasks should then be executed. Making this work effectively needs several pieces. An event is added which the cooker listen for. If a new hash becomes available it can send an event to notify of this. When such an event is seen, hash recomputations are made. A setscene task can't be run until all the tasks it "covers" are stopped. The notion of "holdoff" tasks is therefore added, these are removed from the buildable list with the assumption that some setscene task will run and cover them. The workers need to be notified when taskhashes change to update their own internal siggen data stores. A new worker command is added to do this which will affect all newly spawned worker processes from that worker. An example workflow which tests this code is: Configuration: BB_SIGNATURE_HANDLER = "OEEquivHash" SSTATE_HASHEQUIV_SERVER = "http://localhost:8686" $ bitbake-hashserv & $ bitbake automake-native $ bitbake autoconf-native automake-native -c clean $ bitbake m4-native -c install -f $ bitbake automake-native with the test being whether automake-native is installed from sstate. (Bitbake rev: 1f630fdf0260db08541d3ca9f25f852931c19905) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-xbitbake/bin/bitbake-worker6
-rw-r--r--bitbake/lib/bb/runqueue.py165
2 files changed, 161 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index f63f060c57..3e502d5ca9 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
the_data.setVar(varname, value)
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
+ if "newhashes" in workerdata:
+ bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
the_data = bb_cache.loadDataFull(fn, appends)
@@ -377,6 +379,7 @@ class BitbakeWorker(object):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
self.handle_item(b"workerdata", self.handle_workerdata)
+ self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
@@ -416,6 +419,9 @@ class BitbakeWorker(object):
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
+ def handle_newtaskhashes(self, data):
+ self.workerdata["newhashes"] = pickle.loads(data)
+
def handle_ping(self, _):
workerlog_write("Handling ping\n")
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 519561c231..11b98f698d 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -149,7 +149,7 @@ class RunQueueScheduler(object):
Return the id of the first task we find that is buildable
"""
self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
- buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
+ buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks]
if not buildable:
return None
@@ -206,6 +206,9 @@ class RunQueueScheduler(object):
def newbuildable(self, task):
self.buildable.append(task)
+ def removebuildable(self, task):
+ self.buildable.remove(task)
+
def describe_task(self, taskid):
result = 'ID %s' % taskid
if self.rev_prio_map:
@@ -1719,6 +1722,8 @@ class RunQueueExecute:
self.sq_running = set()
self.sq_live = set()
+ self.changed_setscene = set()
+
self.runq_buildable = set()
self.runq_running = set()
self.runq_complete = set()
@@ -1730,6 +1735,7 @@ class RunQueueExecute:
self.stampcache = {}
+ self.holdoff_tasks = set()
self.sqdone = False
self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
@@ -1925,6 +1931,7 @@ class RunQueueExecute:
"""
self.rq.read_workers()
+ self.process_possible_migrations()
task = None
if not self.sqdone and self.can_start_task():
@@ -2007,7 +2014,7 @@ class RunQueueExecute:
if self.can_start_task():
return True
- if not self.sq_live and not self.sqdone and not self.sq_deferred:
+ if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
logger.info("Setscene tasks completed")
logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
@@ -2167,6 +2174,131 @@ class RunQueueExecute:
#bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
return taskdepdata
+ def updated_taskhash(self, tid, unihash):
+ changed = set()
+ if unihash != self.rqdata.runtaskentries[tid].unihash:
+ logger.info("Task %s unihash changed to %s" % (tid, unihash))
+ self.rqdata.runtaskentries[tid].unihash = unihash
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash)
+
+ # Work out all tasks which depend on this one
+ total = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ total = total |next
+ next = set()
+ for ntid in current:
+ next |= self.rqdata.runtaskentries[ntid].revdeps
+ next.difference_update(total)
+
+ # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
+ done = set()
+ next = set(self.rqdata.runtaskentries[tid].revdeps)
+ while next:
+ current = next.copy()
+ next = set()
+ for tid in current:
+ if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
+ continue
+ procdep = []
+ for dep in self.rqdata.runtaskentries[tid].depends:
+ procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ orighash = self.rqdata.runtaskentries[tid].hash
+ self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc])
+ origuni = self.rqdata.runtaskentries[tid].unihash
+ self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname)
+ logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
+ next |= self.rqdata.runtaskentries[tid].revdeps
+ changed.add(tid)
+ total.remove(tid)
+ next.intersection_update(total)
+
+ if changed:
+ for mc in self.rq.worker:
+ self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ for mc in self.rq.fakeworker:
+ self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+
+ logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
+
+ for tid in changed:
+ if tid not in self.rqdata.runq_setscene_tids:
+ continue
+ valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
+ if not valid:
+ continue
+ self.changed_setscene.add(tid)
+
+ if changed:
+ self.update_holdofftasks()
+
+ def update_holdofftasks(self):
+ self.holdoff_tasks = set(self.changed_setscene)
+
+ for tid in self.rqdata.runq_setscene_tids:
+ if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
+ self.holdoff_tasks.add(tid)
+
+ for tid in self.holdoff_tasks.copy():
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ self.holdoff_tasks.add(dep)
+ logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
+
+ def process_possible_migrations(self):
+ changes = False
+ for tid in self.changed_setscene.copy():
+ if tid in self.runq_running:
+ self.changed_setscene.remove(tid)
+ continue
+
+ valid = True
+ # Check no tasks this covers are running
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep in self.runq_running and dep not in self.runq_complete:
+ logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid))
+ valid = False
+ break
+ if not valid:
+ continue
+
+ for dep in self.sqdata.sq_covered_tasks[tid]:
+ if dep not in self.runq_complete:
+ if dep in self.tasks_scenequeue_done:
+ self.tasks_scenequeue_done.remove(dep)
+ if dep in self.tasks_notcovered:
+ self.tasks_notcovered.remove(dep)
+
+ if tid in self.sq_buildable:
+ self.sq_buildable.remove(tid)
+ if tid in self.sq_running:
+ self.sq_running.remove(tid)
+ if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ if tid not in self.sq_buildable:
+ self.sq_buildable.add(tid)
+
+ if tid in self.sqdata.outrightfail:
+ self.sqdata.outrightfail.remove(tid)
+ if tid in self.scenequeue_notcovered:
+ self.scenequeue_notcovered.remove(tid)
+
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
+
+ if tid in self.build_stamps:
+ del self.build_stamps[tid]
+
+ logger.info("Setscene task %s now valid and being rerun" % tid)
+ self.sqdone = False
+ self.changed_setscene.remove(tid)
+ changes = True
+
+ if changes:
+ self.update_holdofftasks()
+
def scenequeue_process_notcovered(self, task):
if len(self.rqdata.runtaskentries[task].depends) == 0:
self.setbuildable(task)
@@ -2194,7 +2326,7 @@ class RunQueueExecute:
for deptask in self.rqdata.runtaskentries[t].revdeps:
if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
continue
- if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
+ if deptask in self.sqdata.unskippable:
new.add(deptask)
self.tasks_scenequeue_done.add(deptask)
self.tasks_notcovered.add(deptask)
@@ -2254,8 +2386,9 @@ class RunQueueExecute:
self.tasks_covered.update(covered)
self.coveredtopocess.remove(task)
for tid in covered:
- if len(self.rqdata.runtaskentries[tid].depends) == 0:
+ if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
self.setbuildable(tid)
+ self.update_holdofftasks()
def sq_task_completeoutright(self, task):
"""
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
rqdata.init_progress_reporter.next_stage()
- # Build a list of setscene tasks which are "unskippable"
- # These are direct endpoints referenced by the build
+ # Build a list of tasks which are "unskippable"
+ # These are direct endpoints referenced by the build upto and including setscene tasks
# Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
new = True
for tid in rqdata.runtaskentries:
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.unskippable.add(tid)
while new:
new = False
- for tid in sqdata.unskippable.copy():
+ orig = sqdata.unskippable.copy()
+ for tid in orig:
if tid in rqdata.runq_setscene_tids:
continue
- sqdata.unskippable.remove(tid)
if len(rqdata.runtaskentries[tid].depends) == 0:
# These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
sqrq.tasks_notcovered.add(tid)
sqrq.tasks_scenequeue_done.add(tid)
sqrq.setbuildable(tid)
- sqrq.scenequeue_process_unskippable(tid)
+ sqrq.scenequeue_process_unskippable(tid)
sqdata.unskippable |= rqdata.runtaskentries[tid].depends
- new = True
+ if sqdata.unskippable != orig:
+ new = True
rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent):
runQueueEvent.__init__(self, task, stats, rq)
self.reason = reason
+class taskUniHashUpdate(bb.event.Event):
+ """
+ Base runQueue event class
+ """
+ def __init__(self, task, unihash):
+ self.taskid = task
+ self.unihash = unihash
+ bb.event.Event.__init__(self)
+
class runQueuePipe():
"""
Abstraction for a pipe between a worker thread and the server
@@ -2752,6 +2895,8 @@ class runQueuePipe():
except ValueError as e:
bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
bb.event.fire_from_worker(event, self.d)
+ if isinstance(event, taskUniHashUpdate):
+ self.rqexec.updated_taskhash(event.taskid, event.unihash)
found = True
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")