aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/bitbake-worker34
-rw-r--r--lib/bb/runqueue.py34
2 files changed, 45 insertions, 23 deletions
diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index 609e276fe..eba9c562c 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -433,18 +433,30 @@ class BitbakeWorker(object):
while self.process_waitpid():
continue
-
def handle_item(self, item, func):
- if self.queue.startswith(b"<" + item + b">"):
- index = self.queue.find(b"</" + item + b">")
- while index != -1:
- try:
- func(self.queue[(len(item) + 2):index])
- except pickle.UnpicklingError:
- workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
- raise
- self.queue = self.queue[(index + len(item) + 3):]
- index = self.queue.find(b"</" + item + b">")
+ opening_tag = b"<" + item + b">"
+ if not self.queue.startswith(opening_tag):
+ return
+
+ tag_len = len(opening_tag)
+ if len(self.queue) < tag_len + 4:
+ # we need to receive more data
+ return
+ header = self.queue[tag_len:tag_len + 4]
+ payload_len = int.from_bytes(header, 'big')
+ # closing tag has length (tag_len + 1)
+ if len(self.queue) < tag_len * 2 + 1 + payload_len:
+ # we need to receive more data
+ return
+
+ index = self.queue.find(b"</" + item + b">")
+ if index != -1:
+ try:
+ func(self.queue[(tag_len + 4):index])
+ except pickle.UnpicklingError:
+ workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+ raise
+ self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 50475ea0c..1029eec07 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1318,6 +1318,16 @@ class RunQueue:
self.worker = {}
self.fakeworker = {}
+ @staticmethod
+ def send_pickled_data(worker, data, name):
+ msg = bytearray()
+ msg.extend(b"<" + name.encode() + b">")
+ pickled_data = pickle.dumps(data)
+ msg.extend(len(pickled_data).to_bytes(4, 'big'))
+ msg.extend(pickled_data)
+ msg.extend(b"</" + name.encode() + b">")
+ worker.stdin.write(msg)
+
def _start_worker(self, mc, fakeroot = False, rqexec = None):
logger.debug("Starting bitbake-worker")
magic = "decafbad"
@@ -1355,9 +1365,9 @@ class RunQueue:
"umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
}
- worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
- worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
- worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+ RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+ RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+ RunQueue.send_pickled_data(worker, workerdata, "workerdata")
worker.stdin.flush()
return RunQueueWorker(worker, workerpipe)
@@ -1367,7 +1377,7 @@ class RunQueue:
return
logger.debug("Teardown for bitbake-worker")
try:
- worker.process.stdin.write(b"<quit></quit>")
+ RunQueue.send_pickled_data(worker.process, b"", "quit")
worker.process.stdin.flush()
worker.process.stdin.close()
except IOError:
@@ -1894,14 +1904,14 @@ class RunQueueExecute:
def finish_now(self):
for mc in self.rq.worker:
try:
- self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
self.rq.worker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
for mc in self.rq.fakeworker:
try:
- self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
@@ -2196,10 +2206,10 @@ class RunQueueExecute:
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not mc in self.rq.fakeworker:
self.rq.start_fakeworker(self, mc)
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2297,10 +2307,10 @@ class RunQueueExecute:
self.rq.state = runQueueFailed
self.stats.taskFailed()
return True
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2502,9 +2512,9 @@ class RunQueueExecute:
if changed:
for mc in self.rq.worker:
- self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
for mc in self.rq.fakeworker:
- self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))