diff options
author | Chris Larson <chris_larson@mentor.com> | 2010-12-07 13:00:22 -0500 |
---|---|---|
committer | Chris Larson <chris_larson@mentor.com> | 2010-12-07 14:42:04 -0500 |
commit | 9caf65e79f95fe0045e727391e974c4c1e7411ff (patch) | |
tree | df34f9a5d1ee6ed846ab8e923f966aa700fb26ca /lib/bb | |
parent | 89ce8df075ac8c9a5478c86405e6e6b60346a51c (diff) | |
download | bitbake-contrib-9caf65e79f95fe0045e727391e974c4c1e7411ff.tar.gz |
cooker: use a pool, abort on first parse error
Signed-off-by: Chris Larson <chris_larson@mentor.com>
Diffstat (limited to 'lib/bb')
-rw-r--r-- | lib/bb/cooker.py | 143 |
1 files changed, 59 insertions, 84 deletions
diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py index 30ff8f2d0..2844101f3 100644 --- a/lib/bb/cooker.py +++ b/lib/bb/cooker.py @@ -24,12 +24,13 @@ from __future__ import print_function import sys, os, glob, os.path, re, time +import atexit +import itertools import logging -import sre_constants -import threading import multiprocessing import signal -import atexit +import sre_constants +import threading from cStringIO import StringIO from contextlib import closing import bb @@ -46,11 +47,6 @@ class MultipleMatches(Exception): Exception raised when multiple file matches are found """ -class ParsingErrorsFound(Exception): - """ - Exception raised when parsing errors are found - """ - class NothingToBuild(Exception): """ Exception raised when there is nothing to build @@ -944,6 +940,10 @@ class CookerExit(bb.event.Event): def __init__(self): bb.event.Event.__init__(self) +def parse_file(task): + filename, appends = task + return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg) + class CookerParser(object): def __init__(self, cooker, filelist, masked): self.filelist = filelist @@ -961,110 +961,85 @@ class CookerParser(object): self.total = len(filelist) self.current = 0 - self.bb_cache = None - self.task_queue = None - self.result_queue = None - self.fromcache = None self.num_processes = int(self.cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or multiprocessing.cpu_count()) - def launch_processes(self): - self.task_queue = multiprocessing.Queue() - self.result_queue = multiprocessing.Queue() - + self.bb_cache = bb.cache.Cache(self.cfgdata) self.fromcache = [] + self.willparse = [] for filename in self.filelist: appends = self.cooker.get_file_appends(filename) if not self.bb_cache.cacheValid(filename): - self.task_queue.put((filename, appends)) + self.willparse.append((filename, appends)) else: self.fromcache.append((filename, appends)) self.toparse = self.total - len(self.fromcache) self.progress_chunk = max(self.toparse / 100, 1) - def worker(input, output, cfgdata): + self.start() + + def start(self): + def init(cfg): signal.signal(signal.SIGINT, signal.SIG_IGN) - for filename, appends in iter(input.get, 'STOP'): - try: - infos = bb.cache.Cache.parse(filename, appends, cfgdata) - except bb.parse.ParseError as exc: - output.put(exc) - else: - output.put(infos) + parse_file.cfg = cfg - self.processes = [] - for i in xrange(self.num_processes): - process = multiprocessing.Process(target=worker, - args=(self.task_queue, - self.result_queue, - self.cfgdata)) - process.start() - self.processes.append(process) + bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) - def shutdown(self, clean=True): - self.result_queue.close() - for process in self.processes: - if clean: - self.task_queue.put('STOP') - else: - process.terminate() - self.task_queue.close() - for process in self.processes: - process.join() - sync = threading.Thread(target=self.bb_cache.sync) - sync.start() - atexit.register(lambda: sync.join()) - if self.error > 0: - raise ParsingErrorsFound() + self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata]) + parsed = self.pool.imap(parse_file, self.willparse) + self.pool.close() - def parse_next(self): - if self.current >= self.total: + self.results = itertools.chain(self.load_cached(), parsed) + + def shutdown(self, clean=True): + if clean: event = bb.event.ParseCompleted(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total) bb.event.fire(event, self.cfgdata) - self.shutdown() - return False - elif not self.bb_cache: - self.bb_cache = bb.cache.Cache(self.cfgdata) - self.launch_processes() - bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) - return True + else: + self.pool.terminate() + self.pool.join() + sync = threading.Thread(target=self.bb_cache.sync) + sync.start() + atexit.register(lambda: sync.join()) + + def load_cached(self): + for filename, appends in self.fromcache: + cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) + yield not cached, infos + + def parse_next(self): try: - if self.result_queue.empty() and self.fromcache: - filename, appends = self.fromcache.pop() - _, result = self.bb_cache.load(filename, appends, self.cfgdata) - parsed = False - self.cached += 1 - else: - result = self.result_queue.get() - if isinstance(result, Exception): - raise result - - parsed = True - self.parsed += 1 - if self.parsed % self.progress_chunk == 0: - bb.event.fire(bb.event.ParseProgress(self.parsed), - self.cfgdata) + parsed, result = self.results.next() + except StopIteration: + self.shutdown() + return False except KeyboardInterrupt: self.shutdown(clean=False) raise - except Exception as e: - self.error += 1 - parselog.critical(str(e)) - else: - self.virtuals += len(result) - - for virtualfn, info in result: - if info.skipped: - self.skipped += 1 - else: - self.bb_cache.add_info(virtualfn, info, self.cooker.status, - parsed=parsed) + except Exception as exc: + self.shutdown(clean=False) + sys.exit(1) self.current += 1 + self.virtuals += len(result) + if parsed: + self.parsed += 1 + if self.parsed % self.progress_chunk == 0: + bb.event.fire(bb.event.ParseProgress(self.parsed), + self.cfgdata) + else: + self.cached += 1 + + for virtualfn, info in result: + if info.skipped: + self.skipped += 1 + else: + self.bb_cache.add_info(virtualfn, info, self.cooker.status, + parsed=parsed) return True def reparse(self, filename): |