aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAníbal Limón <anibal.limon@linux.intel.com>2017-05-26 15:37:37 -0500
committerRichard Purdie <richard.purdie@linuxfoundation.org>2017-05-30 10:15:22 +0100
commit48b7a407d692e6c49c41b16f2bd11e8c3f47a421 (patch)
tree6ca7b601652a9fda51693f8ceddbec10a03bbc1d
parent8e71844fc4dd3fcc8a19f9d4c25aafb09c5525fe (diff)
downloadopenembedded-core-contrib-48b7a407d692e6c49c41b16f2bd11e8c3f47a421.tar.gz
oeqa/core/threaded: Add support of OETestRunnerThreaded
The OETestRunnerThreaded overrides the run method of OETestRunner it recieves a list of suites to be executed by a ThreadPool. The new run method handles the ThreadPool creation and the OETestResultThreaded fill. [YOCTO #11450] Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--meta/lib/oeqa/core/threaded.py75
1 files changed, 74 insertions, 1 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
index f216685f46..81df340366 100644
--- a/meta/lib/oeqa/core/threaded.py
+++ b/meta/lib/oeqa/core/threaded.py
@@ -3,11 +3,13 @@
import threading
import multiprocessing
+import queue
+import time
from unittest.suite import TestSuite
from oeqa.core.loader import OETestLoader
-from oeqa.core.runner import OEStreamLogger, OETestResult
+from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
class OETestLoaderThreaded(OETestLoader):
def __init__(self, tc, module_paths, modules, tests, modules_required,
@@ -185,3 +187,74 @@ class OETestResultThreaded(object):
tid = list(self._results)[0]
result = self._results[tid]['result']
result.logDetails()
+
+class _Worker(threading.Thread):
+ """Thread executing tasks from a given tasks queue"""
+ def __init__(self, tasks, result, stream):
+ threading.Thread.__init__(self)
+ self.tasks = tasks
+
+ self.result = result
+ self.stream = stream
+
+ def run(self):
+ while True:
+ try:
+ func, args, kargs = self.tasks.get(block=False)
+ except queue.Empty:
+ break
+
+ try:
+ run_start_time = time.time()
+ rc = func(*args, **kargs)
+ run_end_time = time.time()
+ self.result.addResult(rc, run_start_time, run_end_time)
+ self.stream.finish()
+ except Exception as e:
+ print(e)
+ finally:
+ self.tasks.task_done()
+
+class _ThreadedPool:
+ """Pool of threads consuming tasks from a queue"""
+ def __init__(self, num_workers, num_tasks, stream=None, result=None):
+ self.tasks = queue.Queue(num_tasks)
+ self.workers = []
+
+ for _ in range(num_workers):
+ worker = _Worker(self.tasks, result, stream)
+ self.workers.append(worker)
+
+ def start(self):
+ for worker in self.workers:
+ worker.start()
+
+ def add_task(self, func, *args, **kargs):
+ """Add a task to the queue"""
+ self.tasks.put((func, args, kargs))
+
+ def wait_completion(self):
+ """Wait for completion of all the tasks in the queue"""
+ self.tasks.join()
+ for worker in self.workers:
+ worker.join()
+
+class OETestRunnerThreaded(OETestRunner):
+ streamLoggerClass = OEStreamLoggerThreaded
+
+ def __init__(self, tc, *args, **kwargs):
+ super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
+ self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
+
+ def run(self, suites):
+ result = OETestResultThreaded(self.tc)
+
+ pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
+ result=result)
+ for s in suites:
+ pool.add_task(super(OETestRunnerThreaded, self).run, s)
+ pool.start()
+ pool.wait_completion()
+ result._fill_tc_results()
+
+ return result