#! /usr/bin/env python3
#
# Copyright (C) 2023 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
import sqlite3
import logging
from contextlib import closing
from . import User

logger = logging.getLogger("hashserv.sqlite")

# Every table definition is a tuple of (column name, column type, flags).
# Columns whose flags contain "UNIQUE" are combined by _make_table() into a
# single composite UNIQUE constraint for the table.
UNIHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("unihash", "TEXT NOT NULL", ""),
    ("gc_mark", "TEXT NOT NULL", ""),
)

UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)

OUTHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("outhash", "TEXT NOT NULL", "UNIQUE"),
    ("created", "DATETIME", ""),
    # Optional fields
    ("owner", "TEXT", ""),
    ("PN", "TEXT", ""),
    ("PV", "TEXT", ""),
    ("PR", "TEXT", ""),
    ("task", "TEXT", ""),
    ("outhash_siginfo", "TEXT", ""),
)

OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)

USERS_TABLE_DEFINITION = (
    ("username", "TEXT NOT NULL", "UNIQUE"),
    ("token", "TEXT NOT NULL", ""),
    ("permissions", "TEXT NOT NULL", ""),
)

USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION)

CONFIG_TABLE_DEFINITION = (
    ("name", "TEXT NOT NULL", "UNIQUE"),
    ("value", "TEXT", ""),
)

CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)


def _make_table(cursor, name, definition):
    """Create table *name* from *definition* if it does not exist yet.

    The table gets an autoincrementing integer ``id`` primary key, one column
    per definition entry, and one composite UNIQUE constraint over all
    columns flagged "UNIQUE".
    """
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS {name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            {fields}
            UNIQUE({unique})
            )
        """.format(
            name=name,
            fields=" ".join(
                "%s %s," % (column, typ) for column, typ, _ in definition
            ),
            unique=", ".join(
                column for column, _, flags in definition if "UNIQUE" in flags
            ),
        )
    )


def map_user(row):
    """Convert a users table row into a User, or return None for no row.

    The stored permissions string is split on whitespace into a set.
    """
    if row is None:
        return None
    return User(
        username=row["username"],
        permissions=set(row["permissions"].split()),
    )


def _make_condition_statement(columns, condition):
    """Build the parameters and WHERE clause for a column filter.

    Only the entries of *condition* whose key is listed in *columns* and
    whose value is not None are used.

    Returns a tuple of (parameter dict, "col=:col AND ..." clause). Both are
    empty when nothing in *condition* applies.
    """
    where = {}
    for c in columns:
        if c in condition and condition[c] is not None:
            where[c] = condition[c]

    return where, " AND ".join("%s=:%s" % (k, k) for k in where)


def _get_sqlite_version(cursor):
    """Return the SQLite library version as a tuple.

    Numeric components are converted to int; any component that is not a
    plain integer is kept as a string.
    """
    cursor.execute("SELECT sqlite_version()")

    version = []
    for v in cursor.fetchone()[0].split("."):
        try:
            version.append(int(v))
        except ValueError:
            version.append(v)

    return tuple(version)


def _schema_table_name(version):
    """Return the schema table name to query for the given SQLite version.

    "sqlite_schema" is used for version (3, 33) and newer, "sqlite_master"
    otherwise.
    """
    if version >= (3, 33):
        return "sqlite_schema"

    return "sqlite_master"


class DatabaseEngine(object):
    """Creates the database schema and hands out Database connections."""

    def __init__(self, dbname, sync):
        """Remember the database path and the synchronous setting.

        *sync* selects "PRAGMA synchronous = NORMAL" when true and "OFF"
        otherwise.
        """
        self.dbname = dbname
        self.logger = logger
        self.sync = sync

    async def create(self):
        """Create tables and indexes and migrate data from older schemas."""
        # The connection is only needed for the set up, so close it
        # explicitly instead of leaving it to be garbage collected.
        with closing(sqlite3.connect(self.dbname)) as db:
            db.row_factory = sqlite3.Row

            with closing(db.cursor()) as cursor:
                _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION)
                _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
                _make_table(cursor, "users", USERS_TABLE_DEFINITION)
                _make_table(cursor, "config", CONFIG_TABLE_DEFINITION)

                cursor.execute("PRAGMA journal_mode = WAL")
                cursor.execute(
                    "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
                )

                # Drop old indexes
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3")

                # TODO: Upgrade from tasks_v2?
                cursor.execute("DROP TABLE IF EXISTS tasks_v2")

                # Create new indexes
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS config_lookup ON config (name)"
                )

                sqlite_version = _get_sqlite_version(cursor)

                # Migrate the rows of a left over unihashes_v2 table, if any
                cursor.execute(
                    f"""
                    SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2'
                    """
                )
                if cursor.fetchone():
                    self.logger.info("Upgrading Unihashes V2 -> V3...")
                    cursor.execute(
                        """
                        INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark)
                        SELECT id, method, unihash, taskhash, '' FROM unihashes_v2
                        """
                    )
                    cursor.execute("DROP TABLE unihashes_v2")
                    db.commit()
                    self.logger.info("Upgrade complete")

    def connect(self, logger):
        """Return a new Database connection that logs to *logger*."""
        return Database(logger, self.dbname, self.sync)


class Database(object):
    """A single SQLite connection to the hash equivalence database.

    Instances can be used as asynchronous context managers; the connection
    is closed on exit.
    """

    def __init__(self, logger, dbname, sync):
        """Open *dbname* and apply the journal and synchronous pragmas."""
        self.dbname = dbname
        self.logger = logger

        self.db = sqlite3.connect(self.dbname)
        self.db.row_factory = sqlite3.Row

        with closing(self.db.cursor()) as cursor:
            cursor.execute("PRAGMA journal_mode = WAL")
            cursor.execute(
                "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
            )

            self.sqlite_version = _get_sqlite_version(cursor)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def _set_config(self, cursor, name, value):
        """Insert or replace config entry *name*; the caller must commit."""
        # Re-using the id of an existing row makes OR REPLACE overwrite that
        # row in place; for a new name the sub-select yields NULL and a new
        # id is assigned.
        cursor.execute(
            """
            INSERT OR REPLACE INTO config (id, name, value) VALUES
            ((SELECT id FROM config WHERE name=:name), :name, :value)
            """,
            {
                "name": name,
                "value": value,
            },
        )

    async def _get_config(self, cursor, name):
        """Return the value of config entry *name*, or None if it is unset."""
        cursor.execute(
            "SELECT value FROM config WHERE name=:name",
            {
                "name": name,
            },
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return row["value"]

    async def close(self):
        """Close the underlying connection."""
        self.db.close()

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        """Return the oldest outhash row joined with its unihash.

        The row is selected by *method* and *taskhash*; None is returned when
        there is no match.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_unihash_by_outhash(self, method, outhash):
        """Return the oldest outhash row joined with its unihash.

        The row is selected by *method* and *outhash*; None is returned when
        there is no match.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def unihash_exists(self, unihash):
        """Return True if any unihashes_v3 row has the given *unihash*."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM unihashes_v3 WHERE unihash=:unihash
                LIMIT 1
                """,
                {
                    "unihash": unihash,
                },
            )
            return cursor.fetchone() is not None

    async def get_outhash(self, method, outhash):
        """Return the oldest outhashes_v2 row for *method* and *outhash*.

        None is returned when there is no match.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM outhashes_v2
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        """Return the oldest other taskhash that produced the same outhash.

        The returned row has "taskhash" and "unihash" columns. Rows whose
        taskhash equals *taskhash* are excluded. None is returned when there
        is no match.
        """
        with closing(self.db.cursor()) as cursor:
            # The "--" comments below are part of the SQL text and must stay
            # on lines of their own so they do not hide the next clause.
            cursor.execute(
                """
                SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                -- Select any matching output hash except the one we just inserted
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                -- Pick the oldest hash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent(self, method, taskhash):
        """Return the (taskhash, method, unihash) row for a taskhash.

        None is returned when there is no match.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash",
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def remove(self, condition):
        """Delete outhash and unihash rows matching *condition*.

        *condition* maps column names to required values. A table is only
        touched when at least one of its columns is present in *condition*
        with a value other than None, so an empty condition deletes nothing.

        Returns the total number of deleted rows.
        """

        def do_remove(columns, table_name, cursor):
            where, clause = _make_condition_statement(columns, condition)
            if where:
                query = f"DELETE FROM {table_name} WHERE {clause}"
                cursor.execute(query, where)
                return cursor.rowcount

            return 0

        count = 0
        with closing(self.db.cursor()) as cursor:
            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor)
            self.db.commit()

        return count

    async def get_current_gc_mark(self):
        """Return the "gc-mark" config value, or None if it is unset."""
        with closing(self.db.cursor()) as cursor:
            return await self._get_config(cursor, "gc-mark")

    async def gc_status(self):
        """Return a tuple of (rows to keep, rows to remove, current mark).

        A unihash row counts as kept when its gc_mark equals the current
        "gc-mark" config value, with an unset mark treated as ''.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            keep_rows = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            remove_rows = cursor.fetchone()[0]

            current_mark = await self._get_config(cursor, "gc-mark")

            return (keep_rows, remove_rows, current_mark)

    async def gc_mark(self, mark, condition):
        """Set the current mark and apply it to the rows matching *condition*.

        The "gc-mark" config value is always updated. Rows are only marked
        when *condition* names at least one unihash column with a value other
        than None.

        Returns the number of rows that were marked.
        """
        with closing(self.db.cursor()) as cursor:
            await self._set_config(cursor, "gc-mark", mark)

            where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition)

            new_rows = 0
            if where:
                cursor.execute(
                    f"""
                    UPDATE unihashes_v3 SET
                        gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    WHERE {clause}
                    """,
                    where,
                )
                new_rows = cursor.rowcount

            # Commit unconditionally so that the new mark itself is stored
            self.db.commit()
            return new_rows

    async def gc_sweep(self):
        """Delete unihash rows not carrying the current mark and clear it.

        Returns the number of deleted rows.
        """
        with closing(self.db.cursor()) as cursor:
            # NOTE: COALESCE is not used in this query so that if the current
            # mark is NULL, nothing will happen
            cursor.execute(
                """
                DELETE FROM unihashes_v3 WHERE
                    gc_mark!=(SELECT value FROM config WHERE name='gc-mark')
                """
            )
            count = cursor.rowcount
            await self._set_config(cursor, "gc-mark", None)

            self.db.commit()
            return count

    async def clean_unused(self, oldest):
        """Delete outhash rows older than *oldest* that have no unihash row.

        Returns the number of deleted rows.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
                    SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1
                )
                """,
                {
                    "oldest": oldest,
                },
            )
            self.db.commit()
            return cursor.rowcount

    async def insert_unihash(self, method, taskhash, unihash):
        """Insert a unihash row stamped with the current mark.

        An existing row for the same (method, taskhash) is left untouched.

        Returns True if a new row was inserted, False if it was ignored.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES
                    (
                    :method,
                    :taskhash,
                    :unihash,
                    COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    )
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                    "unihash": unihash,
                },
            )
            self.db.commit()
            # rowcount is 0 when the insert was ignored. lastrowid cannot be
            # used for this with a fresh cursor, because it has no previous
            # value of its own to compare against.
            return cursor.rowcount > 0

    async def insert_outhash(self, data):
        """Insert an outhash row built from the known columns in *data*.

        Keys of *data* that are not outhash table columns are dropped. An
        existing row with the same unique columns is left untouched.

        Returns True if a new row was inserted, False if it was ignored.
        """
        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
        keys = sorted(data.keys())
        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
            fields=", ".join(keys),
            values=", ".join(":" + k for k in keys),
        )
        with closing(self.db.cursor()) as cursor:
            cursor.execute(query, data)
            self.db.commit()
            # rowcount is 0 when the insert was ignored. lastrowid cannot be
            # used for this with a fresh cursor, because it has no previous
            # value of its own to compare against.
            return cursor.rowcount > 0

    def _get_user(self, username):
        """Return the users row for *username*, or None if there is none."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT username, permissions, token FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            return cursor.fetchone()

    async def lookup_user_token(self, username):
        """Return (User, token) for *username*, or (None, None) if unknown."""
        row = self._get_user(username)
        if row is None:
            return None, None
        return map_user(row), row["token"]

    async def lookup_user(self, username):
        """Return the User for *username*, or None if unknown."""
        return map_user(self._get_user(username))

    async def set_user_token(self, username, token):
        """Set the token of *username*; return True if a row was updated."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET token=:token WHERE username=:username
                """,
                {
                    "username": username,
                    "token": token,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def set_user_perms(self, username, permissions):
        """Set the permissions of *username*; return True if a row changed.

        *permissions* is stored as a single space separated string.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET permissions=:permissions WHERE username=:username
                """,
                {
                    "username": username,
                    "permissions": " ".join(permissions),
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_all_users(self):
        """Return a list with a User for every row of the users table."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute("SELECT username, permissions FROM users")
            return [map_user(r) for r in cursor.fetchall()]

    async def new_user(self, username, permissions, token):
        """Create a user; return False if the insert violates a constraint.

        *permissions* is stored as a single space separated string.
        """
        with closing(self.db.cursor()) as cursor:
            try:
                cursor.execute(
                    """
                    INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions)
                    """,
                    {
                        "username": username,
                        "token": token,
                        "permissions": " ".join(permissions),
                    },
                )
                self.db.commit()
                return True
            except sqlite3.IntegrityError:
                # End the transaction that was opened for the failed INSERT
                # so that it is not left pending on this connection.
                self.db.rollback()
                return False

    async def delete_user(self, username):
        """Delete *username*; return True if a row was removed."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_usage(self):
        """Return a dict mapping each table name to {"rows": row count}.

        Tables whose name matches 'sqlite_%' are skipped.
        """
        usage = {}
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                f"""
                SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
            # fetchall() reads every table name up front, which makes it safe
            # to re-use the cursor for the per-table queries inside the loop.
            for row in cursor.fetchall():
                cursor.execute(
                    """
                    SELECT COUNT() FROM %s
                    """
                    % row["name"],
                )
                usage[row["name"]] = {
                    "rows": cursor.fetchone()[0],
                }
        return usage

    async def get_query_columns(self):
        """Return the names of all TEXT columns of the two hash tables.

        Names shared by both tables appear once; the order is unspecified.
        """
        columns = set()
        for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION:
            if typ.startswith("TEXT"):
                columns.add(name)
        return list(columns)