diff options
author | Richard Purdie <rpurdie@linux.intel.com> | 2010-08-19 11:36:29 +0100 |
---|---|---|
committer | Chris Larson <chris_larson@mentor.com> | 2010-12-29 23:51:07 -0700 |
commit | f51f6f8928145b41770c0322c901ab7c3043bc0e (patch) | |
tree | ad8d650273942a3fe54b90739e5204c088dffcf3 /lib/bb/runqueue.py | |
parent | ce6071c64dd98aada6649fa7811fdc6f92b6f654 (diff) | |
download | bitbake-f51f6f8928145b41770c0322c901ab7c3043bc0e.tar.gz |
runqueue: implement scenequeue
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'lib/bb/runqueue.py')
-rw-r--r-- | lib/bb/runqueue.py | 320 |
1 files changed, 275 insertions, 45 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index c4f716170..087edace3 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -22,6 +22,7 @@ Handles preparation and execution of a queue of tasks # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import copy import os import sys import signal @@ -63,12 +64,14 @@ class RunQueueStats: # These values indicate the next step due to be run in the # runQueue state machine runQueuePrepare = 2 -runQueueRunInit = 3 -runQueueRunning = 4 -runQueueFailed = 6 -runQueueCleanUp = 7 -runQueueComplete = 8 -runQueueChildProcess = 9 +runQueueSceneInit = 3 +runQueueSceneRun = 4 +runQueueRunInit = 5 +runQueueRunning = 6 +runQueueFailed = 7 +runQueueCleanUp = 8 +runQueueComplete = 9 +runQueueChildProcess = 10 class RunQueueScheduler(object): """ @@ -117,13 +120,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler): """ The priority map is sorted by task weight. """ - from copy import deepcopy self.rq = runqueue self.rqdata = rqdata - sortweight = sorted(deepcopy(self.rqdata.runq_weight)) - copyweight = deepcopy(self.rqdata.runq_weight) + sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight)) + copyweight = copy.deepcopy(self.rqdata.runq_weight) self.prio_map = [] for weight in sortweight: @@ -145,12 +147,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): def __init__(self, runqueue, rqdata): RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata) - from copy import deepcopy #FIXME - whilst this groups all fnids together it does not reorder the #fnid groups optimally. - basemap = deepcopy(self.prio_map) + basemap = copy.deepcopy(self.prio_map) self.prio_map = [] while (len(basemap) > 0): entry = basemap.pop(0) @@ -283,7 +284,7 @@ class RunQueueData: if dep in explored_deps[revdep]: scan = True if scan: - find_chains(revdep, deepcopy(prev_chain)) + find_chains(revdep, copy.deepcopy(prev_chain)) for dep in explored_deps[revdep]: if dep not in total_deps: total_deps.append(dep) @@ -683,6 +684,14 @@ class RunQueueData: stampfnwhitelist.append(fn) self.stampfnwhitelist = stampfnwhitelist + # Interate over the task list looking for tasks with a 'setscene' function + self.runq_setscene = [] + for task in range(len(self.runq_fnid)): + setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False) + if not setscene: + continue + self.runq_setscene.append(task) + return len(self.runq_fnid) def dump_data(self, taskQueue): @@ -815,6 +824,13 @@ class RunQueue: return current def check_stamp_task(self, task, taskname = None): + def get_timestamp(f): + try: + if not os.access(f, os.F_OK): + return None + return os.stat(f)[stat.ST_MTIME] + except: + return None if self.stamppolicy == "perfile": fulldeptree = False @@ -838,23 +854,25 @@ class RunQueue: logger.debug(2, "%s.%s is nostamp\n", fn, taskname) return False + if taskname.endswith("_setscene"): + return True + iscurrent = True - t1 = os.stat(stampfile)[stat.ST_MTIME] + t1 = get_timestamp(stampfile) for dep in self.rqdata.runq_depends[task]: if iscurrent: fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]] taskname2 = self.rqdata.runq_task[dep] stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2) + t2 = get_timestamp(stampfile2) + t3 = get_timestamp(stampfile2 + "_setscene") + if t3 and t3 > t2: + continue if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): - try: - t2 = os.stat(stampfile2)[stat.ST_MTIME] - if t1 < t2: - logger.debug(2, "Stampfile %s < %s", stampfile, stampfile2) - iscurrent = False - except: - logger.debug(2, "Exception reading %s for %s", stampfile2, stampfile) + if not t2 or t1 < t2: + logger.debug(2, "Stampfile %s < %s (or does not exist)", + stampfile, stampfile2) iscurrent = False - return iscurrent def execute_runqueue(self): @@ -871,7 +889,13 @@ class RunQueue: if self.rqdata.prepare() is 0: self.state = runQueueComplete else: - self.state = runQueueRunInit + self.state = runQueueSceneInit + + if self.state is runQueueSceneInit: + self.rqexe = RunQueueExecuteScenequeue(self) + + if self.state is runQueueSceneRun: + self.rqexe.execute() if self.state is runQueueRunInit: logger.info("Executing runqueue") @@ -944,8 +968,6 @@ class RunQueueExecute: self.task_fail(task, result[1]>>8) else: self.task_complete(task) - self.stats.taskCompleted() - bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) def finish_now(self): if self.stats.active: @@ -964,14 +986,10 @@ class RunQueueExecute: for pipe in self.build_pipes: self.build_pipes[pipe].read() - try: - while self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) - if self.runqueue_process_waitpid() is None: - return - except: - self.finish_now() - raise + if self.stats.active > 0: + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + self.runqueue_process_waitpid() + return if len(self.failed_fnids) != 0: self.rq.state = runQueueFailed @@ -980,12 +998,6 @@ class RunQueueExecute: self.rq.state = runQueueComplete return - def notify_task_started(self, task): - bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) - - def notify_task_completed(self, task): - bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) - def fork_off_task(self, fn, task, taskname): sys.stdout.flush() sys.stderr.flush() @@ -1008,8 +1020,6 @@ class RunQueueExecute: newsi = os.open(os.devnull, os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) - self.notify_task_started(task) - bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) try: @@ -1044,6 +1054,23 @@ class RunQueueExecuteTasks(RunQueueExecute): self.runq_buildable.append(1) else: self.runq_buildable.append(0) + if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(task) + + found = True + while found: + found = False + for task in range(self.stats.total): + if task in self.rq.scenequeue_covered: + continue + if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(task) + found = True + + logger.info('Full skip list %s', self.rq.scenequeue_covered) + + for task in self.rq.scenequeue_covered: + self.task_skip(task) event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) @@ -1078,7 +1105,7 @@ class RunQueueExecuteTasks(RunQueueExecute): schedulers.add(getattr(module, name)) return schedulers - def task_complete(self, task): + def task_completeoutright(self, task): """ Mark a task as completed Look at the reverse dependencies and mark any task with @@ -1100,6 +1127,11 @@ class RunQueueExecuteTasks(RunQueueExecute): taskname = self.rqdata.runq_task[revdep] logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname) + def task_complete(self, task): + self.stats.taskCompleted() + bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) + self.task_completeoutright(task) + def task_fail(self, task, exitcode): """ Called when a task has failed @@ -1115,7 +1147,7 @@ class RunQueueExecuteTasks(RunQueueExecute): def task_skip(self, task): self.runq_running[task] = 1 self.runq_buildable[task] = 1 - self.task_complete(task) + self.task_completeoutright(task) self.stats.taskCompleted() self.stats.taskSkipped() @@ -1141,13 +1173,11 @@ class RunQueueExecuteTasks(RunQueueExecute): elif self.cooker.configuration.dry_run: self.runq_running[task] = 1 self.runq_buildable[task] = 1 - self.notify_task_started(task) self.stats.taskActive() self.task_complete(task) - self.stats.taskCompleted() - self.notify_task_completed(task) continue + bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) self.build_pids[pid] = task @@ -1178,6 +1208,206 @@ class RunQueueExecuteTasks(RunQueueExecute): self.rq.state = runQueueComplete return +class RunQueueExecuteScenequeue(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.scenequeue_covered = set() + self.scenequeue_notcovered = set() + + # If we don't have any setscene functions, skip this step + if len(self.rqdata.runq_setscene) == 0: + rq.scenequeue_covered = set() + rq.state = runQueueRunInit + return + + self.stats = RunQueueStats(len(self.rqdata.runq_setscene)) + + endpoints = {} + sq_revdeps = [] + sq_revdeps_new = [] + sq_revdeps_squash = [] + + # We need to construct a dependency graph for the setscene functions. Intermediate + # dependencies between the setscene tasks only complicate the code. This code + # therefore aims to collapse the huge runqueue dependency tree into a smaller one + # only containing the setscene functions. + + for task in range(self.stats.total): + self.runq_running.append(0) + self.runq_complete.append(0) + self.runq_buildable.append(0) + + for task in range(len(self.rqdata.runq_fnid)): + sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task])) + sq_revdeps_new.append(set()) + if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene: + endpoints[task] = None + + for task in self.rqdata.runq_setscene: + for dep in self.rqdata.runq_depends[task]: + endpoints[dep] = task + + def process_endpoints(endpoints): + newendpoints = {} + for point, task in endpoints.items(): + tasks = set() + if task: + tasks.add(task) + if sq_revdeps_new[point]: + tasks |= sq_revdeps_new[point] + sq_revdeps_new[point] = set() + for dep in self.rqdata.runq_depends[point]: + if point in sq_revdeps[dep]: + sq_revdeps[dep].remove(point) + if tasks: + sq_revdeps_new[dep] |= tasks + if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene: + newendpoints[dep] = task + if len(newendpoints) != 0: + process_endpoints(newendpoints) + + process_endpoints(endpoints) + + for task in range(len(self.rqdata.runq_fnid)): + if task in self.rqdata.runq_setscene: + deps = set() + for dep in sq_revdeps_new[task]: + deps.add(self.rqdata.runq_setscene.index(dep)) + sq_revdeps_squash.append(deps) + elif len(sq_revdeps_new[task]) != 0: + bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") + + #for task in range(len(sq_revdeps_squash)): + # print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task]) + + self.sq_deps = [] + self.sq_revdeps = sq_revdeps_squash + self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps) + + for task in range(len(self.sq_revdeps)): + self.sq_deps.append(set()) + for task in range(len(self.sq_revdeps)): + for dep in self.sq_revdeps[task]: + self.sq_deps[dep].add(task) + + for task in range(len(self.sq_revdeps)): + if len(self.sq_revdeps[task]) == 0: + self.runq_buildable[task] = 1 + + logger.info('Executing setscene tasks') + + self.rq.state = runQueueSceneRun + + def scenequeue_updatecounters(self, task): + for dep in self.sq_deps[task]: + self.sq_revdeps2[dep].remove(task) + if len(self.sq_revdeps2[dep]) == 0: + self.runq_buildable[dep] = 1 + + def task_completeoutright(self, task): + """ + Mark a task as completed + Look at the reverse dependencies and mark any task with + completed dependencies as buildable + """ + + index = self.rqdata.runq_setscene[task] + logger.info('Found task %s which could be accelerated', + self.rqdata.get_user_idstring(index)) + + self.scenequeue_covered.add(task) + self.scenequeue_updatecounters(task) + + def task_complete(self, task): + self.stats.taskCompleted() + self.task_completeoutright(task) + + def task_fail(self, task, result): + self.stats.taskFailed() + index = self.rqdata.runq_setscene[task] + bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task) + + def task_failoutright(self, task): + self.runq_running[task] = 1 + self.runq_buildable[task] = 1 + self.stats.taskCompleted() + self.stats.taskSkipped() + index = self.rqdata.runq_setscene[task] + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task) + + def task_skip(self, task): + self.runq_running[task] = 1 + self.runq_buildable[task] = 1 + self.task_completeoutright(task) + self.stats.taskCompleted() + self.stats.taskSkipped() + + def execute(self): + """ + Run the tasks in a queue prepared by prepare_runqueue + """ + + task = None + if self.stats.active < self.number_tasks: + # Find the next setscene to run + for nexttask in range(self.stats.total): + if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1: + task = nexttask + break + if task is not None: + realtask = self.rqdata.runq_setscene[task] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] + + taskname = self.rqdata.runq_task[realtask] + "_setscene" + if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]): + logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant', + task, self.rqdata.get_user_idstring(task)) + self.task_failoutright(task) + return True + + if self.cooker.configuration.force: + for target in self.rqdata.target_pairs: + if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]: + self.task_failoutright(task) + return True + + if self.rq.check_stamp_task(realtask, taskname): + logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies', + task, self.rqdata.get_user_idstring(realtask)) + self.task_skip(task) + return True + + pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) + + self.build_pids[pid] = task + self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) + self.runq_running[task] = 1 + self.stats.taskActive() + if self.stats.active < self.number_tasks: + return True + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + if self.stats.active > 0: + if self.runqueue_process_waitpid() is None: + return True + return True + + # Convert scenequeue_covered task numbers into full taskgraph ids + oldcovered = self.scenequeue_covered + self.rq.scenequeue_covered = set() + for task in oldcovered: + self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task]) + + logger.info('We can skip tasks %s', self.rq.scenequeue_covered) + + self.rq.state = runQueueRunInit + return True class TaskFailure(Exception): """ |