From e7c7e1f2e58dc985a9041f4eb426947e33d00910 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 18 Aug 2010 17:13:06 +0100 Subject: bitbake/runqueue.py: Create RunQueueExecute and RunQueueExecuteTasks classes, further splitting up runqueue Signed-off-by: Richard Purdie --- lib/bb/runqueue.py | 313 ++++++++++++++++++++++++++++------------------------- 1 file changed, 164 insertions(+), 149 deletions(-) (limited to 'lib/bb/runqueue.py') diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 7efb77736..00a63cdd2 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -92,7 +92,7 @@ class RunQueueScheduler(object): """ Return the id of the first task we find that is buildable """ - for tasknum in xrange(len(self.rq.runq_fnid)): + for tasknum in xrange(len(self.rqdata.runq_fnid)): taskid = self.prio_map[tasknum] if self.rq.runq_running[taskid] == 1: continue @@ -709,7 +709,6 @@ class RunQueueData: self.rqdata.runq_depends[task], self.rqdata.runq_revdeps[task]) - class RunQueue: def __init__(self, cooker, cfgData, dataCache, taskData, targets): @@ -717,34 +716,10 @@ class RunQueue: self.cfgData = cfgData self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) - self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) - self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" self.state = runQueuePrepare - def get_schedulers(self): - schedulers = set(obj for obj in globals().values() - if type(obj) is type and - issubclass(obj, RunQueueScheduler)) - - user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True) - if user_schedulers: - for sched in user_schedulers.split(): - if not "." in sched: - bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) - continue - - modname, name = sched.rsplit(".", 1) - try: - module = __import__(modname, fromlist=(name,)) - except ImportError, exc: - logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) - raise SystemExit(1) - else: - schedulers.add(getattr(module, name)) - return schedulers - def check_stamps(self): unchecked = {} current = [] @@ -897,24 +872,25 @@ class RunQueue: if self.state is runQueueRunInit: logger.info("Executing runqueue") - self.execute_runqueue_initVars() + self.rqexe = RunQueueExecuteTasks(self) + self.state = runQueueRunning if self.state is runQueueRunning: - self.execute_runqueue_internal() + self.rqexe.execute() if self.state is runQueueCleanUp: - self.finish_runqueue() + self.rqexe.finish() if self.state is runQueueFailed: if not self.rqdata.taskData.tryaltconfigs: - raise bb.runqueue.TaskFailure(self.failed_fnids) - for fnid in self.failed_fnids: + raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids) + for fnid in self.rqexe.failed_fnids: self.rqdata.taskData.fail_fnid(fnid) self.rqdata.reset() if self.state is runQueueComplete: # All done - logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.stats.completed, self.stats.skipped, self.stats.failed) + logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed) return False if self.state is runQueueChildProcess: @@ -924,9 +900,23 @@ class RunQueue: # Loop return retval - def execute_runqueue_initVars(self): + def finish_runqueue(self, now = False): + if now: + self.rqexe.finish_now() + else: + self.rqexe.finish() - self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) + +class RunQueueExecute: + + def __init__(self, rq): + self.rq = rq + self.cooker = rq.cooker + self.cfgData = rq.cfgData + self.rqdata = rq.rqdata + + self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1) + self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed" self.runq_buildable = [] self.runq_running = [] @@ -935,6 +925,115 @@ class RunQueue: self.build_pipes = {} self.failed_fnids = [] + def runqueue_process_waitpid(self): + """ + Return none is there are no processes awaiting result collection, otherwise + collect the process exit codes and close the information pipe. + """ + result = os.waitpid(-1, os.WNOHANG) + if result[0] is 0 and result[1] is 0: + return None + task = self.build_pids[result[0]] + del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] + if result[1] != 0: + self.task_fail(task, result[1]) + else: + self.task_complete(task) + self.stats.taskCompleted() + bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) + + def finish_now(self): + if self.stats.active: + logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) + for k, v in self.build_pids.iteritems(): + try: + os.kill(-k, signal.SIGTERM) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + def finish(self): + self.rq.state = runQueueCleanUp + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + try: + while self.stats.active > 0: + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + if self.runqueue_process_waitpid() is None: + return + except: + self.finish_now() + raise + + if len(self.failed_fnids) != 0: + self.rq.state = runQueueFailed + return + + self.rq.state = runQueueComplete + return + + + def notify_task_started(self, task): + bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) + logger.info("Running task %d of %d (ID: %s, %s)", self.stats.completed + self.stats.active + self.stats.failed + 1, + self.stats.total, + task, + self.rqdata.get_user_idstring(task)) + + def notify_task_completed(self, task): + bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) + + def fork_off_task(self, fn, task, taskname): + sys.stdout.flush() + sys.stderr.flush() + try: + pipein, pipeout = os.pipe() + pid = os.fork() + except OSError as e: + bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) + if pid == 0: + os.close(pipein) + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_pipe = pipeout + + # Child processes should send their messages to the UI + # process via the server process, not print them + # themselves + bblogger.handlers = [bb.event.LogHandler()] + + self.rq.state = runQueueChildProcess + # Make the child the process group leader + os.setpgid(0, 0) + # No stdin + newsi = os.open('/dev/null', os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + self.notify_task_started(task) + + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) + try: + the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) + bb.build.exec_task(fn, taskname, the_data) + except Exception as exc: + logger.critical(str(exc)) + os._exit(1) + os._exit(0) + return pid, pipein, pipeout + +class RunQueueExecuteTasks(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) + # Mark initial buildable tasks for task in xrange(self.stats.total): self.runq_running.append(0) @@ -944,19 +1043,39 @@ class RunQueue: else: self.runq_buildable.append(0) - self.state = runQueueRunning - event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) for scheduler in self.get_schedulers(): if self.scheduler == scheduler.name: - self.sched = scheduler(self) + self.sched = scheduler(self, self.rqdata) logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) break else: bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % (self.scheduler, ", ".join(obj.name for obj in self.schedulers))) + def get_schedulers(self): + schedulers = set(obj for obj in globals().values() + if type(obj) is type and + issubclass(obj, RunQueueScheduler)) + + user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True) + if user_schedulers: + for sched in user_schedulers.split(): + if not "." in sched: + bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) + continue + + modname, name = sched.rsplit(".", 1) + try: + module = __import__(modname, fromlist=(name,)) + except ImportError, exc: + logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) + raise SystemExit(1) + else: + schedulers.add(getattr(module, name)) + return schedulers + def task_complete(self, task): """ Mark a task as completed @@ -989,25 +1108,25 @@ class RunQueue: self.stats.taskFailed() fnid = self.rqdata.runq_fnid[task] self.failed_fnids.append(fnid) - bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData) if self.rqdata.taskData.abort: - self.state = runQueueCleanUp + self.rq.state = runQueueCleanUp - def execute_runqueue_internal(self): + def execute(self): """ Run the tasks in a queue prepared by rqdata.prepare() """ if self.stats.total == 0: # nothing to do - self.state = runQueueCleanUp + self.rq.state = runQueueCleanUp while True: for task in iter(self.sched.next, None): fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] taskname = self.rqdata.runq_task[task] - if self.check_stamp_task(task, taskname): + if self.rq.check_stamp_task(task, taskname): logger.debug(2, "Stamp current task %s (%s)", task, self.rqdata.get_user_idstring(task)) self.runq_running[task] = 1 @@ -1037,12 +1156,12 @@ class RunQueue: self.build_pipes[pipe].read() if self.stats.active > 0: - if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: + if self.runqueue_process_waitpid() is None: return continue if len(self.failed_fnids) != 0: - self.state = runQueueFailed + self.rq.state = runQueueFailed return # Sanity Checks @@ -1053,113 +1172,9 @@ class RunQueue: logger.error("Task %s never ran!", task) if self.runq_complete[task] == 0: logger.error("Task %s never completed!", task) - self.state = runQueueComplete + self.rq.state = runQueueComplete return - def runqueue_process_waitpid(self, success, failure): - """ - Return none is there are no processes awaiting result collection, otherwise - collect the process exit codes and close the information pipe. - """ - result = os.waitpid(-1, os.WNOHANG) - if result[0] is 0 and result[1] is 0: - return None - task = self.build_pids[result[0]] - del self.build_pids[result[0]] - self.build_pipes[result[0]].close() - del self.build_pipes[result[0]] - if result[1] != 0: - failure(task, result[1]>>8) - else: - success(task) - self.stats.taskCompleted() - self.notify_task_completed(task) - - def finish_runqueue_now(self): - if self.stats.active: - logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) - for k, v in self.build_pids.iteritems(): - try: - os.kill(-k, signal.SIGTERM) - except: - pass - for pipe in self.build_pipes: - self.build_pipes[pipe].read() - - def finish_runqueue(self, now = False): - self.state = runQueueCleanUp - - for pipe in self.build_pipes: - self.build_pipes[pipe].read() - - if now: - self.finish_runqueue_now() - try: - while self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) - if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: - return - except: - self.finish_runqueue_now() - raise - - if len(self.failed_fnids) != 0: - self.state = runQueueFailed - return - - self.state = runQueueComplete - return - - def notify_task_started(self, task): - bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) - logger.info("Running task %d of %d (ID: %s, %s)", self.stats.completed + self.stats.active + self.stats.failed + 1, - self.stats.total, - task, - self.get_user_idstring(task)) - - def notify_task_completed(self, task): - bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) - - def fork_off_task(self, fn, task, taskname): - sys.stdout.flush() - sys.stderr.flush() - try: - pipein, pipeout = os.pipe() - pid = os.fork() - except OSError as e: - bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) - if pid == 0: - os.close(pipein) - # Save out the PID so that the event can include it the - # events - bb.event.worker_pid = os.getpid() - bb.event.worker_pipe = pipeout - - # Child processes should send their messages to the UI - # process via the server process, not print them - # themselves - bblogger.handlers = [bb.event.LogHandler()] - - self.state = runQueueChildProcess - # Make the child the process group leader - os.setpgid(0, 0) - # No stdin - newsi = os.open('/dev/null', os.O_RDWR) - os.dup2(newsi, sys.stdin.fileno()) - - self.notify_task_started(task) - - bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) - bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) - try: - the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) - bb.build.exec_task(fn, taskname, the_data) - except Exception as exc: - logger.critical(str(exc)) - os._exit(1) - os._exit(0) - return pid, pipein, pipeout - class TaskFailure(Exception): """ -- cgit 1.2.3-korg