diff options
Diffstat (limited to 'meta/lib/oeqa/utils/decorators.py')
-rw-r--r-- | meta/lib/oeqa/utils/decorators.py | 130 |
1 files changed, 40 insertions, 90 deletions
diff --git a/meta/lib/oeqa/utils/decorators.py b/meta/lib/oeqa/utils/decorators.py index 0d79223a29..ea90164e5e 100644 --- a/meta/lib/oeqa/utils/decorators.py +++ b/meta/lib/oeqa/utils/decorators.py @@ -1,6 +1,8 @@ +# # Copyright (C) 2013 Intel Corporation # -# Released under the MIT license (see COPYING.MIT) +# SPDX-License-Identifier: MIT +# # Some custom decorators that can be used by unittests # Most useful is skipUnlessPassed which can be used for @@ -14,94 +16,12 @@ import threading import signal from functools import wraps -#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame -class getResults(object): - def __init__(self): - #dynamically determine the unittest.case frame and use it to get the name of the test method - ident = threading.current_thread().ident - upperf = sys._current_frames()[ident] - while (upperf.f_globals['__name__'] != 'unittest.case'): - upperf = upperf.f_back - - def handleList(items): - ret = [] - # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception()) - for i in items: - s = i[0].id() - #Handle the _ErrorHolder objects from skipModule failures - if "setUpModule (" in s: - ret.append(s.replace("setUpModule (", "").replace(")","")) - else: - ret.append(s) - # Append also the test without the full path - testname = s.split('.')[-1] - if testname: - ret.append(testname) - return ret - self.faillist = handleList(upperf.f_locals['result'].failures) - self.errorlist = handleList(upperf.f_locals['result'].errors) - self.skiplist = handleList(upperf.f_locals['result'].skipped) - - def getFailList(self): - return self.faillist - - def getErrorList(self): - return self.errorlist - - def getSkipList(self): - return self.skiplist - -class skipIfFailure(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in (res.getFailList() or res.getErrorList()): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - return wrapped_f - -class skipIfSkipped(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in res.getSkipList(): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - return wrapped_f - -class skipUnlessPassed(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in res.getSkipList() or \ - self.testcase in res.getFailList() or \ - self.testcase in res.getErrorList(): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - wrapped_f._depends_on = self.testcase - return wrapped_f - class testcase(object): - def __init__(self, test_case): self.test_case = test_case def __call__(self, func): + @wraps(func) def wrapped_f(*args, **kwargs): return func(*args, **kwargs) wrapped_f.test_case = self.test_case @@ -112,6 +32,8 @@ class NoParsingFilter(logging.Filter): def filter(self, record): return record.levelno == 100 +import inspect + def LogResults(original_class): orig_method = original_class.run @@ -121,6 +43,19 @@ def LogResults(original_class): logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log') linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log') + def get_class_that_defined_method(meth): + if inspect.ismethod(meth): + for cls in inspect.getmro(meth.__self__.__class__): + if cls.__dict__.get(meth.__name__) is meth: + return cls + meth = meth.__func__ # fallback to __qualname__ parsing + if inspect.isfunction(meth): + cls = getattr(inspect.getmodule(meth), + meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]) + if isinstance(cls, type): + return cls + return None + #rewrite the run method of unittest.TestCase to add testcase logging def run(self, result, *args, **kws): orig_method(self, result, *args, **kws) @@ -132,7 +67,7 @@ def LogResults(original_class): except AttributeError: test_case = self._testMethodName - class_name = str(testMethod.im_class).split("'")[1] + class_name = str(get_class_that_defined_method(testMethod)).split("'")[1] #create custom logging level for filtering. custom_log_level = 100 @@ -154,27 +89,42 @@ def LogResults(original_class): #check status of tests and record it + tcid = self.id() for (name, msg) in result.errors: - if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]): + if tcid == name.id(): local_log.results("Testcase "+str(test_case)+": ERROR") local_log.results("Testcase "+str(test_case)+":\n"+msg) passed = False for (name, msg) in result.failures: - if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]): + if tcid == name.id(): local_log.results("Testcase "+str(test_case)+": FAILED") local_log.results("Testcase "+str(test_case)+":\n"+msg) passed = False for (name, msg) in result.skipped: - if (self._testMethodName == str(name).split(' ')[0]) and (class_name in str(name).split(' ')[1]): + if tcid == name.id(): local_log.results("Testcase "+str(test_case)+": SKIPPED") passed = False if passed: local_log.results("Testcase "+str(test_case)+": PASSED") + # XXX: In order to avoid race condition when test if exists the linkfile + # use bb.utils.lock, the best solution is to create a unique name for the + # link file. + try: + import bb + has_bb = True + lockfilename = linkfile + '.lock' + except ImportError: + has_bb = False + + if has_bb: + lf = bb.utils.lockfile(lockfilename, block=True) # Create symlink to the current log - if os.path.exists(linkfile): + if os.path.lexists(linkfile): os.remove(linkfile) os.symlink(logfile, linkfile) + if has_bb: + bb.utils.unlockfile(lf) original_class.run = run @@ -212,7 +162,7 @@ def tag(*args, **kwargs): def wrap_ob(ob): for name in args: setattr(ob, __tag_prefix + name, True) - for name, value in kwargs.iteritems(): + for name, value in kwargs.items(): setattr(ob, __tag_prefix + name, value) return ob return wrap_ob |