# Copyright (C) 2016 Intel Corporation # Released under the MIT license (see COPYING.MIT) import os import sys import json import time import logging import collections import re from oeqa.core.loader import OETestLoader from oeqa.core.runner import OETestRunner, OEStreamLogger, xmlEnabled class OETestContext(object): loaderClass = OETestLoader runnerClass = OETestRunner streamLoggerClass = OEStreamLogger files_dir = os.path.abspath(os.path.join(os.path.dirname( os.path.abspath(__file__)), "../files")) def __init__(self, td=None, logger=None): if not type(td) is dict: raise TypeError("td isn't dictionary type") self.td = td self.logger = logger self._registry = {} self._registry['cases'] = collections.OrderedDict() self._results = {} def _read_modules_from_manifest(self, manifest): if not os.path.exists(manifest): raise modules = [] for line in open(manifest).readlines(): line = line.strip() if line and not line.startswith("#"): modules.append(line) return modules def loadTests(self, module_paths, modules=[], tests=[], modules_manifest="", modules_required=[], filters={}): if modules_manifest: modules = self._read_modules_from_manifest(modules_manifest) self.loader = self.loaderClass(self, module_paths, modules, tests, modules_required, filters) self.suites = self.loader.discover() def runTests(self): streamLogger = self.streamLoggerClass(self.logger) self.runner = self.runnerClass(self, stream=streamLogger, verbosity=2) self._run_start_time = time.time() result = self.runner.run(self.suites) self._run_end_time = time.time() return result def logSummary(self, result, component, context_msg=''): self.logger.info("SUMMARY:") self.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component, context_msg, result.testsRun, result.testsRun != 1 and "s" or "", (self._run_end_time - self._run_start_time))) if result.wasSuccessful(): msg = "%s - OK - All required tests passed" % component else: msg = "%s - FAIL - Required tests failed" % component skipped = len(self._results['skipped']) if skipped: msg += " (skipped=%d)" % skipped self.logger.info(msg) def _getDetailsNotPassed(self, case, type, desc): found = False for (scase, msg) in self._results[type]: # XXX: When XML reporting is enabled scase is # xmlrunner.result._TestInfo instance instead of # string. if xmlEnabled: if case.id() == scase.test_id: found = True break scase_str = scase.test_id else: if case == scase: found = True break scase_str = str(scase) # When fails at module or class level the class name is passed as string # so figure out to see if match m = re.search("^setUpModule \((?P.*)\)$", scase_str) if m: if case.__class__.__module__ == m.group('module_name'): found = True break m = re.search("^setUpClass \((?P.*)\)$", scase_str) if m: class_name = "%s.%s" % (case.__class__.__module__, case.__class__.__name__) if class_name == m.group('class_name'): found = True break if found: return (found, msg) return (found, None) def logDetails(self): self.logger.info("RESULTS:") for case_name in self._registry['cases']: case = self._registry['cases'][case_name] result_types = ['failures', 'errors', 'skipped', 'expectedFailures'] result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL'] fail = False desc = None for idx, name in enumerate(result_types): (fail, msg) = self._getDetailsNotPassed(case, result_types[idx], result_desc[idx]) if fail: desc = result_desc[idx] break oeid = -1 for d in case.decorators: if hasattr(d, 'oeid'): oeid = d.oeid if fail: self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), oeid, desc)) if msg: self.logger.info(msg) else: self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), oeid, 'PASSED')) class OETestContextExecutor(object): _context_class = OETestContext name = 'core' help = 'core test component example' description = 'executes core test suite example' default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)), 'cases/example')] default_test_data = os.path.join(default_cases[0], 'data.json') default_tests = None def register_commands(self, logger, subparsers): self.parser = subparsers.add_parser(self.name, help=self.help, description=self.description, group='components') self.default_output_log = '%s-results-%s.log' % (self.name, time.strftime("%Y%m%d%H%M%S")) self.parser.add_argument('--output-log', action='store', default=self.default_output_log, help="results output log, default: %s" % self.default_output_log) self.parser.add_argument('--run-tests', action='store', default=self.default_tests, help="tests to run in [.[.]] format. Just works for modules now") if self.default_test_data: self.parser.add_argument('--test-data-file', action='store', default=self.default_test_data, help="data file to load, default: %s" % self.default_test_data) else: self.parser.add_argument('--test-data-file', action='store', help="data file to load") if self.default_cases: self.parser.add_argument('CASES_PATHS', action='store', default=self.default_cases, nargs='*', help="paths to directories with test cases, default: %s"\ % self.default_cases) else: self.parser.add_argument('CASES_PATHS', action='store', nargs='+', help="paths to directories with test cases") self.parser.set_defaults(func=self.run) def _setup_logger(self, logger, args): formatter = logging.Formatter('%(asctime)s - ' + self.name + \ ' - %(levelname)s - %(message)s') sh = logger.handlers[0] sh.setFormatter(formatter) fh = logging.FileHandler(args.output_log) fh.setFormatter(formatter) logger.addHandler(fh) return logger def _process_args(self, logger, args): self.tc_kwargs = {} self.tc_kwargs['init'] = {} self.tc_kwargs['load'] = {} self.tc_kwargs['run'] = {} self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args) if args.test_data_file: self.tc_kwargs['init']['td'] = json.load( open(args.test_data_file, "r")) else: self.tc_kwargs['init']['td'] = {} if args.run_tests: self.tc_kwargs['load']['modules'] = args.run_tests.split() else: self.tc_kwargs['load']['modules'] = None self.module_paths = args.CASES_PATHS def run(self, logger, args): self._process_args(logger, args) self.tc = self._context_class(**self.tc_kwargs['init']) self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) rc = self.tc.runTests(**self.tc_kwargs['run']) self.tc.logSummary(rc, self.name) self.tc.logDetails() output_link = os.path.join(os.path.dirname(args.output_log), "%s-results.log" % self.name) if os.path.exists(output_link): os.remove(output_link) os.symlink(args.output_log, output_link) return rc _executor_class = OETestContextExecutor