aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2019-07-24 14:14:50 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-06 11:21:07 +0100
commita03d60671a53d9ff70e07cc42fe35f6f8776dac2 (patch)
treecc15d86d45b9f9b3bc02554a98866fe3bd01db18
parentee3fc6030e653f3244b065fc89aafd2a7c36ae04 (diff)
downloadbitbake-a03d60671a53d9ff70e07cc42fe35f6f8776dac2.tar.gz
hashserv: Use separate threads for answering requests and handling them
Experience with the prserv shows that having two threads, one accepting and queueing connections and one handling the requests leads to much more reliable behaviour than having everything in a single thread. Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--lib/hashserv/__init__.py41
1 files changed, 40 insertions, 1 deletions
diff --git a/lib/hashserv/__init__.py b/lib/hashserv/__init__.py
index 544bc86b1..86aa7e9de 100644
--- a/lib/hashserv/__init__.py
+++ b/lib/hashserv/__init__.py
@@ -10,6 +10,9 @@ import sqlite3
import json
import traceback
import logging
+import socketserver
+import queue
+import threading
from datetime import datetime
logger = logging.getLogger('hashserv')
@@ -135,6 +138,41 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
self.send_error(400, explain=traceback.format_exc())
return
+class ThreadedHTTPServer(HTTPServer):
+ quit = False
+
+ def serve_forever(self):
+ self.requestqueue = queue.Queue()
+ self.handlerthread = threading.Thread(target=self.process_request_thread)
+ self.handlerthread.daemon = False
+
+ self.handlerthread.start()
+ super().serve_forever()
+
+ def process_request_thread(self):
+ while not self.quit:
+ try:
+ (request, client_address) = self.requestqueue.get(True)
+ except queue.Empty:
+ continue
+ if request is None:
+ continue
+ try:
+ self.finish_request(request, client_address)
+ except Exception:
+ self.handle_error(request, client_address)
+ finally:
+ self.shutdown_request(request)
+
+ def process_request(self, request, client_address):
+ self.requestqueue.put((request, client_address))
+
+ def server_close(self):
+ super().server_close()
+ self.quit = True
+ self.requestqueue.put((None, None))
+ self.handlerthread.join()
+
def create_server(addr, dbname, prefix=''):
class Handler(HashEquivalenceServer):
pass
@@ -171,4 +209,5 @@ def create_server(addr, dbname, prefix=''):
cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
logger.info('Starting server on %s', addr)
- return HTTPServer(addr, Handler)
+
+ return ThreadedHTTPServer(addr, Handler)