diff options
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r-- | meta/lib/oeqa/core/utils/concurrencytest.py | 155 |
1 files changed, 73 insertions, 82 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py index 6293cf94ec..161a2f6e90 100644 --- a/meta/lib/oeqa/core/utils/concurrencytest.py +++ b/meta/lib/oeqa/core/utils/concurrencytest.py @@ -48,11 +48,15 @@ _all__ = [ # class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): - def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): + def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult): super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) self.threadnum = threadnum self.totalinprocess = totalinprocess self.totaltests = totaltests + self.buffer = True + self.outputbuf = output + self.finalresult = finalresult + self.finalresult.buffer = True def _add_result_with_semaphore(self, method, test, *args, **kwargs): self.semaphore.acquire() @@ -71,36 +75,44 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): test.id()) finally: self.semaphore.release() + self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8")) + self.finalresult._stdout_buffer = io.StringIO() super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) class ProxyTestResult: # a very basic TestResult proxy, in order to modify add* calls def __init__(self, target): self.result = target + self.failed_tests = 0 - def _addResult(self, method, test, *args, **kwargs): + def _addResult(self, method, test, *args, exception = False, **kwargs): return method(test, *args, **kwargs) - def addError(self, test, *args, **kwargs): - self._addResult(self.result.addError, test, *args, **kwargs) + def addError(self, test, err = None, **kwargs): + self.failed_tests += 1 + self._addResult(self.result.addError, test, err, exception = True, **kwargs) - def addFailure(self, test, *args, **kwargs): - self._addResult(self.result.addFailure, test, *args, **kwargs) + def addFailure(self, test, err = None, **kwargs): + self.failed_tests += 1 + self._addResult(self.result.addFailure, test, err, exception = True, **kwargs) - def addSuccess(self, test, *args, **kwargs): - self._addResult(self.result.addSuccess, test, *args, **kwargs) + def addSuccess(self, test, **kwargs): + self._addResult(self.result.addSuccess, test, **kwargs) - def addExpectedFailure(self, test, *args, **kwargs): - self._addResult(self.result.addExpectedFailure, test, *args, **kwargs) + def addExpectedFailure(self, test, err = None, **kwargs): + self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs) - def addUnexpectedSuccess(self, test, *args, **kwargs): - self._addResult(self.result.addUnexpectedSuccess, test, *args, **kwargs) + def addUnexpectedSuccess(self, test, **kwargs): + self._addResult(self.result.addUnexpectedSuccess, test, **kwargs) + + def wasSuccessful(self): + return self.failed_tests == 0 def __getattr__(self, attr): return getattr(self.result, attr) class ExtraResultsDecoderTestResult(ProxyTestResult): - def _addResult(self, method, test, *args, **kwargs): + def _addResult(self, method, test, *args, exception = False, **kwargs): if "details" in kwargs and "extraresults" in kwargs["details"]: if isinstance(kwargs["details"]["extraresults"], Content): kwargs = kwargs.copy() @@ -114,7 +126,7 @@ class ExtraResultsDecoderTestResult(ProxyTestResult): return method(test, *args, **kwargs) class ExtraResultsEncoderTestResult(ProxyTestResult): - def _addResult(self, method, test, *args, **kwargs): + def _addResult(self, method, test, *args, exception = False, **kwargs): if hasattr(test, "extraresults"): extras = lambda : [json.dumps(test.extraresults).encode()] kwargs = kwargs.copy() @@ -123,6 +135,11 @@ class ExtraResultsEncoderTestResult(ProxyTestResult): else: kwargs["details"] = kwargs["details"].copy() kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras) + # if using details, need to encode any exceptions into the details obj, + # testtools does not handle "err" and "details" together. + if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None): + kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test) + args = [] return method(test, *args, **kwargs) # @@ -141,6 +158,20 @@ def outSideTestaddError(self, offset, line): subunit._OutSideTest.addError = outSideTestaddError +# Like outSideTestaddError above, we need an equivalent for skips +# happening at the setUpClass() level, otherwise we will see "UNKNOWN" +# as a result for concurrent tests +# +def outSideTestaddSkip(self, offset, line): + """A 'skip:' directive has been read.""" + test_name = line[offset:-1].decode('utf8') + self.parser._current_test = subunit.RemotedTestCase(test_name) + self.parser.current_test_description = test_name + self.parser._state = self.parser._reading_skip_details + self.parser._reading_skip_details.set_simple() + self.parser.subunitLineReceived(line) + +subunit._OutSideTest.addSkip = outSideTestaddSkip # # A dummy structure to add to io.StringIO so that the .buffer object @@ -158,33 +189,27 @@ class dummybuf(object): # class ConcurrentTestSuite(unittest.TestSuite): - def __init__(self, suite, processes): + def __init__(self, suite, processes, setupfunc, removefunc): super(ConcurrentTestSuite, self).__init__([suite]) self.processes = processes + self.setupfunc = setupfunc + self.removefunc = removefunc def run(self, result): - tests, totaltests = fork_for_tests(self.processes, self) + testservers, totaltests = fork_for_tests(self.processes, self) try: threads = {} queue = Queue() semaphore = threading.Semaphore(1) result.threadprogress = {} - for i, (test, testnum) in enumerate(tests): + for i, (testserver, testnum, output) in enumerate(testservers): result.threadprogress[i] = [] process_result = BBThreadsafeForwardingResult( ExtraResultsDecoderTestResult(result), - semaphore, i, testnum, totaltests) - # Force buffering of stdout/stderr so the console doesn't get corrupted by test output - # as per default in parent code - process_result.buffer = True - # We have to add a buffer object to stdout to keep subunit happy - process_result._stderr_buffer = io.StringIO() - process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) - process_result._stdout_buffer = io.StringIO() - process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) + semaphore, i, testnum, totaltests, output, result) reader_thread = threading.Thread( - target=self._run_test, args=(test, process_result, queue)) - threads[test] = reader_thread, process_result + target=self._run_test, args=(testserver, process_result, queue)) + threads[testserver] = reader_thread, process_result reader_thread.start() while threads: finished_test = queue.get() @@ -195,13 +220,13 @@ class ConcurrentTestSuite(unittest.TestSuite): process_result.stop() raise finally: - for test in tests: - test[0]._stream.close() + for testserver in testservers: + testserver[0]._stream.close() - def _run_test(self, test, process_result, queue): + def _run_test(self, testserver, process_result, queue): try: try: - test.run(process_result) + testserver.run(process_result) except Exception: # The run logic itself failed case = testtools.ErrorHolder( @@ -209,17 +234,10 @@ class ConcurrentTestSuite(unittest.TestSuite): error=sys.exc_info()) case.run(process_result) finally: - queue.put(test) - -def removebuilddir(d): - delay = 5 - while delay and os.path.exists(d + "/bitbake.lock"): - time.sleep(1) - delay = delay - 1 - bb.utils.prunedir(d, ionice=True) + queue.put(testserver) def fork_for_tests(concurrency_num, suite): - result = [] + testservers = [] if 'BUILDDIR' in os.environ: selftestdir = get_test_layer() @@ -244,37 +262,7 @@ def fork_for_tests(concurrency_num, suite): stream = os.fdopen(c2pwrite, 'wb', 1) os.close(c2pread) - # Create a new separate BUILDDIR for each group of tests - if 'BUILDDIR' in os.environ: - builddir = os.environ['BUILDDIR'] - newbuilddir = builddir + "-st-" + str(ourpid) - newselftestdir = newbuilddir + "/meta-selftest" - - bb.utils.mkdirhier(newbuilddir) - oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") - oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") - oe.path.copytree(selftestdir, newselftestdir) - - for e in os.environ: - if builddir in os.environ[e]: - os.environ[e] = os.environ[e].replace(builddir, newbuilddir) - - subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) - - # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow - subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) - - os.chdir(newbuilddir) - - for t in process_suite: - if not hasattr(t, "tc"): - continue - cp = t.tc.config_paths - for p in cp: - if selftestdir in cp[p] and newselftestdir not in cp[p]: - cp[p] = cp[p].replace(selftestdir, newselftestdir) - if builddir in cp[p] and newbuilddir not in cp[p]: - cp[p] = cp[p].replace(builddir, newbuilddir) + (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) # Leave stderr and stdout open so we can see test noise # Close stdin so that the child goes away if it decides to @@ -283,16 +271,17 @@ def fork_for_tests(concurrency_num, suite): newsi = os.open(os.devnull, os.O_RDWR) os.dup2(newsi, sys.stdin.fileno()) + # Send stdout/stderr over the stream + os.dup2(c2pwrite, sys.stdout.fileno()) + os.dup2(c2pwrite, sys.stderr.fileno()) + subunit_client = TestProtocolClient(stream) - # Force buffering of stdout/stderr so the console doesn't get corrupted by test output - # as per default in parent code - subunit_client.buffer = True subunit_result = AutoTimingTestResultDecorator(subunit_client) - process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) + unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) if ourpid != os.getpid(): os._exit(0) - if newbuilddir: - removebuilddir(newbuilddir) + if newbuilddir and unittest_result.wasSuccessful(): + suite.removefunc(newbuilddir) except: # Don't do anything with process children if ourpid != os.getpid(): @@ -308,7 +297,7 @@ def fork_for_tests(concurrency_num, suite): sys.stderr.write(traceback.format_exc()) finally: if newbuilddir: - removebuilddir(newbuilddir) + suite.removefunc(newbuilddir) stream.flush() os._exit(1) stream.flush() @@ -316,9 +305,11 @@ def fork_for_tests(concurrency_num, suite): else: os.close(c2pwrite) stream = os.fdopen(c2pread, 'rb', 1) - test = ProtocolTestCase(stream) - result.append((test, numtests)) - return result, totaltests + # Collect stdout/stderr into an io buffer + output = io.BytesIO() + testserver = ProtocolTestCase(stream, passthrough=output) + testservers.append((testserver, numtests, output)) + return testservers, totaltests def partition_tests(suite, count): # Keep tests from the same class together but allow tests from modules |