diff options
author | Richard Purdie <rpurdie@linux.intel.com> | 2008-01-06 00:05:09 +0000 |
---|---|---|
committer | Richard Purdie <rpurdie@linux.intel.com> | 2008-01-06 00:05:09 +0000 |
commit | 223e4cbbaa8a0ea96fba850b8d6f7cb938794ef0 (patch) | |
tree | 614d1f95709f60324563143427b89c4a6b5e567e /lib/bb/runqueue.py | |
parent | 2dcf122a9a157f002d053411bc10e219b679658a (diff) | |
download | bitbake-223e4cbbaa8a0ea96fba850b8d6f7cb938794ef0.tar.gz |
runqueue.py: Add task scheduler abstraction and some example schedulers. Improve circular dependency chain debugging code and user feedback.
Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r-- | lib/bb/runqueue.py | 311 |
1 files changed, 237 insertions, 74 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 7c918a5d8..c970e2531 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -70,6 +70,88 @@ runQueueCleanUp = 7 runQueueComplete = 8 runQueueChildProcess = 9 +class RunQueueScheduler: + """ + Control the order tasks are scheduled in. + """ + def __init__(self, runqueue): + """ + The default scheduler just returns the first buildable task (the + priority map is sorted by task number) + """ + self.rq = runqueue + numTasks = len(self.rq.runq_fnid) + + self.prio_map = [] + self.prio_map.extend(range(numTasks)) + + def next(self): + """ + Return the id of the first task we find that is buildable + """ + for task1 in range(len(self.rq.runq_fnid)): + task = self.prio_map[task1] + if self.rq.runq_running[task] == 1: + continue + if self.rq.runq_buildable[task] == 1: + return task + +class RunQueueSchedulerSpeed(RunQueueScheduler): + """ + A scheduler optimised for speed. The priority map is sorted by task weight, + heavier weighted tasks (tasks needed by the most other tasks) are run first. + """ + def __init__(self, runqueue): + """ + The priority map is sorted by task weight. + """ + from copy import deepcopy + + self.rq = runqueue + + sortweight = deepcopy(self.rq.runq_weight) + sortweight.sort() + copyweight = deepcopy(self.rq.runq_weight) + self.prio_map = [] + + for weight in sortweight: + idx = copyweight.index(weight) + self.prio_map.append(idx) + copyweight[idx] = -1 + + self.prio_map.reverse() + +class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): + """ + A scheduler optimised to complete .bb files as quickly as possible. The + priority map is sorted by task weight, but then reordered so once a given + .bb file starts to build, it's completed as quickly as possible. This works + well where disk space is at a premium and classes like OE's rm_work are in force. 
+ """ + def __init__(self, runqueue): + RunQueueSchedulerSpeed.__init__(self, runqueue) + from copy import deepcopy + + #FIXME - whilst this groups all fnids together it does not reorder the + #fnid groups optimally. + + basemap = deepcopy(self.prio_map) + self.prio_map = [] + while (len(basemap) > 0): + entry = basemap.pop(0) + self.prio_map.append(entry) + fnid = self.rq.runq_fnid[entry] + todel = [] + for entry in basemap: + entry_fnid = self.rq.runq_fnid[entry] + if entry_fnid == fnid: + todel.append(basemap.index(entry)) + self.prio_map.append(entry) + todel.reverse() + for idx in todel: + del basemap[idx] + class RunQueue: """ BitBake Run Queue implementation @@ -90,8 +172,6 @@ class RunQueue: self.runq_task = [] self.runq_depends = [] self.runq_revdeps = [] - self.runq_weight = [] - self.prio_map = [] self.state = runQueuePrepare @@ -100,6 +180,152 @@ class RunQueue: taskname = self.runq_task[task] return "%s, %s" % (fn, taskname) + def circular_depchains_handler(self, tasks): + """ + Some tasks aren't buildable, likely due to circular dependency issues. + Identify the circular dependencies and print them in a user readable format. 
+ """ + from copy import deepcopy + + valid_chains = [] + explored_deps = {} + msgs = [] + + def chain_reorder(chain): + """ + Reorder a dependency chain so the lowest task id is first + """ + lowest = 0 + new_chain = [] + for entry in range(len(chain)): + if chain[entry] < chain[lowest]: + lowest = entry + new_chain.extend(chain[lowest:]) + new_chain.extend(chain[:lowest]) + return new_chain + + def chain_compare_equal(chain1, chain2): + """ + Compare two dependency chains and see if they're the same + """ + if len(chain1) != len(chain2): + return False + for index in range(len(chain1)): + if chain1[index] != chain2[index]: + return False + return True + + def chain_array_contains(chain, chain_array): + """ + Return True if chain_array contains chain + """ + for ch in chain_array: + if chain_compare_equal(ch, chain): + return True + return False + + def find_chains(taskid, prev_chain): + prev_chain.append(taskid) + total_deps = [] + total_deps.extend(self.runq_revdeps[taskid]) + for revdep in self.runq_revdeps[taskid]: + if revdep in prev_chain: + idx = prev_chain.index(revdep) + # To prevent duplicates, reorder the chain to start with the lowest taskid + # and search through an array of those we've already printed + chain = prev_chain[idx:] + new_chain = chain_reorder(chain) + if not chain_array_contains(new_chain, valid_chains): + valid_chains.append(new_chain) + msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) + for dep in new_chain: + msgs.append(" Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep])) + msgs.append("\n") + if len(valid_chains) > 10: + msgs.append("Aborted dependency loops search after 10 matches.\n") + return msgs + continue + scan = False + if revdep not in explored_deps: + scan = True + elif revdep in explored_deps[revdep]: + scan = True + else: + for dep in prev_chain: + if dep in explored_deps[revdep]: + scan = True + if scan: + find_chains(revdep, deepcopy(prev_chain)) + for dep in 
explored_deps[revdep]: + if dep not in total_deps: + total_deps.append(dep) + + explored_deps[taskid] = total_deps + + for task in tasks: + find_chains(task, []) + + return msgs + + def calculate_task_weights(self, endpoints): + """ + Calculate a number representing the "weight" of each task. Heavier weighted tasks + have more dependencies and hence should be executed sooner for maximum speed. + + This function also sanity checks the task list finding tasks that it's not + possible to execute due to circular dependencies. + """ + + numTasks = len(self.runq_fnid) + weight = [] + deps_left = [] + task_done = [] + + for listid in range(numTasks): + task_done.append(False) + weight.append(0) + deps_left.append(len(self.runq_revdeps[listid])) + + for listid in endpoints: + weight[listid] = 1 + task_done[listid] = True + + while 1: + next_points = [] + for listid in endpoints: + for revdep in self.runq_depends[listid]: + weight[revdep] = weight[revdep] + weight[listid] + deps_left[revdep] = deps_left[revdep] - 1 + if deps_left[revdep] == 0: + next_points.append(revdep) + task_done[revdep] = True + endpoints = next_points + if len(next_points) == 0: + break + + # Circular dependency sanity check + problem_tasks = [] + for task in range(numTasks): + if task_done[task] is False or deps_left[task] != 0: + problem_tasks.append(task) + bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task))) + bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task])) + + if problem_tasks: + message = "Unbuildable tasks were found.\n" + message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. 
Increase the debug level to see a list of unbuildable tasks.\n\n" + message = message + "Identifying dependency loops (this may take a short while)...\n" + bb.msg.error(bb.msg.domain.RunQueue, message) + + msgs = self.circular_depchains_handler(problem_tasks) + + message = "\n" + for msg in msgs: + message = message + msg + bb.msg.fatal(bb.msg.domain.RunQueue, message) + + return weight + def prepare_runqueue(self): """ Turn a set of taskData into a RunQueue and compute data needed @@ -107,9 +333,7 @@ class RunQueue: """ depends = [] - runq_weight1 = [] runq_build = [] - runq_done = [] taskData = self.taskData @@ -283,11 +507,8 @@ class RunQueue: self.runq_task.append(taskData.tasks_name[task]) self.runq_depends.append(Set(depends)) self.runq_revdeps.append(Set()) - self.runq_weight.append(0) - runq_weight1.append(0) runq_build.append(0) - runq_done.append(0) # Step B - Mark all active tasks # @@ -349,10 +570,7 @@ class RunQueue: del self.runq_fnid[listid-delcount] del self.runq_task[listid-delcount] del self.runq_depends[listid-delcount] - del self.runq_weight[listid-delcount] - del runq_weight1[listid-delcount] del runq_build[listid-delcount] - del runq_done[listid-delcount] del self.runq_revdeps[listid-delcount] delcount = delcount + 1 maps.append(-1) @@ -394,51 +612,23 @@ class RunQueue: for listid in range(len(self.runq_fnid)): revdeps = self.runq_revdeps[listid] if len(revdeps) == 0: - runq_done[listid] = 1 - self.runq_weight[listid] = 1 endpoints.append(listid) for dep in revdeps: if dep in self.runq_depends[listid]: #self.dump_data(taskData) bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid])) - runq_weight1[listid] = len(revdeps) bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints)) - while 1: - next_points = [] - for listid in endpoints: - for revdep 
in self.runq_depends[listid]: - self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid] - runq_weight1[revdep] = runq_weight1[revdep] - 1 - if runq_weight1[revdep] == 0: - next_points.append(revdep) - runq_done[revdep] = 1 - endpoints = next_points - if len(next_points) == 0: - break + # Calculate task weights + # Check of higher length circular dependencies + self.runq_weight = self.calculate_task_weights(endpoints) - # Sanity Checks - for task in range(len(self.runq_fnid)): - if runq_done[task] == 0: - seen = [] - deps_seen = [] - def print_chain(taskid, finish): - seen.append(taskid) - for revdep in self.runq_revdeps[taskid]: - if runq_done[revdep] == 0 and revdep not in seen and not finish: - bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep), self.runq_depends[revdep])) - if revdep in deps_seen: - bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep))) - finish = True - return - for dep in self.runq_depends[revdep]: - deps_seen.append(dep) - print_chain(revdep, finish) - print_chain(task, False) - bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task))) - if runq_weight1[task] != 0: - bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" 
% (task, self.get_user_idstring(task))) + # Decide what order to execute the tasks in, pick a scheduler + # FIXME - Allow user selection + #self.sched = RunQueueScheduler(self) + self.sched = RunQueueSchedulerSpeed(self) + #self.sched = RunQueueSchedulerCompletion(self) # Sanity Check - Check for multiple tasks building the same provider prov_list = {} @@ -461,21 +651,6 @@ class RunQueue: #if error: # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...") - - # Make a weight sorted map - from copy import deepcopy - - sortweight = deepcopy(self.runq_weight) - sortweight.sort() - copyweight = deepcopy(self.runq_weight) - self.prio_map = [] - - for weight in sortweight: - idx = copyweight.index(weight) - self.prio_map.append(idx) - copyweight[idx] = -1 - self.prio_map.reverse() - #self.dump_data(taskData) self.state = runQueueRunInit @@ -591,18 +766,6 @@ class RunQueue: self.state = runQueueFailedCleanUp bb.event.fire(runQueueTaskFailed(task, self.stats, self, self.cfgData)) - def get_next_task(self): - """ - Return the id of the highest priority task that is buildable - """ - for task1 in range(self.stats.total): - task = self.prio_map[task1] - if self.runq_running[task] == 1: - continue - if self.runq_buildable[task] == 1: - return task - return None - def execute_runqueue_internal(self): """ Run the tasks in a queue prepared by prepare_runqueue @@ -615,7 +778,7 @@ class RunQueue: while True: task = None if self.stats.active < self.number_tasks: - task = self.get_next_task() + task = self.sched.next() if task is not None: fn = self.taskData.fn_index[self.runq_fnid[task]] |