author     Richard Purdie <rpurdie@linux.intel.com>  2008-01-06 00:05:09 +0000
committer  Richard Purdie <rpurdie@linux.intel.com>  2008-01-06 00:05:09 +0000
commit     223e4cbbaa8a0ea96fba850b8d6f7cb938794ef0 (patch)
tree       614d1f95709f60324563143427b89c4a6b5e567e /lib/bb/runqueue.py
parent     2dcf122a9a157f002d053411bc10e219b679658a (diff)
download   bitbake-223e4cbbaa8a0ea96fba850b8d6f7cb938794ef0.tar.gz
runqueue.py: Add task scheduler abstraction and some example schedulers. Improve circular dependency chain debugging code and user feedback.
Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r--  lib/bb/runqueue.py  311
1 file changed, 237 insertions, 74 deletions
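The core of the change is the scheduler abstraction. The standalone sketch below is illustrative only (the ToySchedulerSpeed class and its made-up task data are not part of the patch): it shows the priority-map idea the new classes use, where a precomputed priority order is walked and the first task that is buildable and not already running is returned.

class ToySchedulerSpeed:
    """Standalone sketch of the prio_map idea; names and data are hypothetical."""
    def __init__(self, weights, buildable, running):
        # Heaviest tasks first, mirroring how RunQueueSchedulerSpeed sorts its priority map.
        self.prio_map = sorted(range(len(weights)), key=lambda t: weights[t], reverse=True)
        self.buildable = buildable
        self.running = running

    def next(self):
        # Same shape as RunQueueScheduler.next(): the first buildable, not-yet-running task wins.
        for task in self.prio_map:
            if self.running[task]:
                continue
            if self.buildable[task]:
                return task
        return None

# Task 2 is heaviest and buildable, so it is scheduled first.
print(ToySchedulerSpeed([1, 3, 7], [True, False, True], [False, False, False]).next())

In the patch itself, RunQueueSchedulerSpeed builds this kind of map from the task weights computed below, while RunQueueSchedulerCompletion additionally groups tasks from the same .bb file together.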
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 7c918a5d8..c970e2531 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -70,6 +70,88 @@ runQueueCleanUp = 7
 runQueueComplete = 8
 runQueueChildProcess = 9
+class RunQueueScheduler:
+    """
+    Control the order tasks are scheduled in.
+    """
+    def __init__(self, runqueue):
+        """
+        The default scheduler just returns the first buildable task (the
+        priority map is sorted by task number)
+        """
+        self.rq = runqueue
+        numTasks = len(self.rq.runq_fnid)
+
+        self.prio_map = []
+        self.prio_map.extend(range(numTasks))
+
+    def next(self):
+        """
+        Return the id of the first task we find that is buildable
+        """
+        for task1 in range(len(self.rq.runq_fnid)):
+            task = self.prio_map[task1]
+            if self.rq.runq_running[task] == 1:
+                continue
+            if self.rq.runq_buildable[task] == 1:
+                return task
+
+class RunQueueSchedulerSpeed(RunQueueScheduler):
+    """
+    A scheduler optimised for speed. The priority map is sorted by task weight;
+    heavier weighted tasks (tasks needed by the most other tasks) are run first.
+    """
+    def __init__(self, runqueue):
+        """
+        The priority map is sorted by task weight.
+        """
+        from copy import deepcopy
+
+        self.rq = runqueue
+
+        sortweight = deepcopy(self.rq.runq_weight)
+        sortweight.sort()
+        copyweight = deepcopy(self.rq.runq_weight)
+        self.prio_map = []
+
+        for weight in sortweight:
+            idx = copyweight.index(weight)
+            self.prio_map.append(idx)
+            copyweight[idx] = -1
+
+        self.prio_map.reverse()
+
+class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
+    """
+    A scheduler optimised to complete .bb files as quickly as possible. The
+    priority map is sorted by task weight, but then reordered so that once a given
+    .bb file starts to build, it is completed as quickly as possible. This works
+    well where disk space is at a premium and classes like OE's rm_work are in
+    force.
+    """
+    def __init__(self, runqueue):
+        RunQueueSchedulerSpeed.__init__(self, runqueue)
+        from copy import deepcopy
+
+        #FIXME - whilst this groups all fnids together it does not reorder the
+        #fnid groups optimally.
+
+        basemap = deepcopy(self.prio_map)
+        self.prio_map = []
+        while (len(basemap) > 0):
+            entry = basemap.pop(0)
+            self.prio_map.append(entry)
+            fnid = self.rq.runq_fnid[entry]
+            todel = []
+            for entry in basemap:
+                entry_fnid = self.rq.runq_fnid[entry]
+                if entry_fnid == fnid:
+                    todel.append(basemap.index(entry))
+                    self.prio_map.append(entry)
+            todel.reverse()
+            for idx in todel:
+                del basemap[idx]
+
 class RunQueue:
     """
     BitBake Run Queue implementation
@@ -90,8 +172,6 @@ class RunQueue:
         self.runq_task = []
         self.runq_depends = []
         self.runq_revdeps = []
-        self.runq_weight = []
-        self.prio_map = []
         self.state = runQueuePrepare
@@ -100,6 +180,152 @@ class RunQueue:
         taskname = self.runq_task[task]
         return "%s, %s" % (fn, taskname)
+    def circular_depchains_handler(self, tasks):
+        """
+        Some tasks aren't buildable, likely due to circular dependency issues.
+        Identify the circular dependencies and print them in a user-readable format.
+        """
+        from copy import deepcopy
+
+        valid_chains = []
+        explored_deps = {}
+        msgs = []
+
+        def chain_reorder(chain):
+            """
+            Reorder a dependency chain so the lowest task id is first
+            """
+            lowest = 0
+            new_chain = []
+            for entry in range(len(chain)):
+                if chain[entry] < chain[lowest]:
+                    lowest = entry
+            new_chain.extend(chain[lowest:])
+            new_chain.extend(chain[:lowest])
+            return new_chain
+
+        def chain_compare_equal(chain1, chain2):
+            """
+            Compare two dependency chains and see if they're the same
+            """
+            if len(chain1) != len(chain2):
+                return False
+            for index in range(len(chain1)):
+                if chain1[index] != chain2[index]:
+                    return False
+            return True
+
+        def chain_array_contains(chain, chain_array):
+            """
+            Return True if chain_array contains chain
+            """
+            for ch in chain_array:
+                if chain_compare_equal(ch, chain):
+                    return True
+            return False
+
+        def find_chains(taskid, prev_chain):
+            prev_chain.append(taskid)
+            total_deps = []
+            total_deps.extend(self.runq_revdeps[taskid])
+            for revdep in self.runq_revdeps[taskid]:
+                if revdep in prev_chain:
+                    idx = prev_chain.index(revdep)
+                    # To prevent duplicates, reorder the chain to start with the lowest taskid
+                    # and search through an array of those we've already printed
+                    chain = prev_chain[idx:]
+                    new_chain = chain_reorder(chain)
+                    if not chain_array_contains(new_chain, valid_chains):
+                        valid_chains.append(new_chain)
+                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
+                        for dep in new_chain:
+                            msgs.append(" Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
+                        msgs.append("\n")
+                    if len(valid_chains) > 10:
+                        msgs.append("Aborted dependency loops search after 10 matches.\n")
+                        return msgs
+                    continue
+                scan = False
+                if revdep not in explored_deps:
+                    scan = True
+                elif revdep in explored_deps[revdep]:
+                    scan = True
+                else:
+                    for dep in prev_chain:
+                        if dep in explored_deps[revdep]:
+                            scan = True
+                if scan:
+                    find_chains(revdep, deepcopy(prev_chain))
+                for dep in explored_deps[revdep]:
+                    if dep not in total_deps:
+                        total_deps.append(dep)
+
+            explored_deps[taskid] = total_deps
+
+        for task in tasks:
+            find_chains(task, [])
+
+        return msgs
+
+    def calculate_task_weights(self, endpoints):
+        """
+        Calculate a number representing the "weight" of each task. Heavier weighted tasks
+        are those that more other tasks depend on and hence should be executed sooner for maximum speed.
+
+        This function also sanity checks the task list, finding tasks that are not
+        possible to execute due to circular dependencies.
+        """
+
+        numTasks = len(self.runq_fnid)
+        weight = []
+        deps_left = []
+        task_done = []
+
+        for listid in range(numTasks):
+            task_done.append(False)
+            weight.append(0)
+            deps_left.append(len(self.runq_revdeps[listid]))
+
+        for listid in endpoints:
+            weight[listid] = 1
+            task_done[listid] = True
+
+        while 1:
+            next_points = []
+            for listid in endpoints:
+                for revdep in self.runq_depends[listid]:
+                    weight[revdep] = weight[revdep] + weight[listid]
+                    deps_left[revdep] = deps_left[revdep] - 1
+                    if deps_left[revdep] == 0:
+                        next_points.append(revdep)
+                        task_done[revdep] = True
+            endpoints = next_points
+            if len(next_points) == 0:
+                break
+
+        # Circular dependency sanity check
+        problem_tasks = []
+        for task in range(numTasks):
+            if task_done[task] is False or deps_left[task] != 0:
+                problem_tasks.append(task)
+                bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
+                bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
+
+        if problem_tasks:
+            message = "Unbuildable tasks were found.\n"
+            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
+            message = message + "Identifying dependency loops (this may take a short while)...\n"
+            bb.msg.error(bb.msg.domain.RunQueue, message)
+
+            msgs = self.circular_depchains_handler(problem_tasks)
+
+            message = "\n"
+            for msg in msgs:
+                message = message + msg
+            bb.msg.fatal(bb.msg.domain.RunQueue, message)
+
+        return weight
+
     def prepare_runqueue(self):
         """
         Turn a set of taskData into a RunQueue and compute data needed
@@ -107,9 +333,7 @@ class RunQueue:
"""
depends = []
- runq_weight1 = []
runq_build = []
- runq_done = []
taskData = self.taskData
@@ -283,11 +507,8 @@ class RunQueue:
             self.runq_task.append(taskData.tasks_name[task])
             self.runq_depends.append(Set(depends))
             self.runq_revdeps.append(Set())
-            self.runq_weight.append(0)
-            runq_weight1.append(0)
             runq_build.append(0)
-            runq_done.append(0)
         # Step B - Mark all active tasks
         #
@@ -349,10 +570,7 @@ class RunQueue:
                 del self.runq_fnid[listid-delcount]
                 del self.runq_task[listid-delcount]
                 del self.runq_depends[listid-delcount]
-                del self.runq_weight[listid-delcount]
-                del runq_weight1[listid-delcount]
                 del runq_build[listid-delcount]
-                del runq_done[listid-delcount]
                 del self.runq_revdeps[listid-delcount]
                 delcount = delcount + 1
                 maps.append(-1)
@@ -394,51 +612,23 @@ class RunQueue:
         for listid in range(len(self.runq_fnid)):
             revdeps = self.runq_revdeps[listid]
             if len(revdeps) == 0:
-                runq_done[listid] = 1
-                self.runq_weight[listid] = 1
                 endpoints.append(listid)
             for dep in revdeps:
                 if dep in self.runq_depends[listid]:
                     #self.dump_data(taskData)
                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
-            runq_weight1[listid] = len(revdeps)
         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
-        while 1:
-            next_points = []
-            for listid in endpoints:
-                for revdep in self.runq_depends[listid]:
-                    self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
-                    runq_weight1[revdep] = runq_weight1[revdep] - 1
-                    if runq_weight1[revdep] == 0:
-                        next_points.append(revdep)
-                        runq_done[revdep] = 1
-            endpoints = next_points
-            if len(next_points) == 0:
-                break
+        # Calculate task weights
+        # Check for longer circular dependency chains
+        self.runq_weight = self.calculate_task_weights(endpoints)
-        # Sanity Checks
-        for task in range(len(self.runq_fnid)):
-            if runq_done[task] == 0:
-                seen = []
-                deps_seen = []
-                def print_chain(taskid, finish):
-                    seen.append(taskid)
-                    for revdep in self.runq_revdeps[taskid]:
-                        if runq_done[revdep] == 0 and revdep not in seen and not finish:
-                            bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep), self.runq_depends[revdep]))
-                            if revdep in deps_seen:
-                                bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep)))
-                                finish = True
-                                return
-                            for dep in self.runq_depends[revdep]:
-                                deps_seen.append(dep)
-                            print_chain(revdep, finish)
-                print_chain(task, False)
-                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task)))
-            if runq_weight1[task] != 0:
-                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task)))
+        # Decide what order to execute the tasks in, pick a scheduler
+        # FIXME - Allow user selection
+        #self.sched = RunQueueScheduler(self)
+        self.sched = RunQueueSchedulerSpeed(self)
+        #self.sched = RunQueueSchedulerCompletion(self)
         # Sanity Check - Check for multiple tasks building the same provider
         prov_list = {}
@@ -461,21 +651,6 @@ class RunQueue:
         #if error:
         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
-
-        # Make a weight sorted map
-        from copy import deepcopy
-
-        sortweight = deepcopy(self.runq_weight)
-        sortweight.sort()
-        copyweight = deepcopy(self.runq_weight)
-        self.prio_map = []
-
-        for weight in sortweight:
-            idx = copyweight.index(weight)
-            self.prio_map.append(idx)
-            copyweight[idx] = -1
-        self.prio_map.reverse()
-
         #self.dump_data(taskData)
         self.state = runQueueRunInit
@@ -591,18 +766,6 @@ class RunQueue:
             self.state = runQueueFailedCleanUp
         bb.event.fire(runQueueTaskFailed(task, self.stats, self, self.cfgData))
-    def get_next_task(self):
-        """
-        Return the id of the highest priority task that is buildable
-        """
-        for task1 in range(self.stats.total):
-            task = self.prio_map[task1]
-            if self.runq_running[task] == 1:
-                continue
-            if self.runq_buildable[task] == 1:
-                return task
-        return None
-
     def execute_runqueue_internal(self):
         """
         Run the tasks in a queue prepared by prepare_runqueue
@@ -615,7 +778,7 @@ class RunQueue:
         while True:
             task = None
             if self.stats.active < self.number_tasks:
-                task = self.get_next_task()
+                task = self.sched.next()
             if task is not None:
                 fn = self.taskData.fn_index[self.runq_fnid[task]]
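For reference, the weight propagation that calculate_task_weights() performs can be sketched in isolation as below (the toy_weights name and the task data are illustrative, not code from the patch): starting from the endpoints (tasks with no reverse dependencies), each task's weight is added to the tasks it depends on, so tasks that many others ultimately need come out heaviest and are run first by RunQueueSchedulerSpeed.

def toy_weights(depends, revdeps):
    # depends[t]: tasks t depends on; revdeps[t]: tasks that depend on t.
    n = len(depends)
    weight = [0] * n
    deps_left = [len(revdeps[t]) for t in range(n)]
    endpoints = [t for t in range(n) if not revdeps[t]]
    for t in endpoints:
        weight[t] = 1
    # Walk from the endpoints towards the roots, accumulating weights.
    while endpoints:
        next_points = []
        for t in endpoints:
            for dep in depends[t]:
                weight[dep] += weight[t]
                deps_left[dep] -= 1
                if deps_left[dep] == 0:
                    next_points.append(dep)
        endpoints = next_points
    return weight

# Diamond: tasks 1 and 2 depend on 0, task 3 depends on 1 and 2.
# Task 0 is needed (directly or indirectly) by everything, so it is heaviest.
print(toy_weights([set(), {0}, {0}, {1, 2}], [{1, 2}, {3}, {3}, set()]))  # [2, 1, 1, 1]

In the patch itself, any task whose remaining dependency count never reaches zero is flagged as unbuildable and handed to circular_depchains_handler(), which produces the "Dependency loop #N found" messages.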