aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPaul Barker <pbarker@konsulko.com>2021-04-29 15:11:12 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2021-05-06 11:07:12 +0100
commit6b09415bed6b5e7c12aaf39b677d9ef72844e233 (patch)
tree69efb525eea8853de79151208c25308a0c4620e8
parent39c7c158c52157b18f5ccbbd673e3298e6402f52 (diff)
downloadbitbake-contrib-6b09415bed6b5e7c12aaf39b677d9ef72844e233.tar.gz
prserv: Handle requests in main thread
The prserver process is cleanly separated from the main bitbake process so requests can be handled in the main thread. This removes the need for a request queue and a separate request handling thread. Signed-off-by: Paul Barker <pbarker@konsulko.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--lib/prserv/serv.py159
1 files changed, 36 insertions, 123 deletions
diff --git a/lib/prserv/serv.py b/lib/prserv/serv.py
index 24a8c4bac..5e322bf83 100644
--- a/lib/prserv/serv.py
+++ b/lib/prserv/serv.py
@@ -5,8 +5,6 @@
import os,sys,logging
import signal, time
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
-import threading
-import queue
import socket
import io
import sqlite3
@@ -14,7 +12,6 @@ import bb.server.xmlrpcclient
import prserv
import prserv.db
import errno
-import select
import multiprocessing
logger = logging.getLogger("BitBake.PRserv")
@@ -48,54 +45,17 @@ class PRServer(SimpleXMLRPCServer):
self.dbfile=dbfile
self.logfile=logfile
- self.working_thread=None
self.host, self.port = self.socket.getsockname()
- self.pidfile=PIDPREFIX % (self.host, self.port)
self.register_function(self.getPR, "getPR")
- self.register_function(self.quit, "quit")
self.register_function(self.ping, "ping")
self.register_function(self.export, "export")
self.register_function(self.importone, "importone")
self.register_introspection_functions()
- self.quitpipein, self.quitpipeout = os.pipe()
-
- self.requestqueue = queue.Queue()
- self.handlerthread = threading.Thread(target = self.process_request_thread)
- self.handlerthread.daemon = False
-
- def process_request_thread(self):
- """Same as in BaseServer but as a thread.
-
- In addition, exception handling is done here.
-
- """
- iter_count = 1
+ self.iter_count = 0
# 60 iterations between syncs or sync if dirty every ~30 seconds
- iterations_between_sync = 60
-
- bb.utils.set_process_name("PRServ Handler")
-
- while not self.quitflag:
- try:
- (request, client_address) = self.requestqueue.get(True, 30)
- except queue.Empty:
- self.table.sync_if_dirty()
- continue
- if request is None:
- continue
- try:
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- iter_count = (iter_count + 1) % iterations_between_sync
- if iter_count == 0:
- self.table.sync_if_dirty()
- except:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
- self.table.sync()
- self.table.sync_if_dirty()
+ self.iterations_between_sync = 60
def sigint_handler(self, signum, stack):
if self.table:
@@ -104,11 +64,30 @@ class PRServer(SimpleXMLRPCServer):
def sigterm_handler(self, signum, stack):
if self.table:
self.table.sync()
- self.quit()
- self.requestqueue.put((None, None))
+ raise(SystemExit)
def process_request(self, request, client_address):
- self.requestqueue.put((request, client_address))
+ if request is None:
+ return
+ try:
+ self.finish_request(request, client_address)
+ self.shutdown_request(request)
+ self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
+ if self.iter_count == 0:
+ self.table.sync_if_dirty()
+ except:
+ self.handle_error(request, client_address)
+ self.shutdown_request(request)
+ self.table.sync()
+ self.table.sync_if_dirty()
+
+ def serve_forever(self, poll_interval=0.5):
+ signal.signal(signal.SIGINT, self.sigint_handler)
+ signal.signal(signal.SIGTERM, self.sigterm_handler)
+
+ self.db = prserv.db.PRData(self.dbfile)
+ self.table = self.db["PRMAIN"]
+ return super().serve_forever(poll_interval)
def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
try:
@@ -121,7 +100,7 @@ class PRServer(SimpleXMLRPCServer):
return self.table.importone(version, pkgarch, checksum, value)
def ping(self):
- return not self.quitflag
+ return True
def getinfo(self):
return (self.host, self.port)
@@ -136,45 +115,6 @@ class PRServer(SimpleXMLRPCServer):
logger.error(str(exc))
return None
- def quit(self):
- self.quitflag=True
- os.write(self.quitpipeout, b"q")
- os.close(self.quitpipeout)
- return
-
- def work_forever(self,):
- self.quitflag = False
- # This timeout applies to the poll in TCPServer, we need the select
- # below to wake on our quit pipe closing. We only ever call into handle_request
- # if there is data there.
- self.timeout = 0.01
-
- signal.signal(signal.SIGINT, self.sigint_handler)
- signal.signal(signal.SIGTERM, self.sigterm_handler)
-
- bb.utils.set_process_name("PRServ")
-
- # DB connection must be created after all forks
- self.db = prserv.db.PRData(self.dbfile)
- self.table = self.db["PRMAIN"]
-
- logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
- (self.dbfile, self.host, self.port, str(os.getpid())))
-
- self.handlerthread.start()
- while not self.quitflag:
- ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
- if self.quitflag:
- break
- if self.fileno() in ready[0]:
- self.handle_request()
- self.handlerthread.join()
- self.db.disconnect()
- logger.info("PRServer: stopping...")
- self.server_close()
- os.close(self.quitpipein)
- return
-
class PRServSingleton(object):
def __init__(self, dbfile, logfile, interface):
self.dbfile = dbfile
@@ -185,7 +125,7 @@ class PRServSingleton(object):
def start(self):
self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
- self.process = multiprocessing.Process(target=self.prserv.work_forever)
+ self.process = multiprocessing.Process(target=self.prserv.serve_forever)
self.process.start()
self.host, self.port = self.prserv.getinfo()
@@ -201,13 +141,6 @@ class PRServerConnection(object):
self.port = port
self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
- def terminate(self):
- try:
- logger.info("Terminating PRServer...")
- self.connection.quit()
- except Exception as exc:
- sys.stderr.write("%s\n" % str(exc))
-
def getPR(self, version, pkgarch, checksum):
return self.connection.getPR(version, pkgarch, checksum)
@@ -308,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile):
return 1
server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
- run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile))
+ run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
# Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
# the one the server actually is listening, so at least warn the user about it
@@ -345,25 +278,13 @@ def stop_daemon(host, port):
return 1
try:
- PRServerConnection(ip, port).terminate()
- except:
- logger.critical("Stop PRService %s:%d failed" % (host,port))
+ if is_running(pid):
+ print("Sending SIGTERM to pr-server.")
+ os.kill(pid, signal.SIGTERM)
+ time.sleep(0.1)
- try:
- if pid:
- wait_timeout = 0
- print("Waiting for pr-server to exit.")
- while is_running(pid) and wait_timeout < 50:
- time.sleep(0.1)
- wait_timeout += 1
-
- if is_running(pid):
- print("Sending SIGTERM to pr-server.")
- os.kill(pid,signal.SIGTERM)
- time.sleep(0.1)
-
- if os.path.exists(pidfile):
- os.remove(pidfile)
+ if os.path.exists(pidfile):
+ os.remove(pidfile)
except OSError as e:
err = str(e)
@@ -439,17 +360,9 @@ def auto_start(d):
def auto_shutdown():
global singleton
- if singleton:
- host, port = singleton.getinfo()
- try:
- PRServerConnection(host, port).terminate()
- except:
- logger.critical("Stop PRService %s:%d failed" % (host,port))
-
- try:
- os.waitpid(singleton.prserv.pid, 0)
- except ChildProcessError:
- pass
+ if singleton and singleton.process:
+ singleton.process.terminate()
+ singleton.process.join()
singleton = None
def ping(host, port):