aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEtienne Cordonnier <ecordonnier@snap.com>2023-09-21 09:56:58 +0200
committerLuca Ceresoli <luca.ceresoli@bootlin.com>2023-10-27 12:28:03 +0200
commitaf6ca0ca827ff1e797ec74d95aa1443687f3ae0f (patch)
tree703be57a9007b369b107bf3014a14e10052ed904
parentefbd9d54f57be7a7a10f0b56e7e62c25974e99e6 (diff)
downloadbitbake-contrib-af6ca0ca827ff1e797ec74d95aa1443687f3ae0f.tar.gz
bitbake-worker: add header with length of message
The IPC mechanism between runqueue.py and bitbake-worker is currently not scalable: The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker has no information about the size of the message. Therefore, the bitbake-worker is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")" for each chunk received. This does not scale, because queue.find has a linear complexity relative to the size of the queue, and workerdata messages get very big e.g. for builds which reference a lot of files in SRC_URI. The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup 35 seconds before this fix, and 1.5 seconds after this fix). This commit adds a 4 bytes header after <tag>, so that bitbake-worker knows how many bytes need to be received, and does not need to constantly search the whole queue for </tag>. Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com> Signed-off-by: Alexandre Belloni <alexandre.belloni@bootlin.com>
-rwxr-xr-xbin/bitbake-worker34
-rw-r--r--lib/bb/runqueue.py34
2 files changed, 45 insertions, 23 deletions
diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index 609e276fe..eba9c562c 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -433,18 +433,30 @@ class BitbakeWorker(object):
while self.process_waitpid():
continue
-
def handle_item(self, item, func):
- if self.queue.startswith(b"<" + item + b">"):
- index = self.queue.find(b"</" + item + b">")
- while index != -1:
- try:
- func(self.queue[(len(item) + 2):index])
- except pickle.UnpicklingError:
- workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
- raise
- self.queue = self.queue[(index + len(item) + 3):]
- index = self.queue.find(b"</" + item + b">")
+ opening_tag = b"<" + item + b">"
+ if not self.queue.startswith(opening_tag):
+ return
+
+ tag_len = len(opening_tag)
+ if len(self.queue) < tag_len + 4:
+ # we need to receive more data
+ return
+ header = self.queue[tag_len:tag_len + 4]
+ payload_len = int.from_bytes(header, 'big')
+ # closing tag has length (tag_len + 1)
+ if len(self.queue) < tag_len * 2 + 1 + payload_len:
+ # we need to receive more data
+ return
+
+ index = self.queue.find(b"</" + item + b">")
+ if index != -1:
+ try:
+ func(self.queue[(tag_len + 4):index])
+ except pickle.UnpicklingError:
+ workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+ raise
+ self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 50475ea0c..1029eec07 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1318,6 +1318,16 @@ class RunQueue:
self.worker = {}
self.fakeworker = {}
+ @staticmethod
+ def send_pickled_data(worker, data, name):
+ msg = bytearray()
+ msg.extend(b"<" + name.encode() + b">")
+ pickled_data = pickle.dumps(data)
+ msg.extend(len(pickled_data).to_bytes(4, 'big'))
+ msg.extend(pickled_data)
+ msg.extend(b"</" + name.encode() + b">")
+ worker.stdin.write(msg)
+
def _start_worker(self, mc, fakeroot = False, rqexec = None):
logger.debug("Starting bitbake-worker")
magic = "decafbad"
@@ -1355,9 +1365,9 @@ class RunQueue:
"umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
}
- worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
- worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
- worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+ RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+ RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+ RunQueue.send_pickled_data(worker, workerdata, "workerdata")
worker.stdin.flush()
return RunQueueWorker(worker, workerpipe)
@@ -1367,7 +1377,7 @@ class RunQueue:
return
logger.debug("Teardown for bitbake-worker")
try:
- worker.process.stdin.write(b"<quit></quit>")
+ RunQueue.send_pickled_data(worker.process, b"", "quit")
worker.process.stdin.flush()
worker.process.stdin.close()
except IOError:
@@ -1894,14 +1904,14 @@ class RunQueueExecute:
def finish_now(self):
for mc in self.rq.worker:
try:
- self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
self.rq.worker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
for mc in self.rq.fakeworker:
try:
- self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
@@ -2196,10 +2206,10 @@ class RunQueueExecute:
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not mc in self.rq.fakeworker:
self.rq.start_fakeworker(self, mc)
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2297,10 +2307,10 @@ class RunQueueExecute:
self.rq.state = runQueueFailed
self.stats.taskFailed()
return True
- self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
self.rq.fakeworker[mc].process.stdin.flush()
else:
- self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
self.rq.worker[mc].process.stdin.flush()
self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2502,9 +2512,9 @@ class RunQueueExecute:
if changed:
for mc in self.rq.worker:
- self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
for mc in self.rq.fakeworker:
- self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+ RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))