#!/usr/bin/env python

# Copyright (c) 2013 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# DESCRIPTION
# This script runs tests defined in meta/lib/selftest/
# Its purpose is to automate the testing of different bitbake tools.
# To use it you just need to source your build environment setup script and
# add the meta-selftest layer to your BBLAYERS.
# Call the script as: "oe-selftest --run-all-tests" to run all the tests in meta/lib/selftest/
# Call the script as: "oe-selftest --run-tests <module>.<Class>.<method>" to run just a single test
# E.g: "oe-selftest --run-tests bboutput.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/selftest/bboutput.py


import os
import sys
import unittest
import logging
import argparse
import subprocess
import time as t

sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
import scriptpath
scriptpath.add_bitbake_lib_path()
scriptpath.add_oe_lib_path()
import argparse_oe

import oeqa.selftest
import oeqa.utils.ftools as ftools
from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer
from oeqa.selftest.base import oeSelfTest, get_available_machines


# Marker line and include snippets appended to (and later removed from) the
# build configuration files by add_include() / remove_include().
_INCLUDE_MARKER = "#include added by oe-selftest.py"
_LOCAL_CONF_INCLUDE = "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"
_BBLAYERS_CONF_INCLUDE = "\n#include added by oe-selftest.py\ninclude bblayers.inc"

# Criteria accepted as first value of --run-tests-by / --list-tests-by
_VALID_CRITERIA = ['name', 'class', 'module', 'id', 'tag']

# Row layout shared by every test listing: id, tag, name, class, module
_TC_ROW_FORMAT = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'


def logger_create():
    """Create and return the "selftest" logger.

    Everything (DEBUG and up) goes to a time-stamped log file in the current
    directory, INFO and up is echoed to stdout.  The "oe-selftest.log"
    symlink is re-pointed at the newly created log file.
    """
    log_file = "oe-selftest-" + t.strftime("%Y-%m-%d_%H:%M:%S") + ".log"
    # lexists() is also True for a dangling symlink (e.g. the previous log
    # file was deleted); exists() would miss it and os.symlink() would then
    # fail because the link name is already taken.
    if os.path.lexists("oe-selftest.log"):
        os.remove("oe-selftest.log")
    os.symlink(log_file, "oe-selftest.log")

    log = logging.getLogger("selftest")
    log.setLevel(logging.DEBUG)

    fh = logging.FileHandler(filename=log_file, mode='w')
    fh.setLevel(logging.DEBUG)

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    log.addHandler(fh)
    log.addHandler(ch)

    return log

log = logger_create()


def get_args_parser():
    """Build and return the command line parser.

    Exactly one of the "action" options (run/list variants) is required; the
    coverage and machine options can be combined with any of them.
    """
    description = "Script that runs unit tests agains bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
    parser = argparse_oe.ArgumentParser(description=description)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--run-tests', required=False, action='store', nargs='*', dest="run_tests", default=None,
                       help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    group.add_argument('--run-all-tests', required=False, action="store_true", dest="run_all_tests", default=False,
                       help='Run all (unhidden) tests')
    group.add_argument('--list-modules', required=False, action="store_true", dest="list_modules", default=False,
                       help='List all available test modules.')
    group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", default=False,
                       help='List all available test classes.')
    parser.add_argument('--coverage', action="store_true", help="Run code coverage when testing")
    parser.add_argument('--coverage-source', dest="coverage_source", nargs="+",
                        help="Specifiy the directories to take coverage from")
    parser.add_argument('--coverage-include', dest="coverage_include", nargs="+",
                        help="Specify extra patterns to include into the coverage measurement")
    parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+",
                        help="Specify with extra patterns to exclude from the coverage measurement")
    group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*',
                       help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*',
                       help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('--list-tests', required=False, action="store_true", dest="list_tests", default=False,
                       help='List all available tests.')
    group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true",
                       help='List all tags that have been set to test cases.')
    parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None,
                        help='Run tests on different machines (random/all).')
    return parser


def preflight_check():
    """Verify the environment is usable before running any test.

    Checks that BUILDDIR is set, changes the working directory to it, checks
    that meta-selftest is listed in BBLAYERS and runs "bitbake -p".

    Returns True when all checks passed, False otherwise.
    """
    log.info("Checking that everything is in order before running the tests")

    builddir = os.environ.get("BUILDDIR")
    if not builddir:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    if os.getcwd() != builddir:
        log.info("Changing cwd to %s" % builddir)
        os.chdir(builddir)

    # "or ''" keeps the check meaningful should the variable lookup come back empty
    if "meta-selftest" not in (get_bb_var("BBLAYERS") or ""):
        log.error("You don't seem to have the meta-selftest layer in BBLAYERS")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")

    return True


def add_include():
    """Append the selftest include lines to local.conf and bblayers.conf.

    Each file is only touched when the marker comment is not already there.
    """
    builddir = os.environ.get("BUILDDIR")
    local_conf = os.path.join(builddir, "conf/local.conf")
    bblayers_conf = os.path.join(builddir, "conf/bblayers.conf")

    if _INCLUDE_MARKER not in ftools.read_file(local_conf):
        log.info("Adding: \"include selftest.inc\" in local.conf")
        ftools.append_file(local_conf, _LOCAL_CONF_INCLUDE)

    if _INCLUDE_MARKER not in ftools.read_file(bblayers_conf):
        log.info("Adding: \"include bblayers.inc\" in bblayers.conf")
        ftools.append_file(bblayers_conf, _BBLAYERS_CONF_INCLUDE)


def remove_include():
    """Undo add_include(): strip the selftest include lines again.

    Does nothing when BUILDDIR is not set.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        return

    local_conf = os.path.join(builddir, "conf/local.conf")
    bblayers_conf = os.path.join(builddir, "conf/bblayers.conf")

    if _INCLUDE_MARKER in ftools.read_file(local_conf):
        log.info("Removing the include from local.conf")
        ftools.remove_from_file(local_conf, _LOCAL_CONF_INCLUDE)

    if _INCLUDE_MARKER in ftools.read_file(bblayers_conf):
        log.info("Removing the include from bblayers.conf")
        ftools.remove_from_file(bblayers_conf, _BBLAYERS_CONF_INCLUDE)


def remove_inc_files():
    """Best-effort removal of the .inc files generated during a test run."""
    # Errors raised while joining paths when BUILDDIR is not set
    # (AttributeError on Python 2, TypeError on Python 3) and while removing
    # files that are not there are expected and ignored.
    ignorable = (AttributeError, TypeError, OSError)

    try:
        # NOTE: the order matters. When conf/selftest.inc does not exist the
        # OSError raised here also skips the test layer walk below, so the
        # get_test_layer() lookup only happens after an actual test run.
        os.remove(os.path.join(os.environ.get("BUILDDIR"), "conf/selftest.inc"))
        for root, _, files in os.walk(get_test_layer()):
            for f in files:
                if f == 'test_recipe.inc':
                    os.remove(os.path.join(root, f))
    except ignorable:
        pass

    for incl_file in ['conf/bblayers.inc', 'conf/machine.inc']:
        try:
            os.remove(os.path.join(os.environ.get("BUILDDIR"), incl_file))
        except ignorable:
            pass


def get_tests_modules(include_hidden=False):
    """Return the dotted names of all test modules below oeqa.selftest.

    Every directory on oeqa.selftest.__path__ is walked recursively.  Files
    named base.py or starting with "__" are never listed; files starting
    with a single "_" (hidden modules) only when include_hidden is True.
    The result keeps discovery order and contains no duplicates.
    """
    modules_list = []
    seen = set()

    for modules_path in oeqa.selftest.__path__:
        for dirpath, _, filenames in os.walk(modules_path):
            # Derive the sub-package from the path relative to the walked
            # root, so directory names containing "selftest" cannot confuse it
            package = 'oeqa.selftest'
            rel = os.path.relpath(dirpath, modules_path)
            if rel != os.curdir:
                package += '.' + rel.replace(os.sep, '.')

            for fname in sorted(filenames):
                if not fname.endswith('.py') or fname == 'base.py':
                    continue
                if fname.startswith('__'):
                    continue
                if fname.startswith('_') and not include_hidden:
                    continue
                module = package + '.' + fname[:-len('.py')]
                if module not in seen:
                    seen.add(module)
                    modules_list.append(module)

    return modules_list


def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the list of test names to load.

    exclusive_modules: names relative to oeqa.selftest (modules, classes or
                       test methods); when empty or None every discovered
                       module is returned instead.
    include_hidden:    passed on to get_tests_modules() for the discovery case.
    """
    test_modules = ['oeqa.selftest.' + x for x in (exclusive_modules or [])]

    if not test_modules:
        test_modules = get_tests_modules(include_hidden)

    return test_modules


class Tc(object):
    """Plain record describing a single test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        self.tcname = tcname
        self.tcclass = tcclass
        self.tcmodule = tcmodule
        self.tcid = tcid
        # A test case can have multiple tags (as list or as tuples) otherwise str suffice
        self.tctag = tctag
        # Dotted name in the form used to load the test in main()
        self.fullpath = '.'.join(['oeqa', 'selftest', tcmodule, tcclass, tcname])


def get_tests_from_module(tmod):
    """Return a list of Tc objects for every test method found in module tmod.

    A test method is a callable attribute whose name starts with "test_" on
    a subclass of oeSelfTest found in the module namespace.  Modules that
    fail to import or inspect yield the tests collected so far (possibly
    none); the failure is recorded at debug level only.
    """
    tlist = []
    prefix = 'oeqa.selftest.'

    try:
        import importlib
        modlib = importlib.import_module(tmod)
        for mod in vars(modlib).values():
            if isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest) and mod is not oeSelfTest:
                for test in dir(mod):
                    # getattr() also resolves test methods inherited from a
                    # base class; vars(mod) only holds the class' own ones
                    method = getattr(mod, test, None)
                    if test.startswith('test_') and callable(method):
                        # Get test case id and feature tag; both are
                        # optional attributes set by decorators
                        try:
                            tid = method.test_case
                        except AttributeError:
                            print('DEBUG: tc id missing for ' + str(test))
                            tid = None
                        try:
                            ttag = method.tag__feature
                        except AttributeError:
                            ttag = None

                        # NOTE: for some reason lstrip() doesn't work for mod.__module__
                        tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag))
    except Exception as e:
        # Listing is best effort: a broken module must not abort it, but
        # leave a trace in the log file for whoever wonders about missing tests
        log.debug("Could not get tests from module %s: %s" % (tmod, e))

    return tlist


def get_all_tests():
    """Return Tc objects for all tests of all (unhidden) test modules."""
    testlist = []
    for tmod in sorted(get_tests_modules()):
        testlist += get_tests_from_module(tmod)
    return testlist


def _tc_matches(tc, criteria, keyword):
    """Tell whether test case tc is selected by criteria/keyword.

    Single place for the matching rules so running and listing tests by a
    criteria always select the same test cases.
    """
    if criteria == 'name':
        return tc.tcname in keyword
    if criteria == 'class':
        return tc.tcclass in keyword
    if criteria == 'module':
        return tc.tcmodule in keyword
    if criteria == 'id':
        return str(tc.tcid) in keyword
    if criteria == 'tag':
        # tc can have multiple tags (as list or tuple) otherwise as str
        if isinstance(tc.tctag, (list, tuple)):
            return any(str(tag) in keyword for tag in tc.tctag)
        return str(tc.tctag) in keyword
    return False


def create_testsuite_by(criteria, keyword):
    """Return the sorted full paths of all tests selected by criteria/keyword.

    criteria: name, class, module, id, tag
    keyword:  a list of tests, classes, modules, ids, tags
    NOTE: globing would be nice?
    """
    ts = set(tc.fullpath for tc in get_all_tests() if _tc_matches(tc, criteria, keyword))
    return sorted(ts)


def get_testsuite_by(criteria, keyword):
    """Return sorted (id, tag, name, class, module) tuples of the selected tests.

    criteria: name, class, module, id, tag
    keyword:  a list of tests, classes, modules, ids, tags
    NOTE: globing would be nice?
    """
    ts = set()
    for tc in get_all_tests():
        if _tc_matches(tc, criteria, keyword):
            tctag = tc.tctag
            if isinstance(tctag, list):
                # lists are unhashable and would make the set reject the row
                tctag = tuple(tctag)
            ts.add((tc.tcid, tctag, tc.tcname, tc.tcclass, tc.tcmodule))
    return sorted(ts)


def _print_tc_table(rows):
    """Print (id, tag, name, class, module) rows as a table with a header."""
    print(_TC_ROW_FORMAT % ('id', 'tag', 'name', 'class', 'module'))
    print('_' * 150)
    for tcid, tctag, tcname, tcclass, tcmodule in rows:
        if isinstance(tctag, (tuple, list)):
            tctag = ', '.join(str(x) for x in tctag)
        print(_TC_ROW_FORMAT % (tcid, tctag, tcname, tcclass, tcmodule))
    print('_' * 150)


def list_testsuite_by(criteria, keyword):
    """Print the tests selected by criteria/keyword, followed by a summary.

    criteria: name, class, module, id, tag
    keyword:  a list of tests, classes, modules, ids, tags
    NOTE: globing would be nice?
    """
    ts = get_testsuite_by(criteria, keyword)

    _print_tc_table(ts)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(ts))


def list_tests():
    """Print all available oe-selftest tests."""
    ts = get_all_tests()

    _print_tc_table([(tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule) for tc in ts])
    print('Total found:\t %s' % len(ts))


def list_tags():
    """Print all tags set to test cases.

    This is useful when setting tags to test cases.
    The list of tags should be kept as minimal as possible.
    """
    tags = set()

    for tc in get_all_tests():
        if isinstance(tc.tctag, (tuple, list)):
            tags.update(tc.tctag)
        else:
            tags.add(tc.tctag)

    print('Tags:\t%s' % ', '.join(str(x) for x in tags))


def coverage_setup(run_tests, run_all_tests, coverage_source, coverage_include, coverage_omit):
    """ Set up the coverage measurement for the testcases to be run

    Writes $BUILDDIR/.coveragerc (removing a stale data file of the same
    name first) and returns the path of that configuration file.
    """
    builddir = os.environ.get("BUILDDIR")
    coveragerc = "%s/.coveragerc" % builddir

    # The data file name is suffixed with what is being run
    data_file = "%s/.coverage." % builddir
    if run_tests:
        data_file += '.'.join(run_tests)
    elif run_all_tests:
        data_file += "all_tests"

    if os.path.isfile(data_file):
        os.remove(data_file)

    with open(coveragerc, 'w') as cps:
        cps.write("[run]\n")
        cps.write("data_file = %s\n" % data_file)
        cps.write("branch = True\n")
        # Measure just BBLAYERS, scripts and bitbake folders
        cps.write("source = \n")
        if coverage_source:
            for directory in coverage_source:
                if not os.path.isdir(directory):
                    log.warning("Directory %s is not valid.", directory)
                cps.write(" %s\n" % directory)
        else:
            scripts_dir = os.path.dirname(os.path.realpath(__file__))
            for layer in get_bb_var('BBLAYERS').split():
                cps.write(" %s\n" % layer)
            cps.write(" %s\n" % scripts_dir)
            cps.write(" %s\n" % os.path.join(os.path.dirname(scripts_dir), 'bitbake'))

        if coverage_include:
            cps.write("include = \n")
            for pattern in coverage_include:
                cps.write(" %s\n" % pattern)
        if coverage_omit:
            cps.write("omit = \n")
            for pattern in coverage_omit:
                cps.write(" %s\n" % pattern)

    return coveragerc


def coverage_report():
    """ Loads the coverage data gathered and reports it back """
    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage

    import cStringIO as StringIO
    from coverage.misc import CoverageException

    cov_output = StringIO.StringIO()
    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file=os.environ.get('COVERAGE_PROCESS_START'))
    try:
        # Load data from the data file specified in the configuration
        cov.load()
        # Store report data in a StringIO variable
        cov.report(file=cov_output, show_missing=False)
        log.info("\n%s" % cov_output.getvalue())
    except CoverageException as e:
        # Show problems with the reporting. Since Coverage4 not finding any data to report raises an exception
        log.warning("%s" % str(e))
    finally:
        cov_output.close()


def _split_by_option(option, values):
    """Validate the values of a --run-tests-by / --list-tests-by option.

    Returns (criteria, keyword) or None, after printing what is wrong, when
    the criteria is unknown or no keyword was given.
    """
    if len(values) < 2:
        print('%s requires a criteria <name|class|module|id|tag> followed by at least one keyword.' % option)
        return None
    if values[0] not in _VALID_CRITERIA:
        print('%s %s not a valid option. Choose one of <name|class|module|id|tag>.' % (option, values[0]))
        return None
    return values[0], values[1:]


def main():
    """Script entry point: parse the command line and list or run tests.

    Returns the process exit code: 0 on success, 1 on failing tests or
    invalid usage.
    """
    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = get_bb_var('BBPATH').split(':')
    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
    sys.path.extend(layer_libdirs)
    reload(oeqa.selftest)

    # Both options default to False and become a (possibly empty) list
    # once given on the command line
    ts = None
    if isinstance(args.run_tests_by, list):
        selection = _split_by_option('--run-tests-by', args.run_tests_by)
        if selection is None:
            return 1
        ts = create_testsuite_by(*selection)

    if isinstance(args.list_tests_by, list):
        selection = _split_by_option('--list-tests-by', args.list_tests_by)
        if selection is None:
            return 1
        list_testsuite_by(*selection)

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    if args.list_allclasses:
        args.list_modules = True

    if args.list_modules:
        log.info('Listing all available test modules:')
        testslist = get_tests(include_hidden=True)
        for test in testslist:
            module = test.split('oeqa.selftest.')[-1]
            info = ''
            if module.startswith('_'):
                info = ' (hidden)'
            print(module + info)
            if args.list_allclasses:
                try:
                    import importlib
                    modlib = importlib.import_module(test)
                    for name, obj in vars(modlib).items():
                        if isinstance(obj, type(oeSelfTest)) and issubclass(obj, oeSelfTest) and obj != oeSelfTest:
                            print(" -- %s" % name)
                            for method in dir(obj):
                                # getattr() so inherited test methods are found too
                                if method.startswith("test_") and callable(getattr(obj, method, None)):
                                    print(" -- -- %s" % method)
                except (AttributeError, ImportError) as e:
                    print(e)

    if args.run_tests or args.run_all_tests or args.run_tests_by:
        if not preflight_check():
            return 1

        if args.run_tests_by:
            testslist = ts
        else:
            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

        suite = unittest.TestSuite()
        loader = unittest.TestLoader()
        loader.sortTestMethodsUsing = None
        runner = unittest.TextTestRunner(verbosity=2, resultclass=buildResultClass(args))
        # we need to do this here, otherwise just loading the tests
        # will take 2 minutes (bitbake -e calls)
        oeSelfTest.testlayer_path = get_test_layer()
        for test in testslist:
            log.info("Loading tests from: %s" % test)
            try:
                suite.addTests(loader.loadTestsFromName(test))
            except AttributeError as e:
                log.error("Failed to import %s" % test)
                log.error(e)
                return 1
        add_include()

        success = True
        if args.machine:
            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
            if args.machine == 'random':
                os.environ['CUSTOMMACHINE'] = 'random'
                success = runner.run(suite).wasSuccessful()
            else:  # all
                machines = get_available_machines()
                if not machines:
                    log.error("No machines available to run the tests on")
                    success = False
                for m in machines:
                    log.info('Run tests with custom MACHINE set to: %s' % m)
                    os.environ['CUSTOMMACHINE'] = m
                    # A failure on any machine fails the whole run, not
                    # just a failure on the last one
                    if not runner.run(suite).wasSuccessful():
                        success = False
        else:
            success = runner.run(suite).wasSuccessful()

        log.info("Finished")
        if success:
            return 0
        else:
            return 1

    return 0


def buildResultClass(args):
    """Build a Result Class to use in the testcase execution

    args: the parsed command line; its coverage options decide whether the
          returned class sets up and reports code coverage.
    """
    import site

    # Any coverage option turns the measurement on. The same condition is
    # used for setup and teardown so the coverage.pth file written at start
    # is always removed again at the end.
    coverage_requested = bool(args.coverage or args.coverage_source or
                              args.coverage_include or args.coverage_omit)

    class StampedResult(unittest.TextTestResult):
        """
        Custom TestResult that prints the time when a test starts.  As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """

            # variable holding the coverage configuration file allowing subprocess to be measured
            self.coveragepth = None

            # indicates the system if coverage is currently installed
            self.coverage_installed = True

            if coverage_requested:
                try:
                    # check if user can do coverage
                    import coverage
                except ImportError:
                    log.warning('\n'.join(["python coverage is not installed",
                                           "Make sure your coverage takes into account sub-process",
                                           "More info on https://pypi.python.org/pypi/coverage"]))
                    self.coverage_installed = False

                if self.coverage_installed:
                    log.info("Coverage is enabled")

                    # In case the user has not set the variable COVERAGE_PROCESS_START,
                    # create a default one and export it. The COVERAGE_PROCESS_START
                    # value indicates where the coverage configuration file resides
                    # More info on https://pypi.python.org/pypi/coverage
                    if not os.environ.get('COVERAGE_PROCESS_START'):
                        os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.run_tests, args.run_all_tests, args.coverage_source, args.coverage_include, args.coverage_omit)

                    # Use default site.USER_SITE and write corresponding config file
                    site.ENABLE_USER_SITE = True
                    if not os.path.exists(site.USER_SITE):
                        os.makedirs(site.USER_SITE)
                    self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
                    with open(self.coveragepth, 'w') as cps:
                        cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """

            if coverage_requested and self.coverage_installed:
                try:
                    with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                        log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                        log.info("===========================")
                        log.info("\n%s" % "".join(ccf.readlines()))

                    log.info("Coverage Report")
                    log.info("===============")
                    coverage_report()
                finally:
                    # remove the pth file
                    try:
                        if self.coveragepth is not None:
                            os.remove(self.coveragepth)
                    except OSError:
                        log.warning("Expected temporal file from coverage is missing, ignoring removal.")

    return StampedResult


if __name__ == "__main__":
    # Pre-set so the exit code is defined whatever happens in main()
    ret = 1
    try:
        ret = main()
    except Exception:
        ret = 1
        import traceback
        traceback.print_exc(5)
    finally:
        # Always restore the build configuration, also when main() leaves
        # through SystemExit (e.g. argparse usage errors or --help)
        remove_include()
        remove_inc_files()
    sys.exit(ret)