diff options
Diffstat (limited to 'meta/lib/oeqa/core/runner.py')
-rw-r--r-- | meta/lib/oeqa/core/runner.py | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py new file mode 100644 index 0000000000..f656e1a9c5 --- /dev/null +++ b/meta/lib/oeqa/core/runner.py @@ -0,0 +1,328 @@ +# +# Copyright (C) 2016 Intel Corporation +# +# SPDX-License-Identifier: MIT +# + +import os +import time +import unittest +import logging +import re +import json +import sys + +from unittest import TextTestResult as _TestResult +from unittest import TextTestRunner as _TestRunner + +class OEStreamLogger(object): + def __init__(self, logger): + self.logger = logger + self.buffer = "" + + def write(self, msg): + if len(msg) > 1 and msg[0] != '\n': + if '...' in msg: + self.buffer += msg + elif self.buffer: + self.buffer += msg + self.logger.log(logging.INFO, self.buffer) + self.buffer = "" + else: + self.logger.log(logging.INFO, msg) + + def flush(self): + for handler in self.logger.handlers: + handler.flush() + +class OETestResult(_TestResult): + def __init__(self, tc, *args, **kwargs): + super(OETestResult, self).__init__(*args, **kwargs) + + self.successes = [] + self.starttime = {} + self.endtime = {} + self.progressinfo = {} + self.extraresults = {} + + # Inject into tc so that TestDepends decorator can see results + tc.results = self + + self.tc = tc + + # stdout and stderr for each test case + self.logged_output = {} + + def startTest(self, test): + # May have been set by concurrencytest + if test.id() not in self.starttime: + self.starttime[test.id()] = time.time() + super(OETestResult, self).startTest(test) + + def stopTest(self, test): + self.endtime[test.id()] = time.time() + if self.buffer: + self.logged_output[test.id()] = ( + sys.stdout.getvalue(), sys.stderr.getvalue()) + super(OETestResult, self).stopTest(test) + if test.id() in self.progressinfo: + self.tc.logger.info(self.progressinfo[test.id()]) + + # Print the errors/failures early to aid/speed debugging, its a pain + # to wait until selftest finishes to see them. + for t in ['failures', 'errors', 'skipped', 'expectedFailures']: + for (scase, msg) in getattr(self, t): + if test.id() == scase.id(): + self.tc.logger.info(str(msg)) + break + + def logSummary(self, component, context_msg=''): + elapsed_time = self.tc._run_end_time - self.tc._run_start_time + self.tc.logger.info("SUMMARY:") + self.tc.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component, + context_msg, self.testsRun, self.testsRun != 1 and "s" or "", + elapsed_time)) + + if self.wasSuccessful(): + msg = "%s - OK - All required tests passed" % component + else: + msg = "%s - FAIL - Required tests failed" % component + msg += " (successes=%d, skipped=%d, failures=%d, errors=%d)" % (len(self.successes), len(self.skipped), len(self.failures), len(self.errors)) + self.tc.logger.info(msg) + + def _getTestResultDetails(self, case): + result_types = {'failures': 'FAILED', 'errors': 'ERROR', 'skipped': 'SKIPPED', + 'expectedFailures': 'EXPECTEDFAIL', 'successes': 'PASSED', + 'unexpectedSuccesses' : 'PASSED'} + + for rtype in result_types: + found = False + for resultclass in getattr(self, rtype): + # unexpectedSuccesses are just lists, not lists of tuples + if isinstance(resultclass, tuple): + scase, msg = resultclass + else: + scase, msg = resultclass, None + if case.id() == scase.id(): + found = True + break + scase_str = str(scase.id()) + + # When fails at module or class level the class name is passed as string + # so figure out to see if match + m = re.search(r"^setUpModule \((?P<module_name>.*)\).*$", scase_str) + if m: + if case.__class__.__module__ == m.group('module_name'): + found = True + break + + m = re.search(r"^setUpClass \((?P<class_name>.*)\).*$", scase_str) + if m: + class_name = "%s.%s" % (case.__class__.__module__, + case.__class__.__name__) + + if class_name == m.group('class_name'): + found = True + break + + if found: + return result_types[rtype], msg + + return 'UNKNOWN', None + + def extractExtraResults(self, test, details = None): + extraresults = None + if details is not None and "extraresults" in details: + extraresults = details.get("extraresults", {}) + elif hasattr(test, "extraresults"): + extraresults = test.extraresults + + if extraresults is not None: + for k, v in extraresults.items(): + # handle updating already existing entries (e.g. ptestresults.sections) + if k in self.extraresults: + self.extraresults[k].update(v) + else: + self.extraresults[k] = v + + def addError(self, test, *args, details = None): + self.extractExtraResults(test, details = details) + return super(OETestResult, self).addError(test, *args) + + def addFailure(self, test, *args, details = None): + self.extractExtraResults(test, details = details) + return super(OETestResult, self).addFailure(test, *args) + + def addSuccess(self, test, details = None): + #Added so we can keep track of successes too + self.successes.append((test, None)) + self.extractExtraResults(test, details = details) + return super(OETestResult, self).addSuccess(test) + + def addExpectedFailure(self, test, *args, details = None): + self.extractExtraResults(test, details = details) + return super(OETestResult, self).addExpectedFailure(test, *args) + + def addUnexpectedSuccess(self, test, details = None): + self.extractExtraResults(test, details = details) + return super(OETestResult, self).addUnexpectedSuccess(test) + + def logDetails(self, json_file_dir=None, configuration=None, result_id=None, + dump_streams=False): + self.tc.logger.info("RESULTS:") + + result = self.extraresults + logs = {} + if hasattr(self.tc, "extraresults"): + result.update(self.tc.extraresults) + + for case_name in self.tc._registry['cases']: + case = self.tc._registry['cases'][case_name] + + (status, log) = self._getTestResultDetails(case) + + t = "" + if case.id() in self.starttime and case.id() in self.endtime: + t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)" + + if status not in logs: + logs[status] = [] + logs[status].append("RESULTS - %s: %s%s" % (case.id(), status, t)) + report = {'status': status} + if log: + report['log'] = log + if dump_streams and case.id() in self.logged_output: + (stdout, stderr) = self.logged_output[case.id()] + report['stdout'] = stdout + report['stderr'] = stderr + result[case.id()] = report + + for i in ['PASSED', 'SKIPPED', 'EXPECTEDFAIL', 'ERROR', 'FAILED', 'UNKNOWN']: + if i not in logs: + continue + for l in logs[i]: + self.tc.logger.info(l) + + if json_file_dir: + tresultjsonhelper = OETestResultJSONHelper() + tresultjsonhelper.dump_testresult_file(json_file_dir, configuration, result_id, result) + + def wasSuccessful(self): + # Override as we unexpected successes aren't failures for us + return (len(self.failures) == len(self.errors) == 0) + +class OEListTestsResult(object): + def wasSuccessful(self): + return True + +class OETestRunner(_TestRunner): + streamLoggerClass = OEStreamLogger + + def __init__(self, tc, *args, **kwargs): + kwargs['stream'] = self.streamLoggerClass(tc.logger) + super(OETestRunner, self).__init__(*args, **kwargs) + self.tc = tc + self.resultclass = OETestResult + + def _makeResult(self): + return self.resultclass(self.tc, self.stream, self.descriptions, + self.verbosity) + + def _walk_suite(self, suite, func): + for obj in suite: + if isinstance(obj, unittest.suite.TestSuite): + if len(obj._tests): + self._walk_suite(obj, func) + elif isinstance(obj, unittest.case.TestCase): + func(self.tc.logger, obj) + self._walked_cases = self._walked_cases + 1 + + def _list_tests_name(self, suite): + self._walked_cases = 0 + + def _list_cases(logger, case): + oetags = [] + if hasattr(case, '__oeqa_testtags'): + oetags = getattr(case, '__oeqa_testtags') + if oetags: + logger.info("%s (%s)" % (case.id(), ",".join(oetags))) + else: + logger.info("%s" % (case.id())) + + self.tc.logger.info("Listing all available tests:") + self._walked_cases = 0 + self.tc.logger.info("test (tags)") + self.tc.logger.info("-" * 80) + self._walk_suite(suite, _list_cases) + self.tc.logger.info("-" * 80) + self.tc.logger.info("Total found:\t%s" % self._walked_cases) + + def _list_tests_class(self, suite): + self._walked_cases = 0 + + curr = {} + def _list_classes(logger, case): + if not 'module' in curr or curr['module'] != case.__module__: + curr['module'] = case.__module__ + logger.info(curr['module']) + + if not 'class' in curr or curr['class'] != \ + case.__class__.__name__: + curr['class'] = case.__class__.__name__ + logger.info(" -- %s" % curr['class']) + + logger.info(" -- -- %s" % case._testMethodName) + + self.tc.logger.info("Listing all available test classes:") + self._walk_suite(suite, _list_classes) + + def _list_tests_module(self, suite): + self._walked_cases = 0 + + listed = [] + def _list_modules(logger, case): + if not case.__module__ in listed: + if case.__module__.startswith('_'): + logger.info("%s (hidden)" % case.__module__) + else: + logger.info(case.__module__) + listed.append(case.__module__) + + self.tc.logger.info("Listing all available test modules:") + self._walk_suite(suite, _list_modules) + + def list_tests(self, suite, display_type): + if display_type == 'name': + self._list_tests_name(suite) + elif display_type == 'class': + self._list_tests_class(suite) + elif display_type == 'module': + self._list_tests_module(suite) + + return OEListTestsResult() + +class OETestResultJSONHelper(object): + + testresult_filename = 'testresults.json' + + def _get_existing_testresults_if_available(self, write_dir): + testresults = {} + file = os.path.join(write_dir, self.testresult_filename) + if os.path.exists(file): + with open(file, "r") as f: + testresults = json.load(f) + return testresults + + def _write_file(self, write_dir, file_name, file_content): + file_path = os.path.join(write_dir, file_name) + with open(file_path, 'w') as the_file: + the_file.write(file_content) + + def dump_testresult_file(self, write_dir, configuration, result_id, test_result): + bb.utils.mkdirhier(write_dir) + lf = bb.utils.lockfile(os.path.join(write_dir, 'jsontestresult.lock')) + test_results = self._get_existing_testresults_if_available(write_dir) + test_results[result_id] = {'configuration': configuration, 'result': test_result} + json_testresults = json.dumps(test_results, sort_keys=True, indent=4) + self._write_file(write_dir, self.testresult_filename, json_testresults) + bb.utils.unlockfile(lf) |