diff options
-rwxr-xr-x | bin/bitbake-worker | 34 | ||||
-rw-r--r-- | lib/bb/runqueue.py | 34 |
2 files changed, 45 insertions, 23 deletions
diff --git a/bin/bitbake-worker b/bin/bitbake-worker index 609e276fe..eba9c562c 100755 --- a/bin/bitbake-worker +++ b/bin/bitbake-worker @@ -433,18 +433,30 @@ class BitbakeWorker(object): while self.process_waitpid(): continue - def handle_item(self, item, func): - if self.queue.startswith(b"<" + item + b">"): - index = self.queue.find(b"</" + item + b">") - while index != -1: - try: - func(self.queue[(len(item) + 2):index]) - except pickle.UnpicklingError: - workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) - raise - self.queue = self.queue[(index + len(item) + 3):] - index = self.queue.find(b"</" + item + b">") + opening_tag = b"<" + item + b">" + if not self.queue.startswith(opening_tag): + return + + tag_len = len(opening_tag) + if len(self.queue) < tag_len + 4: + # we need to receive more data + return + header = self.queue[tag_len:tag_len + 4] + payload_len = int.from_bytes(header, 'big') + # closing tag has length (tag_len + 1) + if len(self.queue) < tag_len * 2 + 1 + payload_len: + # we need to receive more data + return + + index = self.queue.find(b"</" + item + b">") + if index != -1: + try: + func(self.queue[(tag_len + 4):index]) + except pickle.UnpicklingError: + workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) + raise + self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] def handle_cookercfg(self, data): self.cookercfg = pickle.loads(data) diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 50475ea0c..1029eec07 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -1318,6 +1318,16 @@ class RunQueue: self.worker = {} self.fakeworker = {} + @staticmethod + def send_pickled_data(worker, data, name): + msg = bytearray() + msg.extend(b"<" + name.encode() + b">") + pickled_data = pickle.dumps(data) + msg.extend(len(pickled_data).to_bytes(4, 'big')) + msg.extend(pickled_data) + msg.extend(b"</" + name.encode() + b">") + worker.stdin.write(msg) + def _start_worker(self, mc, fakeroot = False, rqexec = None): logger.debug("Starting bitbake-worker") magic = "decafbad" @@ -1355,9 +1365,9 @@ class RunQueue: "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), } - worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") - worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") - worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") + RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") + RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") + RunQueue.send_pickled_data(worker, workerdata, "workerdata") worker.stdin.flush() return RunQueueWorker(worker, workerpipe) @@ -1367,7 +1377,7 @@ class RunQueue: return logger.debug("Teardown for bitbake-worker") try: - worker.process.stdin.write(b"<quit></quit>") + RunQueue.send_pickled_data(worker.process, b"", "quit") worker.process.stdin.flush() worker.process.stdin.close() except IOError: @@ -1894,14 +1904,14 @@ class RunQueueExecute: def finish_now(self): for mc in self.rq.worker: try: - self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") self.rq.worker[mc].process.stdin.flush() except IOError: # worker must have died? pass for mc in self.rq.fakeworker: try: - self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") self.rq.fakeworker[mc].process.stdin.flush() except IOError: # worker must have died? @@ -2196,10 +2206,10 @@ class RunQueueExecute: if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: if not mc in self.rq.fakeworker: self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2297,10 +2307,10 @@ class RunQueueExecute: self.rq.state = runQueueFailed self.stats.taskFailed() return True - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2502,9 +2512,9 @@ class RunQueueExecute: if changed: for mc in self.rq.worker: - self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") for mc in self.rq.fakeworker: - self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) |