From 3b2a20eee4a39f40287bf67545839eaa09fc892d Mon Sep 17 00:00:00 2001 From: Leonardo Sandoval Date: Thu, 25 May 2017 15:20:56 -0500 Subject: scripts/oe-selftest: Migrate to new framework into oeqa.selftest.context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The new OEQA framework aims to re-use code into the different Test components. The previous oe-selftest implements it-self loading, run, and list test cases in a non-standard way (unittest base) and other functionalities like logging that is now on oeqa core. This ends on a compact oe-selftest script. All needed command line options was migrated but there are some of them pending of implementation and others deprecated. Deprecated options: list-tags: The tag functionality into the old oeqa framework isn't work, the selftest doesn't has tag decorators. {run, list}-tests-by: Ambiguos options it accepts all the posibilites module, class, name, id or tag. Remaining to implement: coverage: It enables covrage reports over a test run, currently isn't on on use and some bugs [1], i filed a bug to add support to OEQA core module in this way other Test components could enable it. repository: It push XML results into a git repository and isn't in use, i filed a bug to implement this into OEQA core module. [2] [1] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11582#c0 [2] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11583#c0 Signed-off-by: Leonardo Sandoval Signed-off-by: Aníbal Limón --- meta/lib/oeqa/selftest/context.py | 224 ++++++++++++ scripts/oe-selftest | 737 ++------------------------------------ 2 files changed, 250 insertions(+), 711 deletions(-) create mode 100644 meta/lib/oeqa/selftest/context.py diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py new file mode 100644 index 0000000000..ca73070c0b --- /dev/null +++ b/meta/lib/oeqa/selftest/context.py @@ -0,0 +1,224 @@ +# Copyright (C) 2017 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import time +import glob +import sys +import imp +from random import choice + +import oeqa + +from oeqa.core.context import OETestContext, OETestContextExecutor +from oeqa.core.exception import OEQAPreRun + +from oeqa.utils.commands import runCmd, get_bb_vars, get_test_layer + +class OESelftestTestContext(OETestContext): + def __init__(self, td=None, logger=None, machines=None, testlayer_path=None): + super(OESelftestTestContext, self).__init__(td, logger) + + self.machines = machines + self.custommachine = None + + self.testlayer_path = testlayer_path + + def runTests(self, machine=None): + if machine: + self.custommachine = machine + if machine == 'random': + self.custommachine = choice(self.machines) + self.logger.info('Run tests with custom MACHINE set to: %s' % \ + self.custommachine) + return super(OESelftestTestContext, self).runTests() + + def listTests(self, display_type, machine=None): + return super(OESelftestTestContext, self).listTests(display_type) + +class OESelftestTestContextExecutor(OETestContextExecutor): + _context_class = OESelftestTestContext + _script_executor = 'oe-selftest' + + name = 'oe-selftest' + help = 'oe-selftest test component' + description = 'Executes selftest tests' + + def register_commands(self, logger, parser): + group = parser.add_mutually_exclusive_group(required=True) + + group.add_argument('-a', '--run-all-tests', default=False, + action="store_true", dest="run_all_tests", + help='Run all (unhidden) tests') + group.add_argument('-r', '--run-tests', required=False, action='store', + nargs='+', dest="run_tests", default=None, + help='Select what tests to run (modules, classes or test methods). Format should be: ..') + + group.add_argument('-m', '--list-modules', required=False, + action="store_true", default=False, + help='List all available test modules.') + group.add_argument('--list-classes', required=False, + action="store_true", default=False, + help='List all available test classes.') + group.add_argument('-l', '--list-tests', required=False, + action="store_true", default=False, + help='List all available tests.') + + parser.add_argument('--machine', required=False, choices=['random', 'all'], + help='Run tests on different machines (random/all).') + + parser.set_defaults(func=self.run) + + def _get_available_machines(self): + machines = [] + + bbpath = self.tc_kwargs['init']['td']['BBPATH'].split(':') + + for path in bbpath: + found_machines = glob.glob(os.path.join(path, 'conf', 'machine', '*.conf')) + if found_machines: + for i in found_machines: + # eg: '/home//poky/meta-intel/conf/machine/intel-core2-32.conf' + machines.append(os.path.splitext(os.path.basename(i))[0]) + + return machines + + def _get_cases_paths(self, bbpath): + cases_paths = [] + for layer in bbpath: + cases_dir = os.path.join(layer, 'lib', 'oeqa', 'selftest', 'cases') + if os.path.isdir(cases_dir): + cases_paths.append(cases_dir) + return cases_paths + + def _process_args(self, logger, args): + args.output_log = '%s-results-%s.log' % (self.name, + time.strftime("%Y%m%d%H%M%S")) + args.test_data_file = None + args.CASES_PATHS = None + + super(OESelftestTestContextExecutor, self)._process_args(logger, args) + + if args.list_modules: + args.list_tests = 'module' + elif args.list_classes: + args.list_tests = 'class' + elif args.list_tests: + args.list_tests = 'name' + + self.tc_kwargs['init']['td'] = get_bb_vars() + self.tc_kwargs['init']['machines'] = self._get_available_machines() + self.tc_kwargs['init']['testlayer_path'] = get_test_layer() + + def _pre_run(self): + def _check_required_env_variables(vars): + for var in vars: + if not os.environ.get(var): + self.tc.logger.error("%s is not set. Did you forget to source your build environment setup script?" % var) + raise OEQAPreRun + + def _check_presence_meta_selftest(): + builddir = os.environ.get("BUILDDIR") + if os.getcwd() != builddir: + self.tc.logger.info("Changing cwd to %s" % builddir) + os.chdir(builddir) + + if not "meta-selftest" in self.tc.td["BBLAYERS"]: + self.tc.logger.warn("meta-selftest layer not found in BBLAYERS, adding it") + meta_selftestdir = os.path.join( + self.tc.td["BBLAYERS_FETCH_DIR"], 'meta-selftest') + if os.path.isdir(meta_selftestdir): + runCmd("bitbake-layers add-layer %s" %meta_selftestdir) + # reload data is needed because a meta-selftest layer was add + self.tc.td = get_bb_vars() + else: + self.tc.logger.error("could not locate meta-selftest in:\n%s" % meta_selftestdir) + raise OEQAPreRun + + def _add_layer_libs(): + bbpath = self.tc.td['BBPATH'].split(':') + layer_libdirs = [p for p in (os.path.join(l, 'lib') \ + for l in bbpath) if os.path.exists(p)] + if layer_libdirs: + self.tc.logger.info("Adding layer libraries:") + for l in layer_libdirs: + self.tc.logger.info("\t%s" % l) + + sys.path.extend(layer_libdirs) + imp.reload(oeqa.selftest) + + _check_required_env_variables(["BUILDDIR"]) + _check_presence_meta_selftest() + + if "buildhistory.bbclass" in self.tc.td["BBINCLUDED"]: + self.tc.logger.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.") + raise OEQAPreRun + + if "PRSERV_HOST" in self.tc.td: + self.tc.logger.error("Please unset PRSERV_HOST in order to run oe-selftest") + raise OEQAPreRun + + if "SANITY_TESTED_DISTROS" in self.tc.td: + self.tc.logger.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest") + raise OEQAPreRun + + _add_layer_libs() + + self.tc.logger.info("Running bitbake -p") + runCmd("bitbake -p") + + def _internal_run(self, logger, args): + self.module_paths = self._get_cases_paths( + self.tc_kwargs['init']['td']['BBPATH'].split(':')) + + self.tc = self._context_class(**self.tc_kwargs['init']) + self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) + + if args.list_tests: + rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['run']) + else: + self._pre_run() + rc = self.tc.runTests(**self.tc_kwargs['run']) + rc.logSummary(self.name) + rc.logDetails() + + return rc + + def run(self, logger, args): + self._process_args(logger, args) + rc = None + + if args.machine: + logger.info('Custom machine mode enabled. MACHINE set to %s' % + args.machine) + + if args.machine == 'all': + results = [] + for m in self.tc_kwargs['init']['machines']: + self.tc_kwargs['run']['machine'] = m + results.append(self._internal_run(logger, args)) + + # XXX: the oe-selftest script only needs to know if one + # machine run fails + for r in results: + rc = r + if not r.wasSuccessful(): + break + + else: + self.tc_kwargs['run']['machine'] = args.machine + return self._internal_run(logger, args) + + else: + self.tc_kwargs['run']['machine'] = args.machine + rc = self._internal_run(logger, args) + + output_link = os.path.join(os.path.dirname(args.output_log), + "%s-results.log" % self.name) + if os.path.exists(output_link): + os.remove(output_link) + os.symlink(args.output_log, output_link) + + return rc + +_executor_class = OESelftestTestContextExecutor diff --git a/scripts/oe-selftest b/scripts/oe-selftest index 490915759f..b200acee13 100755 --- a/scripts/oe-selftest +++ b/scripts/oe-selftest @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright (c) 2013 Intel Corporation +# Copyright (c) 2013-2017 Intel Corporation # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as @@ -25,732 +25,47 @@ # E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py + import os import sys -import unittest -import logging import argparse -import subprocess -import time as t -import re -import fnmatch -import collections -import imp +import logging -sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib') +scripts_path = os.path.dirname(os.path.realpath(__file__)) +lib_path = scripts_path + '/lib' +sys.path = sys.path + [lib_path] +import argparse_oe +import scriptutils import scriptpath -scriptpath.add_bitbake_lib_path() scriptpath.add_oe_lib_path() -import argparse_oe - -import oeqa.selftest -import oeqa.utils.ftools as ftools -from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer -from oeqa.utils.metadata import metadata_from_bb, write_metadata_file -from oeqa.selftest.base import oeSelfTest, get_available_machines - -try: - import xmlrunner - from xmlrunner.result import _XMLTestResult as TestResult - from xmlrunner import XMLTestRunner as _TestRunner -except ImportError: - # use the base runner instead - from unittest import TextTestResult as TestResult - from unittest import TextTestRunner as _TestRunner - -log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S") - -def logger_create(): - log_file = log_prefix + ".log" - if os.path.lexists("oe-selftest.log"): - os.remove("oe-selftest.log") - os.symlink(log_file, "oe-selftest.log") - - log = logging.getLogger("selftest") - log.setLevel(logging.DEBUG) - - fh = logging.FileHandler(filename=log_file, mode='w') - fh.setLevel(logging.DEBUG) - - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.INFO) - - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - fh.setFormatter(formatter) - ch.setFormatter(formatter) - - log.addHandler(fh) - log.addHandler(ch) +scriptpath.add_bitbake_lib_path() - return log +from oeqa.utils import load_test_components +from oeqa.core.exception import OEQAPreRun -log = logger_create() +logger = scriptutils.logger_create('oe-selftest') -def get_args_parser(): +def main(): description = "Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information." parser = argparse_oe.ArgumentParser(description=description) - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-r', '--run-tests', required=False, action='store', nargs='*', dest="run_tests", default=None, help='Select what tests to run (modules, classes or test methods). Format should be: ..') - group.add_argument('-a', '--run-all-tests', required=False, action="store_true", dest="run_all_tests", default=False, help='Run all (unhidden) tests') - group.add_argument('-m', '--list-modules', required=False, action="store_true", dest="list_modules", default=False, help='List all available test modules.') - group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", default=False, help='List all available test classes.') - parser.add_argument('--coverage', action="store_true", help="Run code coverage when testing") - parser.add_argument('--coverage-source', dest="coverage_source", nargs="+", help="Specifiy the directories to take coverage from") - parser.add_argument('--coverage-include', dest="coverage_include", nargs="+", help="Specify extra patterns to include into the coverage measurement") - parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+", help="Specify with extra patterns to exclude from the coverage measurement") - group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*', - help='run-tests-by ') - group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*', - help='list-tests-by ') - group.add_argument('-l', '--list-tests', required=False, action="store_true", dest="list_tests", default=False, - help='List all available tests.') - group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true", - help='List all tags that have been set to test cases.') - parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None, - help='Run tests on different machines (random/all).') - parser.add_argument('--repository', required=False, dest='repository', default='', action='store', - help='Submit test results to a repository') - return parser - -builddir = None - - -def preflight_check(): - - global builddir - - log.info("Checking that everything is in order before running the tests") - - if not os.environ.get("BUILDDIR"): - log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?") - return False - - builddir = os.environ.get("BUILDDIR") - if os.getcwd() != builddir: - log.info("Changing cwd to %s" % builddir) - os.chdir(builddir) - - if not "meta-selftest" in get_bb_var("BBLAYERS"): - log.warn("meta-selftest layer not found in BBLAYERS, adding it") - meta_selftestdir = os.path.join( - get_bb_var("BBLAYERS_FETCH_DIR"), - 'meta-selftest') - if os.path.isdir(meta_selftestdir): - runCmd("bitbake-layers add-layer %s" %meta_selftestdir) - else: - log.error("could not locate meta-selftest in:\n%s" - %meta_selftestdir) - return False - - if "buildhistory.bbclass" in get_bb_var("BBINCLUDED"): - log.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.") - return False - - if get_bb_var("PRSERV_HOST"): - log.error("Please unset PRSERV_HOST in order to run oe-selftest") - return False - - if get_bb_var("SANITY_TESTED_DISTROS"): - log.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest") - return False - - log.info("Running bitbake -p") - runCmd("bitbake -p") - - return True -def get_tests_modules(include_hidden=False): - modules_list = list() - for modules_path in oeqa.selftest.__path__: - for (p, d, f) in os.walk(modules_path): - files = sorted([f for f in os.listdir(p) if f.endswith('.py') and not (f.startswith('_') and not include_hidden) and not f.startswith('__') and f != 'base.py']) - for f in files: - submodules = p.split("selftest")[-1] - module = "" - if submodules: - module = 'oeqa.selftest' + submodules.replace("/",".") + "." + f.split('.py')[0] - else: - module = 'oeqa.selftest.' + f.split('.py')[0] - if module not in modules_list: - modules_list.append(module) - return modules_list - - -def get_tests(exclusive_modules=[], include_hidden=False): - test_modules = list() - for x in exclusive_modules: - test_modules.append('oeqa.selftest.' + x) - if not test_modules: - inc_hidden = include_hidden - test_modules = get_tests_modules(inc_hidden) - - return test_modules - - -class Tc: - def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None): - self.tcname = tcname - self.tcclass = tcclass - self.tcmodule = tcmodule - self.tcid = tcid - # A test case can have multiple tags (as tuples) otherwise str will suffice - self.tctag = tctag - self.fullpath = '.'.join(['oeqa', 'selftest', tcmodule, tcclass, tcname]) - - -def get_tests_from_module(tmod): - tlist = [] - prefix = 'oeqa.selftest.' + comp_name, comp = load_test_components(logger, 'oe-selftest').popitem() + comp.register_commands(logger, parser) try: - import importlib - modlib = importlib.import_module(tmod) - for mod in list(vars(modlib).values()): - if isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest) and mod is not oeSelfTest: - for test in dir(mod): - if test.startswith('test_') and hasattr(vars(mod)[test], '__call__'): - # Get test case id and feature tag - # NOTE: if testcase decorator or feature tag not set will throw error - try: - tid = vars(mod)[test].test_case - except: - print('DEBUG: tc id missing for ' + str(test)) - tid = None - try: - ttag = vars(mod)[test].tag__feature - except: - # print('DEBUG: feature tag missing for ' + str(test)) - ttag = None - - # NOTE: for some reason lstrip() doesn't work for mod.__module__ - tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag)) - except: - pass - - return tlist - - -def get_all_tests(): - # Get all the test modules (except the hidden ones) - testlist = [] - tests_modules = get_tests_modules() - # Get all the tests from modules - for tmod in sorted(tests_modules): - testlist += get_tests_from_module(tmod) - return testlist - - -def get_testsuite_by(criteria, keyword): - # Get a testsuite based on 'keyword' - # criteria: name, class, module, id, tag - # keyword: a list of tests, classes, modules, ids, tags - - ts = [] - all_tests = get_all_tests() - - def get_matches(values): - # Get an item and return the ones that match with keyword(s) - # values: the list of items (names, modules, classes...) - result = [] - remaining = values[:] - for key in keyword: - found = False - if key in remaining: - # Regular matching of exact item - result.append(key) - remaining.remove(key) - found = True - else: - # Wildcard matching - pattern = re.compile(fnmatch.translate(r"%s" % key)) - added = [x for x in remaining if pattern.match(x)] - if added: - result.extend(added) - remaining = [x for x in remaining if x not in added] - found = True - if not found: - log.error("Failed to find test: %s" % key) - - return result - - if criteria == 'name': - names = get_matches([ tc.tcname for tc in all_tests ]) - ts = [ tc for tc in all_tests if tc.tcname in names ] - - elif criteria == 'class': - classes = get_matches([ tc.tcclass for tc in all_tests ]) - ts = [ tc for tc in all_tests if tc.tcclass in classes ] - - elif criteria == 'module': - modules = get_matches([ tc.tcmodule for tc in all_tests ]) - ts = [ tc for tc in all_tests if tc.tcmodule in modules ] - - elif criteria == 'id': - ids = get_matches([ str(tc.tcid) for tc in all_tests ]) - ts = [ tc for tc in all_tests if str(tc.tcid) in ids ] - - elif criteria == 'tag': - values = set() - for tc in all_tests: - # tc can have multiple tags (as tuple) otherwise str will suffice - if isinstance(tc.tctag, tuple): - values |= { str(tag) for tag in tc.tctag } - else: - values.add(str(tc.tctag)) - - tags = get_matches(list(values)) - - for tc in all_tests: - for tag in tags: - if isinstance(tc.tctag, tuple) and tag in tc.tctag: - ts.append(tc) - elif tag == tc.tctag: - ts.append(tc) - - # Remove duplicates from the list - ts = list(set(ts)) - - return ts - - -def list_testsuite_by(criteria, keyword): - # Get a testsuite based on 'keyword' - # criteria: name, class, module, id, tag - # keyword: a list of tests, classes, modules, ids, tags - def tc_key(t): - if t[0] is None: - return (0,) + t[1:] - return t - # tcid may be None if no ID was assigned, in which case sorted() will throw - # a TypeError as Python 3 does not allow comparison (<,<=,>=,>) of - # heterogeneous types, handle this by using a custom key generator - ts = sorted([ (tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule) \ - for tc in get_testsuite_by(criteria, keyword) ], key=tc_key) - print('_' * 150) - for t in ts: - if isinstance(t[1], (tuple, list)): - print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % (t[0], ', '.join(t[1]), t[2], t[3], t[4])) - else: - print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % t) - print('_' * 150) - print('Filtering by:\t %s' % criteria) - print('Looking for:\t %s' % ', '.join(str(x) for x in keyword)) - print('Total found:\t %s' % len(ts)) - - -def list_tests(): - # List all available oe-selftest tests - - ts = get_all_tests() - - print('%-4s\t%-10s\t%-50s' % ('id', 'tag', 'test')) - print('_' * 80) - for t in ts: - if isinstance(t.tctag, (tuple, list)): - print('%-4s\t%-10s\t%-50s' % (t.tcid, ', '.join(t.tctag), '.'.join([t.tcmodule, t.tcclass, t.tcname]))) - else: - print('%-4s\t%-10s\t%-50s' % (t.tcid, t.tctag, '.'.join([t.tcmodule, t.tcclass, t.tcname]))) - print('_' * 80) - print('Total found:\t %s' % len(ts)) - -def list_tags(): - # Get all tags set to test cases - # This is useful when setting tags to test cases - # The list of tags should be kept as minimal as possible - tags = set() - all_tests = get_all_tests() - - for tc in all_tests: - if isinstance(tc.tctag, (tuple, list)): - tags.update(set(tc.tctag)) - else: - tags.add(tc.tctag) - - print('Tags:\t%s' % ', '.join(str(x) for x in tags)) - -def coverage_setup(coverage_source, coverage_include, coverage_omit): - """ Set up the coverage measurement for the testcases to be run """ - import datetime - import subprocess - global builddir - pokydir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - curcommit= subprocess.check_output(["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]).decode('utf-8') - coveragerc = "%s/.coveragerc" % builddir - data_file = "%s/.coverage." % builddir - data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S') - if os.path.isfile(data_file): - os.remove(data_file) - with open(coveragerc, 'w') as cps: - cps.write("# Generated with command '%s'\n" % " ".join(sys.argv)) - cps.write("# HEAD commit %s\n" % curcommit.strip()) - cps.write("[run]\n") - cps.write("data_file = %s\n" % data_file) - cps.write("branch = True\n") - # Measure just BBLAYERS, scripts and bitbake folders - cps.write("source = \n") - if coverage_source: - for directory in coverage_source: - if not os.path.isdir(directory): - log.warn("Directory %s is not valid.", directory) - cps.write(" %s\n" % directory) - else: - for layer in get_bb_var('BBLAYERS').split(): - cps.write(" %s\n" % layer) - cps.write(" %s\n" % os.path.dirname(os.path.realpath(__file__))) - cps.write(" %s\n" % os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))),'bitbake')) - - if coverage_include: - cps.write("include = \n") - for pattern in coverage_include: - cps.write(" %s\n" % pattern) - if coverage_omit: - cps.write("omit = \n") - for pattern in coverage_omit: - cps.write(" %s\n" % pattern) - - return coveragerc - -def coverage_report(): - """ Loads the coverage data gathered and reports it back """ - try: - # Coverage4 uses coverage.Coverage - from coverage import Coverage - except: - # Coverage under version 4 uses coverage.coverage - from coverage import coverage as Coverage - - import io as StringIO - from coverage.misc import CoverageException - - cov_output = StringIO.StringIO() - # Creating the coverage data with the setting from the configuration file - cov = Coverage(config_file = os.environ.get('COVERAGE_PROCESS_START')) - try: - # Load data from the data file specified in the configuration - cov.load() - # Store report data in a StringIO variable - cov.report(file = cov_output, show_missing=False) - log.info("\n%s" % cov_output.getvalue()) - except CoverageException as e: - # Show problems with the reporting. Since Coverage4 not finding any data to report raises an exception - log.warn("%s" % str(e)) - finally: - cov_output.close() - - -def main(): - parser = get_args_parser() - args = parser.parse_args() - - # Add /lib to sys.path, so layers can add selftests - log.info("Running bitbake -e to get BBPATH") - bbpath = get_bb_var('BBPATH').split(':') - layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)] - sys.path.extend(layer_libdirs) - imp.reload(oeqa.selftest) - - # act like bitbake and enforce en_US.UTF-8 locale - os.environ["LC_ALL"] = "en_US.UTF-8" - - if args.run_tests_by and len(args.run_tests_by) >= 2: - valid_options = ['name', 'class', 'module', 'id', 'tag'] - if args.run_tests_by[0] not in valid_options: - print('--run-tests-by %s not a valid option. Choose one of .' % args.run_tests_by[0]) - return 1 - else: - criteria = args.run_tests_by[0] - keyword = args.run_tests_by[1:] - ts = sorted([ tc.fullpath for tc in get_testsuite_by(criteria, keyword) ]) - if not ts: - return 1 - - if args.list_tests_by and len(args.list_tests_by) >= 2: - valid_options = ['name', 'class', 'module', 'id', 'tag'] - if args.list_tests_by[0] not in valid_options: - print('--list-tests-by %s not a valid option. Choose one of .' % args.list_tests_by[0]) - return 1 - else: - criteria = args.list_tests_by[0] - keyword = args.list_tests_by[1:] - list_testsuite_by(criteria, keyword) - - if args.list_tests: - list_tests() - - if args.list_tags: - list_tags() - - if args.list_allclasses: - args.list_modules = True - - if args.list_modules: - log.info('Listing all available test modules:') - testslist = get_tests(include_hidden=True) - for test in testslist: - module = test.split('oeqa.selftest.')[-1] - info = '' - if module.startswith('_'): - info = ' (hidden)' - print(module + info) - if args.list_allclasses: - try: - import importlib - modlib = importlib.import_module(test) - for v in vars(modlib): - t = vars(modlib)[v] - if isinstance(t, type(oeSelfTest)) and issubclass(t, oeSelfTest) and t!=oeSelfTest: - print(" --", v) - for method in dir(t): - if method.startswith("test_") and isinstance(vars(t)[method], collections.Callable): - print(" -- --", method) - - except (AttributeError, ImportError) as e: - print(e) - pass - - if args.run_tests or args.run_all_tests or args.run_tests_by: - if not preflight_check(): - return 1 - - if args.run_tests_by: - testslist = ts - else: - testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False) - - suite = unittest.TestSuite() - loader = unittest.TestLoader() - loader.sortTestMethodsUsing = None - runner = TestRunner(verbosity=2, - resultclass=buildResultClass(args)) - # we need to do this here, otherwise just loading the tests - # will take 2 minutes (bitbake -e calls) - oeSelfTest.testlayer_path = get_test_layer() - for test in testslist: - log.info("Loading tests from: %s" % test) - try: - suite.addTests(loader.loadTestsFromName(test)) - except AttributeError as e: - log.error("Failed to import %s" % test) - log.error(e) - return 1 - - if args.machine: - # Custom machine sets only weak default values (??=) for MACHINE in machine.inc - # This let test cases that require a specific MACHINE to be able to override it, using (?= or =) - log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine) - if args.machine == 'random': - os.environ['CUSTOMMACHINE'] = 'random' - result = runner.run(suite) - else: # all - machines = get_available_machines() - for m in machines: - log.info('Run tests with custom MACHINE set to: %s' % m) - os.environ['CUSTOMMACHINE'] = m - result = runner.run(suite) - else: - result = runner.run(suite) - - log.info("Finished") - - if args.repository: - import git - # Commit tests results to repository - metadata = metadata_from_bb() - git_dir = os.path.join(os.getcwd(), 'selftest') - if not os.path.isdir(git_dir): - os.mkdir(git_dir) - - log.debug('Checking for git repository in %s' % git_dir) - try: - repo = git.Repo(git_dir) - except git.exc.InvalidGitRepositoryError: - log.debug("Couldn't find git repository %s; " - "cloning from %s" % (git_dir, args.repository)) - repo = git.Repo.clone_from(args.repository, git_dir) - - r_branches = repo.git.branch(r=True) - r_branches = set(r_branches.replace('origin/', '').split()) - l_branches = {str(branch) for branch in repo.branches} - branch = '%s/%s/%s' % (metadata['hostname'], - metadata['layers']['meta'].get('branch', '(nogit)'), - metadata['config']['MACHINE']) - - if branch in l_branches: - log.debug('Found branch in local repository, checking out') - repo.git.checkout(branch) - elif branch in r_branches: - log.debug('Found branch in remote repository, checking' - ' out and pulling') - repo.git.checkout(branch) - repo.git.pull() - else: - log.debug('New branch %s' % branch) - repo.git.checkout('master') - repo.git.checkout(b=branch) - - cleanResultsDir(repo) - xml_dir = os.path.join(os.getcwd(), log_prefix) - copyResultFiles(xml_dir, git_dir, repo) - metadata_file = os.path.join(git_dir, 'metadata.xml') - write_metadata_file(metadata_file, metadata) - repo.index.add([metadata_file]) - repo.index.write() - - # Get information for commit message - layer_info = '' - for layer, values in metadata['layers'].items(): - layer_info = '%s%-17s = %s:%s\n' % (layer_info, layer, - values.get('branch', '(nogit)'), values.get('commit', '0'*40)) - msg = 'Selftest for build %s of %s for machine %s on %s\n\n%s' % ( - log_prefix[12:], metadata['distro']['pretty_name'], - metadata['config']['MACHINE'], metadata['hostname'], layer_info) - - log.debug('Commiting results to local repository') - repo.index.commit(msg) - if not repo.is_dirty(): - try: - if branch in r_branches: - log.debug('Pushing changes to remote repository') - repo.git.push() - else: - log.debug('Pushing changes to remote repository ' - 'creating new branch') - repo.git.push('-u', 'origin', branch) - except GitCommandError: - log.error('Falied to push to remote repository') - return 1 - else: - log.error('Local repository is dirty, not pushing commits') - - if result.wasSuccessful(): - return 0 - else: - return 1 - -def buildResultClass(args): - """Build a Result Class to use in the testcase execution""" - import site - - class StampedResult(TestResult): - """ - Custom TestResult that prints the time when a test starts. As oe-selftest - can take a long time (ie a few hours) to run, timestamps help us understand - what tests are taking a long time to execute. - If coverage is required, this class executes the coverage setup and reporting. - """ - def startTest(self, test): - import time - self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ") - super(StampedResult, self).startTest(test) - - def startTestRun(self): - """ Setup coverage before running any testcase """ - - # variable holding the coverage configuration file allowing subprocess to be measured - self.coveragepth = None - - # indicates the system if coverage is currently installed - self.coverage_installed = True - - if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit: - try: - # check if user can do coverage - import coverage - except: - log.warn("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage") - self.coverage_installed = False - - if self.coverage_installed: - log.info("Coverage is enabled") - - major_version = int(coverage.version.__version__[0]) - if major_version < 4: - log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__) - self.stop() - # In case the user has not set the variable COVERAGE_PROCESS_START, - # create a default one and export it. The COVERAGE_PROCESS_START - # value indicates where the coverage configuration file resides - # More info on https://pypi.python.org/pypi/coverage - if not os.environ.get('COVERAGE_PROCESS_START'): - os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit) - - # Use default site.USER_SITE and write corresponding config file - site.ENABLE_USER_SITE = True - if not os.path.exists(site.USER_SITE): - os.makedirs(site.USER_SITE) - self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth") - with open(self.coveragepth, 'w') as cps: - cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();') - - def stopTestRun(self): - """ Report coverage data after the testcases are run """ - - if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit: - if self.coverage_installed: - with open(os.environ['COVERAGE_PROCESS_START']) as ccf: - log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START')) - log.info("===========================") - log.info("\n%s" % "".join(ccf.readlines())) - - log.info("Coverage Report") - log.info("===============") - try: - coverage_report() - finally: - # remove the pth file - try: - os.remove(self.coveragepth) - except OSError: - log.warn("Expected temporal file from coverage is missing, ignoring removal.") - - return StampedResult - -def cleanResultsDir(repo): - """ Remove result files from directory """ - - xml_files = [] - directory = repo.working_tree_dir - for f in os.listdir(directory): - path = os.path.join(directory, f) - if os.path.isfile(path) and path.endswith('.xml'): - xml_files.append(f) - repo.index.remove(xml_files, working_tree=True) - -def copyResultFiles(src, dst, repo): - """ Copy result files from src to dst removing the time stamp. """ - - import shutil - - re_time = re.compile("-[0-9]+") - file_list = [] - - for root, subdirs, files in os.walk(src): - tmp_dir = root.replace(src, '').lstrip('/') - for s in subdirs: - os.mkdir(os.path.join(dst, tmp_dir, s)) - for f in files: - file_name = os.path.join(dst, tmp_dir, re_time.sub("", f)) - shutil.copy2(os.path.join(root, f), file_name) - file_list.append(file_name) - repo.index.add(file_list) + args = parser.parse_args() + results = args.func(logger, args) + ret = 0 if results.wasSuccessful() else 1 + except SystemExit as err: + if err.code != 0: + raise err + ret = err.code + except OEQAPreRun as pr: + ret = 1 -class TestRunner(_TestRunner): - """Test runner class aware of exporting tests.""" - def __init__(self, *args, **kwargs): - try: - exportdir = os.path.join(os.getcwd(), log_prefix) - kwargsx = dict(**kwargs) - # argument specific to XMLTestRunner, if adding a new runner then - # also add logic to use other runner's args. - kwargsx['output'] = exportdir - kwargsx['descriptions'] = False - # done for the case where telling the runner where to export - super(TestRunner, self).__init__(*args, **kwargsx) - except TypeError: - log.info("test runner init'ed like unittest") - super(TestRunner, self).__init__(*args, **kwargs) + return ret -if __name__ == "__main__": +if __name__ == '__main__': try: ret = main() except Exception: -- cgit 1.2.3-korg