aboutsummaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/cooker.py
diff options
context:
space:
mode:
authorChris Larson <chris_larson@mentor.com>2010-11-18 20:21:54 -0700
committerRichard Purdie <rpurdie@linux.intel.com>2011-01-04 14:46:42 +0000
commit32ea7668712a50d8f8b67d5e4558039e5092a485 (patch)
tree2473f8b1aade6131c7a37fbad2cc4d23998a3a56 /bitbake/lib/bb/cooker.py
parent570bec37a898fb502d166a22f20bdb1da8c21c38 (diff)
downloadopenembedded-core-contrib-32ea7668712a50d8f8b67d5e4558039e5092a485.tar.gz
Implement parallel parsing support
This utilizes python's multiprocessing module. The default number of threads to be used is the same as the number of available processor cores, however, you can manually set this with the BB_NUMBER_PARSE_THREADS variable. (Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f) Signed-off-by: Chris Larson <chris_larson@mentor.com> Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake/lib/bb/cooker.py')
-rw-r--r--bitbake/lib/bb/cooker.py135
1 files changed, 97 insertions, 38 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 6194919e4c..0143c149b8 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -25,6 +25,8 @@ from __future__ import print_function
import sys, os, glob, os.path, re, time
import logging
import sre_constants
+import multiprocessing
+import signal
from cStringIO import StringIO
from contextlib import closing
import bb
@@ -976,7 +978,7 @@ class CookerExit(bb.event.Event):
def __init__(self):
bb.event.Event.__init__(self)
-class CookerParser:
+class CookerParser(object):
def __init__(self, cooker, filelist, masked):
# Internal data
self.filelist = filelist
@@ -987,49 +989,106 @@ class CookerParser:
self.cached = 0
self.error = 0
self.masked = masked
- self.total = len(filelist)
self.skipped = 0
self.virtuals = 0
+ self.total = len(filelist)
- # Pointer to the next file to parse
- self.pointer = 0
-
- def parse_next(self):
- cooker = self.cooker
- if self.pointer < len(self.filelist):
- f = self.filelist[self.pointer]
-
- try:
- fromCache, skipped, virtuals = cooker.bb_cache.loadData(f, cooker.get_file_appends(f), cooker.configuration.data, cooker.status)
- if fromCache:
- self.cached += 1
- else:
- self.parsed += 1
-
- self.skipped += skipped
- self.virtuals += virtuals
+ # current to the next file to parse
+ self.current = 0
+ self.result_queue = None
+ self.fromcache = None
- except KeyboardInterrupt:
- cooker.bb_cache.remove(f)
- cooker.bb_cache.sync()
- raise
- except Exception as e:
- self.error += 1
- cooker.bb_cache.remove(f)
- parselog.exception("Unable to open %s", f)
- except:
- cooker.bb_cache.remove(f)
- raise
- finally:
- bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total), cooker.configuration.event_data)
+ self.launch_processes()
- self.pointer += 1
+ def launch_processes(self):
+ self.task_queue = multiprocessing.Queue()
+ self.result_queue = multiprocessing.Queue()
+
+ self.fromcache = []
+ cfgdata = self.cooker.configuration.data
+ for filename in self.filelist:
+ appends = self.cooker.get_file_appends(filename)
+ if not self.cooker.bb_cache.cacheValid(filename):
+ self.task_queue.put((filename, appends))
+ else:
+ self.fromcache.append((filename, appends))
+
+ def worker(input, output, cfgdata):
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ for filename, appends in iter(input.get, 'STOP'):
+ infos = bb.cache.Cache.parse(filename, appends, cfgdata)
+ output.put(infos)
+
+ self.processes = []
+ num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
+ multiprocessing.cpu_count())
+ for i in xrange(num_processes):
+ process = multiprocessing.Process(target=worker,
+ args=(self.task_queue,
+ self.result_queue,
+ cfgdata))
+ process.start()
+ self.processes.append(process)
+
+ def shutdown(self, clean=True):
+ self.result_queue.close()
+ for process in self.processes:
+ if clean:
+ self.task_queue.put('STOP')
+ else:
+ process.terminate()
+ self.task_queue.close()
+ for process in self.processes:
+ process.join()
+ self.cooker.bb_cache.sync()
+ bb.codeparser.parser_cache_save(self.cooker.configuration.data)
+ if self.error > 0:
+ raise ParsingErrorsFound()
+
+ def progress(self):
+ bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed,
+ self.skipped, self.masked,
+ self.virtuals, self.error,
+ self.total),
+ self.cooker.configuration.event_data)
- if self.pointer >= self.total:
- cooker.bb_cache.sync()
- bb.codeparser.parser_cache_save(cooker.configuration.data)
- if self.error > 0:
- raise ParsingErrorsFound
+ def parse_next(self):
+ cooker = self.cooker
+ if self.current >= self.total:
+ self.shutdown()
return False
+
+ try:
+ if self.result_queue.empty() and self.fromcache:
+ filename, appends = self.fromcache.pop()
+ _, infos = cooker.bb_cache.load(filename, appends,
+ self.cooker.configuration.data)
+ parsed = False
+ else:
+ infos = self.result_queue.get()
+ parsed = True
+ except KeyboardInterrupt:
+ self.shutdown(clean=False)
+ raise
+ except Exception as e:
+ self.error += 1
+ parselog.critical(str(e))
+ else:
+ if parsed:
+ self.parsed += 1
+ else:
+ self.cached += 1
+ self.virtuals += len(infos)
+
+ for virtualfn, info in infos:
+ cooker.bb_cache.add_info(virtualfn, info, cooker.status,
+ parsed=parsed)
+ if info.skipped:
+ self.skipped += 1
+ finally:
+ self.progress()
+
+ self.current += 1
return True
+