summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core/threaded.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core/threaded.py')
-rw-r--r--meta/lib/oeqa/core/threaded.py275
1 files changed, 0 insertions, 275 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
deleted file mode 100644
index 2cafe03a21..0000000000
--- a/meta/lib/oeqa/core/threaded.py
+++ /dev/null
@@ -1,275 +0,0 @@
-# Copyright (C) 2017 Intel Corporation
-# Released under the MIT license (see COPYING.MIT)
-
-import threading
-import multiprocessing
-import queue
-import time
-
-from unittest.suite import TestSuite
-
-from oeqa.core.loader import OETestLoader
-from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
-from oeqa.core.context import OETestContext
-
-class OETestLoaderThreaded(OETestLoader):
- def __init__(self, tc, module_paths, modules, tests, modules_required,
- filters, process_num=0, *args, **kwargs):
- super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules,
- tests, modules_required, filters, *args, **kwargs)
-
- self.process_num = process_num
-
- def discover(self):
- suite = super(OETestLoaderThreaded, self).discover()
-
- if self.process_num <= 0:
- self.process_num = min(multiprocessing.cpu_count(),
- len(suite._tests))
-
- suites = []
- for _ in range(self.process_num):
- suites.append(self.suiteClass())
-
- def _search_for_module_idx(suites, case):
- """
- Cases in the same module needs to be run
- in the same thread because PyUnit keeps track
- of setUp{Module, Class,} and tearDown{Module, Class,}.
- """
-
- for idx in range(self.process_num):
- suite = suites[idx]
- for c in suite._tests:
- if case.__module__ == c.__module__:
- return idx
-
- return -1
-
- def _search_for_depend_idx(suites, depends):
- """
- Dependency cases needs to be run in the same
- thread, because OEQA framework look at the state
- of dependant test to figure out if skip or not.
- """
-
- for idx in range(self.process_num):
- suite = suites[idx]
-
- for case in suite._tests:
- if case.id() in depends:
- return idx
- return -1
-
- def _get_best_idx(suites):
- sizes = [len(suite._tests) for suite in suites]
- return sizes.index(min(sizes))
-
- def _fill_suites(suite):
- idx = -1
- for case in suite:
- if isinstance(case, TestSuite):
- _fill_suites(case)
- else:
- idx = _search_for_module_idx(suites, case)
-
- depends = {}
- if 'depends' in self.tc._registry:
- depends = self.tc._registry['depends']
-
- if idx == -1 and case.id() in depends:
- case_depends = depends[case.id()]
- idx = _search_for_depend_idx(suites, case_depends)
-
- if idx == -1:
- idx = _get_best_idx(suites)
-
- suites[idx].addTest(case)
- _fill_suites(suite)
-
- suites_tmp = suites
- suites = []
- for suite in suites_tmp:
- if len(suite._tests) > 0:
- suites.append(suite)
-
- return suites
-
-class OEStreamLoggerThreaded(OEStreamLogger):
- _lock = threading.Lock()
- buffers = {}
-
- def write(self, msg):
- tid = threading.get_ident()
-
- if not tid in self.buffers:
- self.buffers[tid] = ""
-
- if msg:
- self.buffers[tid] += msg
-
- def finish(self):
- tid = threading.get_ident()
-
- self._lock.acquire()
- self.logger.info('THREAD: %d' % tid)
- self.logger.info('-' * 70)
- for line in self.buffers[tid].split('\n'):
- self.logger.info(line)
- self._lock.release()
-
-class OETestResultThreadedInternal(OETestResult):
- def _tc_map_results(self):
- tid = threading.get_ident()
-
- # PyUnit generates a result for every test module run, test
- # if the thread already has an entry to avoid lose the previous
- # test module results.
- if not tid in self.tc._results:
- self.tc._results[tid] = {}
- self.tc._results[tid]['failures'] = self.failures
- self.tc._results[tid]['errors'] = self.errors
- self.tc._results[tid]['skipped'] = self.skipped
- self.tc._results[tid]['expectedFailures'] = self.expectedFailures
-
-class OETestResultThreaded(object):
- _results = {}
- _lock = threading.Lock()
-
- def __init__(self, tc):
- self.tc = tc
-
- def _fill_tc_results(self):
- tids = list(self.tc._results.keys())
- fields = ['failures', 'errors', 'skipped', 'expectedFailures']
-
- for tid in tids:
- result = self.tc._results[tid]
- for field in fields:
- if not field in self.tc._results:
- self.tc._results[field] = []
- self.tc._results[field].extend(result[field])
-
- def addResult(self, result, run_start_time, run_end_time):
- tid = threading.get_ident()
-
- self._lock.acquire()
- self._results[tid] = {}
- self._results[tid]['result'] = result
- self._results[tid]['run_start_time'] = run_start_time
- self._results[tid]['run_end_time'] = run_end_time
- self._results[tid]['result'] = result
- self._lock.release()
-
- def wasSuccessful(self):
- wasSuccessful = True
- for tid in self._results.keys():
- wasSuccessful = wasSuccessful and \
- self._results[tid]['result'].wasSuccessful()
- return wasSuccessful
-
- def stop(self):
- for tid in self._results.keys():
- self._results[tid]['result'].stop()
-
- def logSummary(self, component, context_msg=''):
- elapsed_time = (self.tc._run_end_time - self.tc._run_start_time)
-
- self.tc.logger.info("SUMMARY:")
- self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component,
- context_msg, len(self.tc._registry['cases']), elapsed_time))
- if self.wasSuccessful():
- msg = "%s - OK - All required tests passed" % component
- else:
- msg = "%s - FAIL - Required tests failed" % component
- self.tc.logger.info(msg)
-
- def logDetails(self):
- if list(self._results):
- tid = list(self._results)[0]
- result = self._results[tid]['result']
- result.logDetails()
-
-class _Worker(threading.Thread):
- """Thread executing tasks from a given tasks queue"""
- def __init__(self, tasks, result, stream):
- threading.Thread.__init__(self)
- self.tasks = tasks
-
- self.result = result
- self.stream = stream
-
- def run(self):
- while True:
- try:
- func, args, kargs = self.tasks.get(block=False)
- except queue.Empty:
- break
-
- try:
- run_start_time = time.time()
- rc = func(*args, **kargs)
- run_end_time = time.time()
- self.result.addResult(rc, run_start_time, run_end_time)
- self.stream.finish()
- except Exception as e:
- print(e)
- finally:
- self.tasks.task_done()
-
-class _ThreadedPool:
- """Pool of threads consuming tasks from a queue"""
- def __init__(self, num_workers, num_tasks, stream=None, result=None):
- self.tasks = queue.Queue(num_tasks)
- self.workers = []
-
- for _ in range(num_workers):
- worker = _Worker(self.tasks, result, stream)
- self.workers.append(worker)
-
- def start(self):
- for worker in self.workers:
- worker.start()
-
- def add_task(self, func, *args, **kargs):
- """Add a task to the queue"""
- self.tasks.put((func, args, kargs))
-
- def wait_completion(self):
- """Wait for completion of all the tasks in the queue"""
- self.tasks.join()
- for worker in self.workers:
- worker.join()
-
-class OETestRunnerThreaded(OETestRunner):
- streamLoggerClass = OEStreamLoggerThreaded
-
- def __init__(self, tc, *args, **kwargs):
- super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
- self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
-
- def run(self, suites):
- result = OETestResultThreaded(self.tc)
-
- pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
- result=result)
- for s in suites:
- pool.add_task(super(OETestRunnerThreaded, self).run, s)
- pool.start()
- pool.wait_completion()
- result._fill_tc_results()
-
- return result
-
-class OETestContextThreaded(OETestContext):
- loaderClass = OETestLoaderThreaded
- runnerClass = OETestRunnerThreaded
-
- def loadTests(self, module_paths, modules=[], tests=[],
- modules_manifest="", modules_required=[], filters={}, process_num=0):
- if modules_manifest:
- modules = self._read_modules_from_manifest(modules_manifest)
-
- self.loader = self.loaderClass(self, module_paths, modules, tests,
- modules_required, filters, process_num)
- self.suites = self.loader.discover()