aboutsummaryrefslogtreecommitdiffstats
path: root/lib/hashserv
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-02-18 15:59:48 -0700
committerRichard Purdie <richard.purdie@linuxfoundation.org>2024-02-19 11:53:15 +0000
commitcfe0ac071cfb998e4a1dd263f8860b140843361a (patch)
tree5baa4f4eeafc8816ecceba2a55f8d8eadeb7bcd1 /lib/hashserv
parent0409a00d62f45afb1b172acbcea17bf17942e846 (diff)
downloadbitbake-cfe0ac071cfb998e4a1dd263f8860b140843361a.tar.gz
hashserv: Add unihash-exists API
Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'lib/hashserv')
-rw-r--r--lib/hashserv/client.py44
-rw-r--r--lib/hashserv/server.py61
-rw-r--r--lib/hashserv/sqlalchemy.py11
-rw-r--r--lib/hashserv/sqlite.py16
-rw-r--r--lib/hashserv/tests.py39
5 files changed, 138 insertions, 33 deletions
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py
index e6dc41791..daf1e1284 100644
--- a/lib/hashserv/client.py
+++ b/lib/hashserv/client.py
@@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client")
class AsyncClient(bb.asyncrpc.AsyncClient):
MODE_NORMAL = 0
MODE_GET_STREAM = 1
+ MODE_EXIST_STREAM = 2
def __init__(self, username=None, password=None):
super().__init__("OEHASHEQUIV", "1.1", logger)
@@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
await self.socket.send("END")
return await self.socket.recv()
- if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
+ async def normal_to_stream(command):
+ r = await self.invoke({command: None})
+ if r != "ok":
+ raise ConnectionError(
+ f"Unable to transition to stream mode: Bad response from server {r!r}"
+ )
+
+ self.logger.debug("Mode is now %s", command)
+
+ if new_mode == self.mode:
+ return
+
+ self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode)
+
+ # Always transition to normal mode before switching to any other mode
+ if self.mode != self.MODE_NORMAL:
r = await self._send_wrapper(stream_to_normal)
if r != "ok":
self.check_invoke_error(r)
- raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r)
- elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
- r = await self.invoke({"get-stream": None})
- if r != "ok":
- raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r)
- elif new_mode != self.mode:
- raise Exception(
- "Undefined mode transition %r -> %r" % (self.mode, new_mode)
- )
+ raise ConnectionError(
+ f"Unable to transition to normal mode: Bad response from server {r!r}"
+ )
+ self.logger.debug("Mode is now normal")
+
+ if new_mode == self.MODE_GET_STREAM:
+ await normal_to_stream("get-stream")
+ elif new_mode == self.MODE_EXIST_STREAM:
+ await normal_to_stream("exists-stream")
+ elif new_mode != self.MODE_NORMAL:
+ raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
self.mode = new_mode
@@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
{"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
)
+ async def unihash_exists(self, unihash):
+ await self._set_mode(self.MODE_EXIST_STREAM)
+ r = await self.send_stream(unihash)
+ return r == "true"
+
async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
await self._set_mode(self.MODE_NORMAL)
return await self.invoke(
@@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client):
"report_unihash",
"report_unihash_equiv",
"get_taskhash",
+ "unihash_exists",
"get_outhash",
"get_stats",
"reset_stats",
diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index 5ed852d1f..68f64f983 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
"get": self.handle_get,
"get-outhash": self.handle_get_outhash,
"get-stream": self.handle_get_stream,
+ "exists-stream": self.handle_exists_stream,
"get-stats": self.handle_get_stats,
"get-db-usage": self.handle_get_db_usage,
"get-db-query-columns": self.handle_get_db_query_columns,
@@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
await self.db.insert_outhash(data)
- @permissions(READ_PERM)
- async def handle_get_stream(self, request):
+ async def _stream_handler(self, handler):
await self.socket.send_message("ok")
while True:
@@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
if l == "END":
break
- (method, taskhash) = l.split()
- # self.logger.debug('Looking up %s %s' % (method, taskhash))
- row = await self.db.get_equivalent(method, taskhash)
-
- if row is not None:
- msg = row["unihash"]
- # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
- elif self.upstream_client is not None:
- upstream = await self.upstream_client.get_unihash(method, taskhash)
- if upstream:
- msg = upstream
- else:
- msg = ""
- else:
- msg = ""
-
+ msg = await handler(l)
await self.socket.send(msg)
finally:
request_measure.end()
self.request_sample.end()
- # Post to the backfill queue after writing the result to minimize
- # the turn around time on a request
- if upstream is not None:
- await self.server.backfill_queue.put((method, taskhash))
-
await self.socket.send("ok")
return self.NO_RESPONSE
+ @permissions(READ_PERM)
+ async def handle_get_stream(self, request):
+ async def handler(l):
+ (method, taskhash) = l.split()
+ # self.logger.debug('Looking up %s %s' % (method, taskhash))
+ row = await self.db.get_equivalent(method, taskhash)
+
+ if row is not None:
+ # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
+ return row["unihash"]
+
+ if self.upstream_client is not None:
+ upstream = await self.upstream_client.get_unihash(method, taskhash)
+ if upstream:
+ await self.server.backfill_queue.put((method, taskhash))
+ return upstream
+
+ return ""
+
+ return await self._stream_handler(handler)
+
+ @permissions(READ_PERM)
+ async def handle_exists_stream(self, request):
+ async def handler(l):
+ if await self.db.unihash_exists(l):
+ return "true"
+
+ if self.upstream_client is not None:
+ if await self.upstream_client.unihash_exists(l):
+ return "true"
+
+ return "false"
+
+ return await self._stream_handler(handler)
+
async def report_readonly(self, data):
method = data["method"]
outhash = data["outhash"]
diff --git a/lib/hashserv/sqlalchemy.py b/lib/hashserv/sqlalchemy.py
index 873547809..0e28d738f 100644
--- a/lib/hashserv/sqlalchemy.py
+++ b/lib/hashserv/sqlalchemy.py
@@ -48,6 +48,7 @@ class UnihashesV3(Base):
__table_args__ = (
UniqueConstraint("method", "taskhash"),
Index("taskhash_lookup_v4", "method", "taskhash"),
+ Index("unihash_lookup_v1", "unihash"),
)
@@ -279,6 +280,16 @@ class Database(object):
)
return map_row(result.first())
+ async def unihash_exists(self, unihash):
+ async with self.db.begin():
+ result = await self._execute(
+ select(UnihashesV3)
+ .where(UnihashesV3.unihash == unihash)
+ .limit(1)
+ )
+
+ return result.first() is not None
+
async def get_outhash(self, method, outhash):
async with self.db.begin():
result = await self._execute(
diff --git a/lib/hashserv/sqlite.py b/lib/hashserv/sqlite.py
index 608490730..da2e844a0 100644
--- a/lib/hashserv/sqlite.py
+++ b/lib/hashserv/sqlite.py
@@ -145,6 +145,9 @@ class DatabaseEngine(object):
"CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
)
cursor.execute(
+ "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
+ )
+ cursor.execute(
"CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
)
cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)")
@@ -255,6 +258,19 @@ class Database(object):
)
return cursor.fetchone()
+ async def unihash_exists(self, unihash):
+ with closing(self.db.cursor()) as cursor:
+ cursor.execute(
+ """
+ SELECT * FROM unihashes_v3 WHERE unihash=:unihash
+ LIMIT 1
+ """,
+ {
+ "unihash": unihash,
+ },
+ )
+ return cursor.fetchone() is not None
+
async def get_outhash(self, method, outhash):
with closing(self.db.cursor()) as cursor:
cursor.execute(
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index aeedab357..fbbe81512 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object):
self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
self.assertEqual(result['method'], self.METHOD)
+ def test_unihash_exsits(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+ self.assertTrue(self.client.unihash_exists(unihash))
+ self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8'))
+
def test_ro_server(self):
rw_server = self.start_server()
rw_client = self.start_client(rw_server.address)
@@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
def test_stress(self):
self.run_hashclient(["--address", self.server_address, "stress"], check=True)
+ def test_unihash_exsits(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "unihash-exists", unihash,
+ ], check=True)
+ self.assertEqual(p.stdout.strip(), "true")
+
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
+ ], check=True)
+ self.assertEqual(p.stdout.strip(), "false")
+
+ def test_unihash_exsits_quiet(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "unihash-exists", unihash,
+ "--quiet",
+ ])
+ self.assertEqual(p.returncode, 0)
+ self.assertEqual(p.stdout.strip(), "")
+
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
+ "--quiet",
+ ])
+ self.assertEqual(p.returncode, 1)
+ self.assertEqual(p.stdout.strip(), "")
+
def test_remove_taskhash(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([