From cfe0ac071cfb998e4a1dd263f8860b140843361a Mon Sep 17 00:00:00 2001 From: Joshua Watt Date: Sun, 18 Feb 2024 15:59:48 -0700 Subject: hashserv: Add unihash-exists API Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache. Signed-off-by: Joshua Watt Signed-off-by: Richard Purdie --- lib/hashserv/client.py | 44 +++++++++++++++++++++++++-------- lib/hashserv/server.py | 61 +++++++++++++++++++++++++++++----------------- lib/hashserv/sqlalchemy.py | 11 +++++++++ lib/hashserv/sqlite.py | 16 ++++++++++++ lib/hashserv/tests.py | 39 +++++++++++++++++++++++++++++ 5 files changed, 138 insertions(+), 33 deletions(-) (limited to 'lib/hashserv') diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py index e6dc41791..daf1e1284 100644 --- a/lib/hashserv/client.py +++ b/lib/hashserv/client.py @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 def __init__(self, username=None, password=None): super().__init__("OEHASHEQUIV", "1.1", logger) @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self.socket.send("END") return await self.socket.recv() - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: r = await self._send_wrapper(stream_to_normal) if r != "ok": self.check_invoke_error(r) - raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.invoke({"get-stream": None}) - if r != "ok": - raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) - elif new_mode != self.mode: - raise Exception( - "Undefined mode transition %r -> %r" % (self.mode, new_mode) - ) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) + async def unihash_exists(self, unihash): + await self._set_mode(self.MODE_EXIST_STREAM) + r = await self.send_stream(unihash) + return r == "true" + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): await self._set_mode(self.MODE_NORMAL) return await self.invoke( @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): "report_unihash", "report_unihash_equiv", "get_taskhash", + "unihash_exists", "get_outhash", "get_stats", "reset_stats", diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py index 5ed852d1f..68f64f983 100644 --- a/lib/hashserv/server.py +++ b/lib/hashserv/server.py @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): "get": self.handle_get, "get-outhash": self.handle_get_outhash, "get-stream": self.handle_get_stream, + "exists-stream": self.handle_exists_stream, "get-stats": self.handle_get_stats, "get-db-usage": self.handle_get_db_usage, "get-db-query-columns": self.handle_get_db_query_columns, @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) await self.db.insert_outhash(data) - @permissions(READ_PERM) - async def handle_get_stream(self, request): + async def _stream_handler(self, handler): await self.socket.send_message("ok") while True: @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if l == "END": break - (method, taskhash) = l.split() - # self.logger.debug('Looking up %s %s' % (method, taskhash)) - row = await self.db.get_equivalent(method, taskhash) - - if row is not None: - msg = row["unihash"] - # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - elif self.upstream_client is not None: - upstream = await self.upstream_client.get_unihash(method, taskhash) - if upstream: - msg = upstream - else: - msg = "" - else: - msg = "" - + msg = await handler(l) await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - # Post to the backfill queue after writing the result to minimize - # the turn around time on a request - if upstream is not None: - await self.server.backfill_queue.put((method, taskhash)) - await self.socket.send("ok") return self.NO_RESPONSE + @permissions(READ_PERM) + async def handle_get_stream(self, request): + async def handler(l): + (method, taskhash) = l.split() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) + + if row is not None: + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + return row["unihash"] + + if self.upstream_client is not None: + upstream = await self.upstream_client.get_unihash(method, taskhash) + if upstream: + await self.server.backfill_queue.put((method, taskhash)) + return upstream + + return "" + + return await self._stream_handler(handler) + + @permissions(READ_PERM) + async def handle_exists_stream(self, request): + async def handler(l): + if await self.db.unihash_exists(l): + return "true" + + if self.upstream_client is not None: + if await self.upstream_client.unihash_exists(l): + return "true" + + return "false" + + return await self._stream_handler(handler) + async def report_readonly(self, data): method = data["method"] outhash = data["outhash"] diff --git a/lib/hashserv/sqlalchemy.py b/lib/hashserv/sqlalchemy.py index 873547809..0e28d738f 100644 --- a/lib/hashserv/sqlalchemy.py +++ b/lib/hashserv/sqlalchemy.py @@ -48,6 +48,7 @@ class UnihashesV3(Base): __table_args__ = ( UniqueConstraint("method", "taskhash"), Index("taskhash_lookup_v4", "method", "taskhash"), + Index("unihash_lookup_v1", "unihash"), ) @@ -279,6 +280,16 @@ class Database(object): ) return map_row(result.first()) + async def unihash_exists(self, unihash): + async with self.db.begin(): + result = await self._execute( + select(UnihashesV3) + .where(UnihashesV3.unihash == unihash) + .limit(1) + ) + + return result.first() is not None + async def get_outhash(self, method, outhash): async with self.db.begin(): result = await self._execute( diff --git a/lib/hashserv/sqlite.py b/lib/hashserv/sqlite.py index 608490730..da2e844a0 100644 --- a/lib/hashserv/sqlite.py +++ b/lib/hashserv/sqlite.py @@ -144,6 +144,9 @@ class DatabaseEngine(object): cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" + ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) @@ -255,6 +258,19 @@ class Database(object): ) return cursor.fetchone() + async def unihash_exists(self, unihash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT * FROM unihashes_v3 WHERE unihash=:unihash + LIMIT 1 + """, + { + "unihash": unihash, + }, + ) + return cursor.fetchone() is not None + async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py index aeedab357..fbbe81512 100644 --- a/lib/hashserv/tests.py +++ b/lib/hashserv/tests.py @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') self.assertEqual(result['method'], self.METHOD) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + self.assertTrue(self.client.unihash_exists(unihash)) + self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) + def test_ro_server(self): rw_server = self.start_server() rw_client = self.start_client(rw_server.address) @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def test_stress(self): self.run_hashclient(["--address", self.server_address, "stress"], check=True) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + ], check=True) + self.assertEqual(p.stdout.strip(), "true") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + ], check=True) + self.assertEqual(p.stdout.strip(), "false") + + def test_unihash_exsits_quiet(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + "--quiet", + ]) + self.assertEqual(p.returncode, 0) + self.assertEqual(p.stdout.strip(), "") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + "--quiet", + ]) + self.assertEqual(p.returncode, 1) + self.assertEqual(p.stdout.strip(), "") + def test_remove_taskhash(self): taskhash, outhash, unihash = self.create_test_hash(self.client) self.run_hashclient([ -- cgit 1.2.3-korg