aboutsummaryrefslogtreecommitdiffstats
path: root/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r--lib/bb/runqueue.py395
1 files changed, 259 insertions, 136 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index ce711b625..bc7e18175 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -157,7 +157,7 @@ class RunQueueScheduler(object):
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
if tid in self.rq.runq_buildable:
- self.buildable.append(tid)
+ self.buildable.add(tid)
self.rev_prio_map = None
self.is_pressure_usable()
@@ -198,16 +198,38 @@ class RunQueueScheduler(object):
curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
- exceeds_cpu_pressure = self.rq.max_cpu_pressure and (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) > self.rq.max_cpu_pressure
- exceeds_io_pressure = self.rq.max_io_pressure and (float(curr_io_pressure) - float(self.prev_io_pressure)) > self.rq.max_io_pressure
- exceeds_memory_pressure = self.rq.max_memory_pressure and (float(curr_memory_pressure) - float(self.prev_memory_pressure)) > self.rq.max_memory_pressure
now = time.time()
- if now - self.prev_pressure_time > 1.0:
+ tdiff = now - self.prev_pressure_time
+ psi_accumulation_interval = 1.0
+ cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
+ io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
+ memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
+ exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
+ exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
+ exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure
+
+ if tdiff > psi_accumulation_interval:
self.prev_cpu_pressure = curr_cpu_pressure
self.prev_io_pressure = curr_io_pressure
self.prev_memory_pressure = curr_memory_pressure
self.prev_pressure_time = now
+
+ pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
+ pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure)
+ if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
+ bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
+ self.pressure_state = pressure_state
return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
+ elif self.rq.max_loadfactor:
+ limit = False
+ loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
+ # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
+ if loadfactor > self.rq.max_loadfactor:
+ limit = True
+ if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
+ bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
+ self.loadfactor_limit = limit
+ return limit
return False
def next_buildable_task(self):
@@ -258,11 +280,11 @@ class RunQueueScheduler(object):
best = None
bestprio = None
for tid in buildable:
- taskname = taskname_from_tid(tid)
- if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
- continue
prio = self.rev_prio_map[tid]
if bestprio is None or bestprio > prio:
+ taskname = taskname_from_tid(tid)
+ if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
+ continue
stamp = self.stamps[tid]
if stamp in self.rq.build_stamps.values():
continue
@@ -655,6 +677,7 @@ class RunQueueData:
self.init_progress_reporter.start()
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Step A - Work out a list of tasks to run
#
@@ -803,6 +826,7 @@ class RunQueueData:
#self.dump_data()
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Resolve recursive 'recrdeptask' dependencies (Part B)
#
@@ -899,6 +923,7 @@ class RunQueueData:
self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
#self.dump_data()
@@ -980,6 +1005,7 @@ class RunQueueData:
mark_active(tid, 1)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Step C - Prune all inactive tasks
#
@@ -988,25 +1014,32 @@ class RunQueueData:
# Handle --runall
if self.cooker.configuration.runall:
# re-run the mark_active and then drop unused tasks from new list
- reduced_tasklist = set(self.runtaskentries.keys())
- for tid in list(self.runtaskentries.keys()):
- if tid not in runq_build:
- reduced_tasklist.remove(tid)
- runq_build = {}
- for task in self.cooker.configuration.runall:
- if not task.startswith("do_"):
- task = "do_{0}".format(task)
- runall_tids = set()
- for tid in reduced_tasklist:
- wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
- if wanttid in self.runtaskentries:
- runall_tids.add(wanttid)
+ runall_tids = set()
+ added = True
+ while added:
+ reduced_tasklist = set(self.runtaskentries.keys())
+ for tid in list(self.runtaskentries.keys()):
+ if tid not in runq_build:
+ reduced_tasklist.remove(tid)
+ runq_build = {}
- for tid in list(runall_tids):
- mark_active(tid, 1)
- if self.cooker.configuration.force:
- invalidate_task(tid, False)
+ orig = runall_tids
+ runall_tids = set()
+ for task in self.cooker.configuration.runall:
+ if not task.startswith("do_"):
+ task = "do_{0}".format(task)
+ for tid in reduced_tasklist:
+ wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
+ if wanttid in self.runtaskentries:
+ runall_tids.add(wanttid)
+
+ for tid in list(runall_tids):
+ mark_active(tid, 1)
+ self.target_tids.append(tid)
+ if self.cooker.configuration.force:
+ invalidate_task(tid, False)
+ added = runall_tids - orig
delcount = set()
for tid in list(self.runtaskentries.keys()):
@@ -1019,6 +1052,7 @@ class RunQueueData:
bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Handle runonly
if self.cooker.configuration.runonly:
@@ -1059,6 +1093,7 @@ class RunQueueData:
logger.verbose("Assign Weightings")
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Generate a list of reverse dependencies to ease future calculations
for tid in self.runtaskentries:
@@ -1066,6 +1101,7 @@ class RunQueueData:
self.runtaskentries[dep].revdeps.add(tid)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Identify tasks at the end of dependency chains
# Error on circular dependency loops (length two)
@@ -1082,12 +1118,14 @@ class RunQueueData:
logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Calculate task weights
# Check of higher length circular dependencies
self.runq_weight = self.calculate_task_weights(endpoints)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Sanity Check - Check for multiple tasks building the same provider
for mc in self.dataCaches:
@@ -1188,6 +1226,7 @@ class RunQueueData:
self.init_progress_reporter.next_stage()
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Iterate over the task list looking for tasks with a 'setscene' function
self.runq_setscene_tids = set()
@@ -1200,6 +1239,7 @@ class RunQueueData:
self.runq_setscene_tids.add(tid)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Invalidate task if force mode active
if self.cooker.configuration.force:
@@ -1216,6 +1256,7 @@ class RunQueueData:
invalidate_task(fn + ":" + st, True)
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
# Create and print to the logs a virtual/xxxx -> PN (fn) table
for mc in taskData:
@@ -1228,6 +1269,7 @@ class RunQueueData:
bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
self.init_progress_reporter.next_stage()
+ bb.event.check_for_interrupts(self.cooker.data)
bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
@@ -1240,6 +1282,7 @@ class RunQueueData:
dealtwith.add(tid)
todeal.remove(tid)
self.prepare_task_hash(tid)
+ bb.event.check_for_interrupts(self.cooker.data)
bb.parse.siggen.writeout_file_checksum_cache()
@@ -1292,24 +1335,36 @@ class RunQueue:
self.worker = {}
self.fakeworker = {}
+ @staticmethod
+ def send_pickled_data(worker, data, name):
+ msg = bytearray()
+ msg.extend(b"<" + name.encode() + b">")
+ pickled_data = pickle.dumps(data)
+ msg.extend(len(pickled_data).to_bytes(4, 'big'))
+ msg.extend(pickled_data)
+ msg.extend(b"</" + name.encode() + b">")
+ worker.stdin.write(msg)
+
def _start_worker(self, mc, fakeroot = False, rqexec = None):
logger.debug("Starting bitbake-worker")
magic = "decafbad"
if self.cooker.configuration.profile:
magic = "decafbadbad"
fakerootlogs = None
+
+ workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
if fakeroot:
magic = magic + "beef"
mcdata = self.cooker.databuilder.mcdata[mc]
fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
env = os.environ.copy()
- for key, value in (var.split('=') for var in fakerootenv):
+ for key, value in (var.split('=',1) for var in fakerootenv):
env[key] = value
- worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
+ worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
else:
- worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
bb.utils.nonblockingfd(worker.stdout)
workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
@@ -1327,9 +1382,9 @@ class RunQueue:
"umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
}
- worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
- worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
- worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+ RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+ RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+ RunQueue.send_pickled_data(worker, workerdata, "workerdata")
worker.stdin.flush()
return RunQueueWorker(worker, workerpipe)
@@ -1339,7 +1394,7 @@ class RunQueue:
return
logger.debug("Teardown for bitbake-worker")
try:
- worker.process.stdin.write(b"<quit></quit>")
+ RunQueue.send_pickled_data(worker.process, b"", "quit")
worker.process.stdin.flush()
worker.process.stdin.close()
except IOError:
@@ -1351,12 +1406,12 @@ class RunQueue:
continue
worker.pipe.close()
- def start_worker(self):
+ def start_worker(self, rqexec):
if self.worker:
self.teardown_workers()
self.teardown = False
for mc in self.rqdata.dataCaches:
- self.worker[mc] = self._start_worker(mc)
+ self.worker[mc] = self._start_worker(mc, False, rqexec)
def start_fakeworker(self, rqexec, mc):
if not mc in self.fakeworker:
@@ -1483,6 +1538,7 @@ class RunQueue:
"""
retval = True
+ bb.event.check_for_interrupts(self.cooker.data)
if self.state is runQueuePrepare:
# NOTE: if you add, remove or significantly refactor the stages of this
@@ -1515,6 +1571,9 @@ class RunQueue:
('bb.event.HeartbeatEvent',), data=self.cfgData)
self.dm_event_handler_registered = True
+ self.rqdata.init_progress_reporter.next_stage()
+ self.rqexe = RunQueueExecute(self)
+
dump = self.cooker.configuration.dump_signatures
if dump:
self.rqdata.init_progress_reporter.finish()
@@ -1526,10 +1585,8 @@ class RunQueue:
self.state = runQueueComplete
if self.state is runQueueSceneInit:
- self.rqdata.init_progress_reporter.next_stage()
- self.start_worker()
- self.rqdata.init_progress_reporter.next_stage()
- self.rqexe = RunQueueExecute(self)
+ self.start_worker(self.rqexe)
+ self.rqdata.init_progress_reporter.finish()
# If we don't have any setscene functions, skip execution
if not self.rqdata.runq_setscene_tids:
@@ -1644,6 +1701,17 @@ class RunQueue:
return
def print_diffscenetasks(self):
+ def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
+ invalidtasks = []
+ for t in taskdepends[task].depends:
+ if t not in valid and t not in visited_invalid:
+ invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
+ visited_invalid.add(t)
+
+ direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
+ if not direct_invalid and task not in noexec:
+ invalidtasks = [task]
+ return invalidtasks
noexec = []
tocheck = set()
@@ -1677,46 +1745,49 @@ class RunQueue:
valid_new.add(dep)
invalidtasks = set()
- for tid in self.rqdata.runtaskentries:
- if tid not in valid_new and tid not in noexec:
- invalidtasks.add(tid)
- found = set()
- processed = set()
- for tid in invalidtasks:
+ toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
+ for tid in toptasks:
toprocess = set([tid])
while toprocess:
next = set()
+ visited_invalid = set()
for t in toprocess:
- for dep in self.rqdata.runtaskentries[t].depends:
- if dep in invalidtasks:
- found.add(tid)
- if dep not in processed:
- processed.add(dep)
+ if t not in valid_new and t not in noexec:
+ invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
+ continue
+ if t in self.rqdata.runq_setscene_tids:
+ for dep in self.rqexe.sqdata.sq_deps[t]:
next.add(dep)
+ continue
+
+ for dep in self.rqdata.runtaskentries[t].depends:
+ next.add(dep)
+
toprocess = next
- if tid in found:
- toprocess = set()
tasklist = []
- for tid in invalidtasks.difference(found):
+ for tid in invalidtasks:
tasklist.append(tid)
if tasklist:
bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
- return invalidtasks.difference(found)
+ return invalidtasks
def write_diffscenetasks(self, invalidtasks):
+ bb.siggen.check_siggen_version(bb.siggen)
# Define recursion callback
def recursecb(key, hash1, hash2):
hashes = [hash1, hash2]
+ bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
+ bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))
recout = []
if len(hashfiles) == 2:
- out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
+ out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
recout.extend(list(' ' + l for l in out2))
else:
recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
@@ -1727,20 +1798,25 @@ class RunQueue:
for tid in invalidtasks:
(mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- h = self.rqdata.runtaskentries[tid].hash
+ h = self.rqdata.runtaskentries[tid].unihash
+ bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
+ bb.debug(1, "Found hashfiles:\n{}".format(matches))
match = None
- for m in matches:
- if h in m:
- match = m
+ for m in matches.values():
+ if h in m['path']:
+ match = m['path']
if match is None:
- bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
+ bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
matches = {k : v for k, v in iter(matches.items()) if h not in k}
+ matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']}
+ if matches_local:
+ matches = matches_local
if matches:
- latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
+ latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
prevh = __find_sha256__.search(latestmatch).group(0)
output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
- bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
+ bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
class RunQueueExecute:
@@ -1756,6 +1832,7 @@ class RunQueueExecute:
self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
+ self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")
self.sq_buildable = set()
self.sq_running = set()
@@ -1773,6 +1850,8 @@ class RunQueueExecute:
self.build_stamps2 = []
self.failed_tids = []
self.sq_deferred = {}
+ self.sq_needed_harddeps = set()
+ self.sq_harddep_deferred = set()
self.stampcache = {}
@@ -1782,11 +1861,6 @@ class RunQueueExecute:
self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
- for mc in rq.worker:
- rq.worker[mc].pipe.setrunqueueexec(self)
- for mc in rq.fakeworker:
- rq.fakeworker[mc].pipe.setrunqueueexec(self)
-
if self.number_tasks <= 0:
bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@@ -1812,6 +1886,11 @@ class RunQueueExecute:
bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
if self.max_memory_pressure > upper_limit:
bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
+
+ if self.max_loadfactor:
+ self.max_loadfactor = float(self.max_loadfactor)
+ if self.max_loadfactor <= 0:
+ bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))
# List of setscene tasks which we've covered
self.scenequeue_covered = set()
@@ -1822,11 +1901,6 @@ class RunQueueExecute:
self.tasks_notcovered = set()
self.scenequeue_notneeded = set()
- # We can't skip specified target tasks which aren't setscene tasks
- self.cantskip = set(self.rqdata.target_tids)
- self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
- self.cantskip.intersection_update(self.rqdata.runtaskentries)
-
schedulers = self.get_schedulers()
for scheduler in schedulers:
if self.scheduler == scheduler.name:
@@ -1839,7 +1913,25 @@ class RunQueueExecute:
#if self.rqdata.runq_setscene_tids:
self.sqdata = SQData()
- build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
+ build_scenequeue_data(self.sqdata, self.rqdata, self)
+
+ update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
+
+ # Compute a list of 'stale' sstate tasks where the current hash does not match the one
+ # in any stamp files. Pass the list out to metadata as an event.
+ found = {}
+ for tid in self.rqdata.runq_setscene_tids:
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+ stamps = bb.build.find_stale_stamps(taskname, taskfn)
+ if stamps:
+ if mc not in found:
+ found[mc] = {}
+ found[mc][tid] = stamps
+ for mc in found:
+ event = bb.event.StaleSetSceneTasks(found[mc])
+ bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
+
+ self.build_taskdepdata_cache()
def runqueue_process_waitpid(self, task, status, fakerootlog=None):
@@ -1865,14 +1957,14 @@ class RunQueueExecute:
def finish_now(self):
for mc in self.rq.worker:
try:
- self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
self.rq.worker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
for mc in self.rq.fakeworker:
try:
- self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
@@ -1941,8 +2033,7 @@ class RunQueueExecute:
try:
module = __import__(modname, fromlist=(name,))
except ImportError as exc:
- logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
- raise SystemExit(1)
+ bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
else:
schedulers.add(getattr(module, name))
return schedulers
@@ -1972,11 +2063,19 @@ class RunQueueExecute:
self.setbuildable(revdep)
logger.debug("Marking task %s as buildable", revdep)
- for t in self.sq_deferred.copy():
+ found = None
+ for t in sorted(self.sq_deferred.copy()):
if self.sq_deferred[t] == task:
- logger.debug2("Deferred task %s now buildable" % t)
- del self.sq_deferred[t]
- update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+ # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
+ # We shouldn't allow all to run at once as it is prone to races.
+ if not found:
+ bb.debug(1, "Deferred task %s now buildable" % t)
+ del self.sq_deferred[t]
+ update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
+ found = t
+ else:
+ bb.debug(1, "Deferring %s after %s" % (t, found))
+ self.sq_deferred[t] = found
def task_complete(self, task):
self.stats.taskCompleted()
@@ -2081,8 +2180,11 @@ class RunQueueExecute:
if not self.sqdone and self.can_start_task():
# Find the next setscene to run
for nexttask in self.sorted_setscene_tids:
- if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
- if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
+ if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
+ if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
+ nexttask not in self.sq_needed_harddeps and \
+ self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
+ self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
if nexttask not in self.rqdata.target_tids:
logger.debug2("Skipping setscene for task %s" % nexttask)
self.sq_task_skip(nexttask)
@@ -2090,6 +2192,19 @@ class RunQueueExecute:
if nexttask in self.sq_deferred:
del self.sq_deferred[nexttask]
return True
+ if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ logger.debug2("Deferring %s due to hard dependencies" % nexttask)
+ updated = False
+ for dep in self.sqdata.sq_harddeps_rev[nexttask]:
+ if dep not in self.sq_needed_harddeps:
+ logger.debug2("Enabling task %s as it is a hard dependency" % dep)
+ self.sq_buildable.add(dep)
+ self.sq_needed_harddeps.add(dep)
+ updated = True
+ self.sq_harddep_deferred.add(nexttask)
+ if updated:
+ return True
+ continue
# If covered tasks are running, need to wait for them to complete
for t in self.sqdata.sq_covered_tasks[nexttask]:
if t in self.runq_running and t not in self.runq_complete:
@@ -2139,6 +2254,7 @@ class RunQueueExecute:
bb.event.fire(startevent, self.cfgData)
taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
+ realfn = bb.cache.virtualfn2realfn(taskfn)[0]
runtask = {
'fn' : taskfn,
'task' : task,
@@ -2147,6 +2263,7 @@ class RunQueueExecute:
'unihash' : self.rqdata.get_task_unihash(task),
'quieterrors' : True,
'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
+ 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
'taskdepdata' : self.sq_build_taskdepdata(task),
'dry_run' : False,
'taskdep': taskdep,
@@ -2158,10 +2275,10 @@ class RunQueueExecute:
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not mc in self.rq.fakeworker:
self.rq.start_fakeworker(self, mc)
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2232,6 +2349,7 @@ class RunQueueExecute:
bb.event.fire(startevent, self.cfgData)
taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
+ realfn = bb.cache.virtualfn2realfn(taskfn)[0]
runtask = {
'fn' : taskfn,
'task' : task,
@@ -2240,6 +2358,7 @@ class RunQueueExecute:
'unihash' : self.rqdata.get_task_unihash(task),
'quieterrors' : False,
'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
+ 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
'taskdepdata' : self.build_taskdepdata(task),
'dry_run' : self.rqdata.setscene_enforce,
'taskdep': taskdep,
@@ -2257,10 +2376,10 @@ class RunQueueExecute:
self.rq.state = runQueueFailed
self.stats.taskFailed()
return True
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2314,6 +2433,22 @@ class RunQueueExecute:
ret.add(dep)
return ret
+ # Build the individual cache entries in advance once to save time
+ def build_taskdepdata_cache(self):
+ taskdepdata_cache = {}
+ for task in self.rqdata.runtaskentries:
+ (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
+ pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
+ deps = self.rqdata.runtaskentries[task].depends
+ provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
+ taskhash = self.rqdata.runtaskentries[task].hash
+ unihash = self.rqdata.runtaskentries[task].unihash
+ deps = self.filtermcdeps(task, mc, deps)
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
+ taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+
+ self.taskdepdata_cache = taskdepdata_cache
+
# We filter out multiconfig dependencies from taskdepdata we pass to the tasks
# as most code can't handle them
def build_taskdepdata(self, task):
@@ -2325,15 +2460,9 @@ class RunQueueExecute:
while next:
additional = []
for revdep in next:
- (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
- pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
- deps = self.rqdata.runtaskentries[revdep].depends
- provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
- taskhash = self.rqdata.runtaskentries[revdep].hash
- unihash = self.rqdata.runtaskentries[revdep].unihash
- deps = self.filtermcdeps(task, mc, deps)
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
- for revdep2 in deps:
+ self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash
+ taskdepdata[revdep] = self.taskdepdata_cache[revdep]
+ for revdep2 in self.taskdepdata_cache[revdep][3]:
if revdep2 not in taskdepdata:
additional.append(revdep2)
next = additional
@@ -2347,7 +2476,7 @@ class RunQueueExecute:
return
notcovered = set(self.scenequeue_notcovered)
- notcovered |= self.cantskip
+ notcovered |= self.sqdata.cantskip
for tid in self.scenequeue_notcovered:
notcovered |= self.sqdata.sq_covered_tasks[tid]
notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
@@ -2461,9 +2590,9 @@ class RunQueueExecute:
if changed:
for mc in self.rq.worker:
- self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
for mc in self.rq.fakeworker:
- self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
@@ -2533,8 +2662,8 @@ class RunQueueExecute:
update_tasks2 = []
for tid in update_tasks:
harddepfail = False
- for t in self.sqdata.sq_harddeps:
- if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
+ for t in self.sqdata.sq_harddeps_rev[tid]:
+ if t in self.scenequeue_notcovered:
harddepfail = True
break
if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
@@ -2566,12 +2695,14 @@ class RunQueueExecute:
if changed:
self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
+ self.sq_needed_harddeps = set()
+ self.sq_harddep_deferred = set()
self.holdoff_need_update = True
def scenequeue_updatecounters(self, task, fail=False):
- for dep in sorted(self.sqdata.sq_deps[task]):
- if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
+ if fail and task in self.sqdata.sq_harddeps:
+ for dep in sorted(self.sqdata.sq_harddeps[task]):
if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
# dependency could be already processed, e.g. noexec setscene task
continue
@@ -2581,6 +2712,7 @@ class RunQueueExecute:
logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
self.sq_task_failoutright(dep)
continue
+ for dep in sorted(self.sqdata.sq_deps[task]):
if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
if dep not in self.sq_buildable:
self.sq_buildable.add(dep)
@@ -2599,6 +2731,13 @@ class RunQueueExecute:
new.add(dep)
next = new
+ # If this task was one which other setscene tasks have a hard dependency upon, we need
+ # to walk through the hard dependencies and allow execution of those which have completed dependencies.
+ if task in self.sqdata.sq_harddeps:
+ for dep in self.sq_harddep_deferred.copy():
+ if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+ self.sq_harddep_deferred.remove(dep)
+
self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
self.holdoff_need_update = True
@@ -2672,7 +2811,8 @@ class RunQueueExecute:
provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
taskhash = self.rqdata.runtaskentries[revdep].hash
unihash = self.rqdata.runtaskentries[revdep].unihash
- taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
+ hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
+ taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
for revdep2 in deps:
if revdep2 not in taskdepdata:
additional.append(revdep2)
@@ -2716,6 +2856,7 @@ class SQData(object):
self.sq_revdeps = {}
# Injected inter-setscene task dependencies
self.sq_harddeps = {}
+ self.sq_harddeps_rev = {}
# Cache of stamp files so duplicates can't run in parallel
self.stamps = {}
# Setscene tasks directly depended upon by the build
@@ -2725,12 +2866,17 @@ class SQData(object):
# A list of normal tasks a setscene task covers
self.sq_covered_tasks = {}
-def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
+def build_scenequeue_data(sqdata, rqdata, sqrq):
sq_revdeps = {}
sq_revdeps_squash = {}
sq_collated_deps = {}
+ # We can't skip specified target tasks which aren't setscene tasks
+ sqdata.cantskip = set(rqdata.target_tids)
+ sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
+ sqdata.cantskip.intersection_update(rqdata.runtaskentries)
+
# We need to construct a dependency graph for the setscene functions. Intermediate
# dependencies between the setscene tasks only complicate the code. This code
# therefore aims to collapse the huge runqueue dependency tree into a smaller one
@@ -2799,7 +2945,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
for tid in rqdata.runtaskentries:
if not rqdata.runtaskentries[tid].revdeps:
sqdata.unskippable.add(tid)
- sqdata.unskippable |= sqrq.cantskip
+ sqdata.unskippable |= sqdata.cantskip
while new:
new = False
orig = sqdata.unskippable.copy()
@@ -2838,6 +2984,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
idepends = rqdata.taskData[mc].taskentries[realtid].idepends
sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
+ sqdata.sq_harddeps_rev[tid] = set()
for (depname, idependtask) in idepends:
if depname not in rqdata.taskData[mc].build_targets:
@@ -2850,20 +2997,15 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
if deptid not in rqdata.runtaskentries:
bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))
+ logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))
+
if not deptid in sqdata.sq_harddeps:
sqdata.sq_harddeps[deptid] = set()
sqdata.sq_harddeps[deptid].add(tid)
-
- sq_revdeps_squash[tid].add(deptid)
- # Have to zero this to avoid circular dependencies
- sq_revdeps_squash[deptid] = set()
+ sqdata.sq_harddeps_rev[tid].add(deptid)
rqdata.init_progress_reporter.next_stage()
- for task in sqdata.sq_harddeps:
- for dep in sqdata.sq_harddeps[task]:
- sq_revdeps_squash[dep].add(task)
-
rqdata.init_progress_reporter.next_stage()
#for tid in sq_revdeps_squash:
@@ -2890,7 +3032,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
if not sqdata.sq_revdeps[tid]:
sqrq.sq_buildable.add(tid)
- rqdata.init_progress_reporter.finish()
+ rqdata.init_progress_reporter.next_stage()
sqdata.noexec = set()
sqdata.stamppresent = set()
@@ -2907,23 +3049,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
sqdata.hashes[h] = tid
else:
sqrq.sq_deferred[tid] = sqdata.hashes[h]
- bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))
-
- update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)
-
- # Compute a list of 'stale' sstate tasks where the current hash does not match the one
- # in any stamp files. Pass the list out to metadata as an event.
- found = {}
- for tid in rqdata.runq_setscene_tids:
- (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
- stamps = bb.build.find_stale_stamps(taskname, taskfn)
- if stamps:
- if mc not in found:
- found[mc] = {}
- found[mc][tid] = stamps
- for mc in found:
- event = bb.event.StaleSetSceneTasks(found[mc])
- bb.event.fire(event, cooker.databuilder.mcdata[mc])
+ bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
@@ -3119,15 +3245,12 @@ class runQueuePipe():
if pipeout:
pipeout.close()
bb.utils.nonblockingfd(self.input)
- self.queue = b""
+ self.queue = bytearray()
self.d = d
self.rq = rq
self.rqexec = rqexec
self.fakerootlogs = fakerootlogs
- def setrunqueueexec(self, rqexec):
- self.rqexec = rqexec
-
def read(self):
for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
for worker in workers.values():
@@ -3138,7 +3261,7 @@ class runQueuePipe():
start = len(self.queue)
try:
- self.queue = self.queue + (self.input.read(102400) or b"")
+ self.queue.extend(self.input.read(102400) or b"")
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise