Diffstat (limited to 'lib/hashserv/server.py')
-rw-r--r--  lib/hashserv/server.py | 340
1 file changed, 221 insertions(+), 119 deletions(-)
diff --git a/lib/hashserv/server.py b/lib/hashserv/server.py
index a059e5211..ef8227d43 100644
--- a/lib/hashserv/server.py
+++ b/lib/hashserv/server.py
@@ -5,11 +5,12 @@
from contextlib import closing, contextmanager
from datetime import datetime
+import enum
import asyncio
import logging
import math
import time
-from . import create_async_client, TABLE_COLUMNS
+from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
import bb.asyncrpc
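
Note: the single TABLE_COLUMNS list is replaced by one column list per
table; they are used below to filter fields received from an upstream
server before inserting them into the matching table. A sketch of what
the companion change in lib/hashserv/__init__.py presumably exports
(the exact tuples are an assumption, inferred from the columns this
patch reads and writes):

    # Assumed definitions in lib/hashserv/__init__.py (not shown in
    # this diff; illustrative, not authoritative)
    UNIHASH_TABLE_COLUMNS = ("method", "taskhash", "unihash")
    OUTHASH_TABLE_COLUMNS = ("method", "outhash", "taskhash", "created",
                             "owner", "PN", "PV", "PR", "task",
                             "outhash_siginfo")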
@@ -106,56 +107,64 @@ class Stats(object):
return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
-def insert_task(cursor, data, ignore=False):
+@enum.unique
+class Resolve(enum.Enum):
+ FAIL = enum.auto()
+ IGNORE = enum.auto()
+ REPLACE = enum.auto()
+
+
+def insert_table(cursor, table, data, on_conflict):
+ resolve = {
+ Resolve.FAIL: "",
+ Resolve.IGNORE: " OR IGNORE",
+ Resolve.REPLACE: " OR REPLACE",
+ }[on_conflict]
+
keys = sorted(data.keys())
- query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
- " OR IGNORE" if ignore else "",
- ', '.join(keys),
- ', '.join(':' + k for k in keys))
+ query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
+ resolve=resolve,
+ table=table,
+ fields=", ".join(keys),
+ values=", ".join(":" + k for k in keys),
+ )
+ prevrowid = cursor.lastrowid
cursor.execute(query, data)
-
-async def copy_from_upstream(client, db, method, taskhash):
- d = await client.get_taskhash(method, taskhash, True)
+ logging.debug(
+ "Inserting %r into %s, %s",
+ data,
+ table,
+ on_conflict
+ )
+ return (cursor.lastrowid, cursor.lastrowid != prevrowid)
+
+def insert_unihash(cursor, data, on_conflict):
+ return insert_table(cursor, "unihashes_v2", data, on_conflict)
+
+def insert_outhash(cursor, data, on_conflict):
+ return insert_table(cursor, "outhashes_v2", data, on_conflict)
+
+async def copy_unihash_from_upstream(client, db, method, taskhash):
+ d = await client.get_taskhash(method, taskhash)
if d is not None:
- # Filter out unknown columns
- d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
-
with closing(db.cursor()) as cursor:
- insert_task(cursor, d)
+ insert_unihash(
+ cursor,
+ {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
+ Resolve.IGNORE,
+ )
db.commit()
-
return d
-async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
- d = await client.get_outhash(method, outhash, taskhash)
- if d is not None:
- # Filter out unknown columns
- d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
- with closing(db.cursor()) as cursor:
- insert_task(cursor, d)
- db.commit()
+class ServerCursor(object):
+ def __init__(self, db, cursor, upstream):
+ self.db = db
+ self.cursor = cursor
+ self.upstream = upstream
- return d
class ServerClient(bb.asyncrpc.AsyncServerConnection):
- FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
- ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
- OUTHASH_QUERY = '''
- -- Find tasks with a matching outhash (that is, tasks that
- -- are equivalent)
- SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash
-
- -- If there is an exact match on the taskhash, return it.
- -- Otherwise return the oldest matching outhash of any
- -- taskhash
- ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
- created ASC
-
- -- Only return one row
- LIMIT 1
- '''
-
def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
super().__init__(reader, writer, 'OEHASHEQUIV', logger)
self.db = db
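
Note: insert_table() centralises the INSERT statements and maps the
Resolve enum onto SQLite's conflict clauses (plain INSERT, INSERT OR
IGNORE, INSERT OR REPLACE). It detects whether a row was actually
written by reading cursor.lastrowid before and after execute():
sqlite3 leaves lastrowid unchanged when an OR IGNORE insert is
suppressed by a constraint. A minimal usage sketch, assuming an
in-memory database and a hand-written unihashes_v2 schema (the real
DDL lives elsewhere in hashserv and is an assumption here):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.row_factory = sqlite3.Row
    # Assumed schema; the UNIQUE constraint is what makes OR IGNORE
    # and OR REPLACE meaningful for this table
    db.execute(
        "CREATE TABLE unihashes_v2 ("
        "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "  method TEXT NOT NULL, taskhash TEXT NOT NULL,"
        "  unihash TEXT NOT NULL, UNIQUE(method, taskhash))"
    )

    cursor = db.cursor()
    data = {"method": "m", "taskhash": "t1", "unihash": "u1"}
    rowid, inserted = insert_unihash(cursor, data, Resolve.IGNORE)
    # inserted is True: lastrowid moved from None to the new rowid
    rowid, inserted = insert_unihash(cursor, data, Resolve.IGNORE)
    # inserted is False: the conflicting insert was ignored, so
    # lastrowid did not change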
@@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
async def handle_get(self, request):
method = request['method']
taskhash = request['taskhash']
+ fetch_all = request.get('all', False)
- if request.get('all', False):
- row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
- else:
- row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+ with closing(self.db.cursor()) as cursor:
+ d = await self.get_unihash(cursor, method, taskhash, fetch_all)
- if row is not None:
- logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
- d = {k: row[k] for k in row.keys()}
- elif self.upstream_client is not None:
- d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
+ self.write_message(d)
+
+ async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+ d = None
+
+ if fetch_all:
+ cursor.execute(
+ '''
+ SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+ INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+ WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+ ORDER BY outhashes_v2.created ASC
+ LIMIT 1
+ ''',
+ {
+ 'method': method,
+ 'taskhash': taskhash,
+ }
+
+ )
+ row = cursor.fetchone()
+
+ if row is not None:
+ d = {k: row[k] for k in row.keys()}
+ elif self.upstream_client is not None:
+ d = await self.upstream_client.get_taskhash(method, taskhash, True)
+ self.update_unified(cursor, d)
+ self.db.commit()
else:
- d = None
+ row = self.query_equivalent(cursor, method, taskhash)
+
+ if row is not None:
+ d = {k: row[k] for k in row.keys()}
+ elif self.upstream_client is not None:
+ d = await self.upstream_client.get_taskhash(method, taskhash)
+ d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
+ insert_unihash(cursor, d, Resolve.IGNORE)
+ self.db.commit()
- self.write_message(d)
+ return d
async def handle_get_outhash(self, request):
+ method = request['method']
+ outhash = request['outhash']
+ taskhash = request['taskhash']
+
with closing(self.db.cursor()) as cursor:
- cursor.execute(self.OUTHASH_QUERY,
- {k: request[k] for k in ('method', 'outhash', 'taskhash')})
+ d = await self.get_outhash(cursor, method, outhash, taskhash)
- row = cursor.fetchone()
+ self.write_message(d)
+
+ async def get_outhash(self, cursor, method, outhash, taskhash):
+ d = None
+ cursor.execute(
+ '''
+ SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+ INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+ WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+ ORDER BY outhashes_v2.created ASC
+ LIMIT 1
+ ''',
+ {
+ 'method': method,
+ 'outhash': outhash,
+ }
+ )
+ row = cursor.fetchone()
if row is not None:
- logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash']))
d = {k: row[k] for k in row.keys()}
- else:
- d = None
+ elif self.upstream_client is not None:
+ d = await self.upstream_client.get_outhash(method, outhash, taskhash)
+ self.update_unified(cursor, d)
+ self.db.commit()
- self.write_message(d)
+ return d
+
+ def update_unified(self, cursor, data):
+ if data is None:
+ return
+
+ insert_unihash(
+ cursor,
+ {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
+ Resolve.IGNORE
+ )
+ insert_outhash(
+ cursor,
+ {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
+ Resolve.IGNORE
+ )
async def handle_get_stream(self, request):
self.write_message('ok')
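
Note: get_unihash() (when fetch_all is set) and get_outhash() share
the same INNER JOIN: outhashes_v2 holds the reported task metadata,
but a row only becomes a complete answer once it joins the canonical
taskhash -> unihash mapping in unihashes_v2. When the answer comes
from upstream instead, update_unified() splits the response across the
two tables by filtering on each table's column list. A small sketch of
that split, using the assumed column tuples from the note above
(values illustrative):

    # Assumed shape of an upstream get_outhash() response
    data = {
        "method": "m", "taskhash": "t1", "outhash": "o1",
        "unihash": "u1", "created": "2023-01-01 00:00:00",
        "owner": "user@host",
    }
    unihash_row = {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}
    # -> {"method": "m", "taskhash": "t1", "unihash": "u1"}
    outhash_row = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
    # -> every key above except "unihash"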
@@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
(method, taskhash) = l.split()
#logger.debug('Looking up %s %s' % (method, taskhash))
- row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+ cursor = self.db.cursor()
+ try:
+ row = self.query_equivalent(cursor, method, taskhash)
+ finally:
+ cursor.close()
+
if row is not None:
msg = ('%s\n' % row['unihash']).encode('utf-8')
#logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
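
Note: query_equivalent() previously opened its own cursor and, in its
bare except path, closed it without re-raising; the caller now owns
the cursor, and this hot loop manages it with an explicit try/finally.
Outside the inner loop the same lifetime is written with
contextlib.closing(), as elsewhere in the file; the equivalent form
(self, method and taskhash as in the loop above):

    from contextlib import closing

    with closing(self.db.cursor()) as cursor:
        row = self.query_equivalent(cursor, method, taskhash)

The explicit try/finally here presumably just keeps the per-line
lookup path as lean as possible, per the comment on query_equivalent().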
@@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
async def handle_report(self, data):
with closing(self.db.cursor()) as cursor:
- cursor.execute(self.OUTHASH_QUERY,
- {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+ outhash_data = {
+ 'method': data['method'],
+ 'outhash': data['outhash'],
+ 'taskhash': data['taskhash'],
+ 'created': datetime.now()
+ }
- row = cursor.fetchone()
+ for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
+ if k in data:
+ outhash_data[k] = data[k]
+
+ # Insert the new entry, unless it already exists
+ (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
+
+ if inserted:
+ # If this row is new, check if it is equivalent to another
+ # output hash
+ cursor.execute(
+ '''
+ SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+ INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+ -- Select any matching output hash except the one we just inserted
+ WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+ -- Pick the oldest hash
+ ORDER BY outhashes_v2.created ASC
+ LIMIT 1
+ ''',
+ {
+ 'method': data['method'],
+ 'outhash': data['outhash'],
+ 'taskhash': data['taskhash'],
+ }
+ )
+ row = cursor.fetchone()
- if row is None and self.upstream_client:
- # Try upstream
- row = await copy_outhash_from_upstream(self.upstream_client,
- self.db,
- data['method'],
- data['outhash'],
- data['taskhash'])
-
- # If no matching outhash was found, or one *was* found but it
- # wasn't an exact match on the taskhash, a new entry for this
- # taskhash should be added
- if row is None or row['taskhash'] != data['taskhash']:
- # If a row matching the outhash was found, the unihash for
- # the new taskhash should be the same as that one.
- # Otherwise the caller provided unihash is used.
- unihash = data['unihash']
if row is not None:
+ # A matching output hash was found. Set our taskhash to the
+ # same unihash since they are equivalent
unihash = row['unihash']
+ resolve = Resolve.REPLACE
+ else:
+ # No matching output hash was found. This is probably the
+ # first outhash to be added.
+ unihash = data['unihash']
+ resolve = Resolve.IGNORE
+
+ # Query upstream to see if it has a unihash we can use
+ if self.upstream_client is not None:
+ upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
+ if upstream_data is not None:
+ unihash = upstream_data['unihash']
+
+
+ insert_unihash(
+ cursor,
+ {
+ 'method': data['method'],
+ 'taskhash': data['taskhash'],
+ 'unihash': unihash,
+ },
+ resolve
+ )
+
+ unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
+ if unihash_data is not None:
+ unihash = unihash_data['unihash']
+ else:
+ unihash = data['unihash']
- insert_data = {
- 'method': data['method'],
- 'outhash': data['outhash'],
- 'taskhash': data['taskhash'],
- 'unihash': unihash,
- 'created': datetime.now()
- }
-
- for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
- if k in data:
- insert_data[k] = data[k]
-
- insert_task(cursor, insert_data)
- self.db.commit()
-
- logger.info('Adding taskhash %s with unihash %s',
- data['taskhash'], unihash)
+ self.db.commit()
- d = {
- 'taskhash': data['taskhash'],
- 'method': data['method'],
- 'unihash': unihash
- }
- else:
- d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+ d = {
+ 'taskhash': data['taskhash'],
+ 'method': data['method'],
+ 'unihash': unihash,
+ }
self.write_message(d)
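
Note: the report path now runs in three steps: (1) insert the outhash
row with OR IGNORE; (2) if that row is new, look for the oldest row
with the same (method, outhash) but a different taskhash, preferring
an upstream answer when one exists; (3) record the unihash with
OR REPLACE when an equivalence was found (so it supersedes any mapping
already recorded for this taskhash) or OR IGNORE otherwise, then read
the winning mapping back. A worked example of the equivalence case
(hash values illustrative):

    # Client A reports taskhash=tA, outhash=oX, unihash=uA:
    #   outhashes_v2 gains (m, tA, oX); no other row has outhash oX,
    #   so unihashes_v2 gains (m, tA, uA) via OR IGNORE.
    #
    # Client B reports taskhash=tB, outhash=oX, unihash=uB:
    #   outhashes_v2 gains (m, tB, oX); the equivalence query finds the
    #   older (m, tA, oX) row joined to uA, so unihashes_v2 gains
    #   (m, tB, uA) via OR REPLACE. B's suggested uB is never recorded:
    #   both taskhashes now resolve to the single unified hash uA.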
@@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
with closing(self.db.cursor()) as cursor:
insert_data = {
'method': data['method'],
- 'outhash': "",
'taskhash': data['taskhash'],
'unihash': data['unihash'],
- 'created': datetime.now()
}
-
- for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
- if k in data:
- insert_data[k] = data[k]
-
- insert_task(cursor, insert_data, ignore=True)
+ insert_unihash(cursor, insert_data, Resolve.IGNORE)
self.db.commit()
# Fetch the unihash that will be reported for the taskhash. If the
# unihash matches, it means this row was inserted (or the mapping
# was already valid)
- row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)
+ row = self.query_equivalent(cursor, data['method'], data['taskhash'])
if row['unihash'] == data['unihash']:
logger.info('Adding taskhash equivalence for %s with unihash %s',
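
Note: handle_report_equiv() now touches only unihashes_v2: it inserts
the client-supplied mapping with OR IGNORE and immediately reads the
row back, so the reply (and the log message) reflect whichever mapping
actually won. A condensed, hypothetical standalone form of the same
pattern:

    from contextlib import closing

    def report_equiv(db, method, taskhash, unihash):
        # Hypothetical helper; mirrors the handler above
        with closing(db.cursor()) as cursor:
            insert_unihash(
                cursor,
                {'method': method, 'taskhash': taskhash, 'unihash': unihash},
                Resolve.IGNORE,
            )
            db.commit()
            # If another unihash already existed for this taskhash,
            # OR IGNORE kept it, and this query returns that one
            cursor.execute(
                'SELECT taskhash, method, unihash FROM unihashes_v2 '
                'WHERE method=:method AND taskhash=:taskhash',
                {'method': method, 'taskhash': taskhash},
            )
            return cursor.fetchone()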
@@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
await self.backfill_queue.join()
self.write_message(d)
- def query_equivalent(self, method, taskhash, query):
+ def query_equivalent(self, cursor, method, taskhash):
# This is part of the inner loop and must be as fast as possible
- try:
- cursor = self.db.cursor()
- cursor.execute(query, {'method': method, 'taskhash': taskhash})
- return cursor.fetchone()
- except:
- cursor.close()
+ cursor.execute(
+ 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
+ {
+ 'method': method,
+ 'taskhash': taskhash,
+ }
+ )
+ return cursor.fetchone()
class Server(bb.asyncrpc.AsyncServer):
@@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer):
self.backfill_queue.task_done()
break
method, taskhash = item
- await copy_from_upstream(client, self.db, method, taskhash)
+ await copy_unihash_from_upstream(client, self.db, method, taskhash)
self.backfill_queue.task_done()
finally:
await client.close()