diff options
author | Paul Barker <pbarker@konsulko.com> | 2021-04-26 09:16:30 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2021-04-27 15:12:22 +0100 |
commit | 5afb9586b0a4a23a05efb0e8ff4a97262631ae4a (patch) | |
tree | 454248c28d1d3d78b34e84c3a7a0fdf9dc9692b6 /lib/hashserv/client.py | |
parent | 4105ffd967fa86154ad67366aaf0f898abf78d14 (diff) | |
download | bitbake-5afb9586b0a4a23a05efb0e8ff4a97262631ae4a.tar.gz |
hashserv: Refactor to use asyncrpc
The asyncrpc module can now be used to provide the json & asyncio based
RPC system used by hashserv.
Signed-off-by: Paul Barker <pbarker@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'lib/hashserv/client.py')
-rw-r--r-- | lib/hashserv/client.py | 137 |
1 files changed, 16 insertions, 121 deletions
diff --git a/lib/hashserv/client.py b/lib/hashserv/client.py index f370cba63..531170967 100644 --- a/lib/hashserv/client.py +++ b/lib/hashserv/client.py @@ -8,106 +8,26 @@ import json import logging import socket import os -from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client +import bb.asyncrpc +from . import create_async_client logger = logging.getLogger("hashserv.client") -class AsyncClient(object): +class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 def __init__(self): - self.reader = None - self.writer = None + super().__init__('OEHASHEQUIV', '1.1', logger) self.mode = self.MODE_NORMAL - self.max_chunk = DEFAULT_MAX_CHUNK - async def connect_tcp(self, address, port): - async def connect_sock(): - return await asyncio.open_connection(address, port) - - self._connect_sock = connect_sock - - async def connect_unix(self, path): - async def connect_sock(): - return await asyncio.open_unix_connection(path) - - self._connect_sock = connect_sock - - async def connect(self): - if self.reader is None or self.writer is None: - (self.reader, self.writer) = await self._connect_sock() - - self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8")) - await self.writer.drain() - - cur_mode = self.mode - self.mode = self.MODE_NORMAL - await self._set_mode(cur_mode) - - async def close(self): - self.reader = None - - if self.writer is not None: - self.writer.close() - self.writer = None - - async def _send_wrapper(self, proc): - count = 0 - while True: - try: - await self.connect() - return await proc() - except ( - OSError, - ConnectionError, - json.JSONDecodeError, - UnicodeDecodeError, - ) as e: - logger.warning("Error talking to server: %s" % e) - if count >= 3: - if not isinstance(e, ConnectionError): - raise ConnectionError(str(e)) - raise e - await self.close() - count += 1 - - async def send_message(self, msg): - async def get_line(): - line = await self.reader.readline() - if not line: - raise ConnectionError("Connection closed") - - line = line.decode("utf-8") - - if not line.endswith("\n"): - raise ConnectionError("Bad message %r" % message) - - return line - - async def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode("utf-8")) - await self.writer.drain() - - l = await get_line() - - m = json.loads(l) - if m and "chunk-stream" in m: - lines = [] - while True: - l = (await get_line()).rstrip("\n") - if not l: - break - lines.append(l) - - m = json.loads("".join(lines)) - - return m - - return await self._send_wrapper(proc) + async def setup_connection(self): + await super().setup_connection() + cur_mode = self.mode + self.mode = self.MODE_NORMAL + await self._set_mode(cur_mode) async def send_stream(self, msg): async def proc(): @@ -185,12 +105,10 @@ class AsyncClient(object): return (await self.send_message({"backfill-wait": None}))["tasks"] -class Client(object): +class Client(bb.asyncrpc.Client): def __init__(self): - self.client = AsyncClient() - self.loop = asyncio.new_event_loop() - - for call in ( + super().__init__() + self._add_methods( "connect_tcp", "close", "get_unihash", @@ -200,30 +118,7 @@ class Client(object): "get_stats", "reset_stats", "backfill_wait", - ): - downcall = getattr(self.client, call) - setattr(self, call, self._get_downcall_wrapper(downcall)) - - def _get_downcall_wrapper(self, downcall): - def wrapper(*args, **kwargs): - return self.loop.run_until_complete(downcall(*args, **kwargs)) - - return wrapper - - def connect_unix(self, path): - # AF_UNIX has path length issues so chdir here to workaround - cwd = os.getcwd() - try: - os.chdir(os.path.dirname(path)) - self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path))) - self.loop.run_until_complete(self.client.connect()) - finally: - os.chdir(cwd) - - @property - def max_chunk(self): - return self.client.max_chunk - - @max_chunk.setter - def max_chunk(self, value): - self.client.max_chunk = value + ) + + def _get_async_client(self): + return AsyncClient() |