diff options
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r-- | meta/lib/oeqa/core/utils/concurrencytest.py | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py new file mode 100644 index 0000000000..d10f8f7f04 --- /dev/null +++ b/meta/lib/oeqa/core/utils/concurrencytest.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +# +# Copyright OpenEmbedded Contributors +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Modified for use in OE by Richard Purdie, 2018 +# +# Modified by: Corey Goldberg, 2013 +# License: GPLv2+ +# +# Original code from: +# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) +# Copyright (C) 2005-2011 Canonical Ltd +# License: GPLv2+ + +import os +import sys +import traceback +import unittest +import subprocess +import testtools +import threading +import time +import io +import json +import subunit + +from queue import Queue +from itertools import cycle +from subunit import ProtocolTestCase, TestProtocolClient +from subunit.test_results import AutoTimingTestResultDecorator +from testtools import ThreadsafeForwardingResult, iterate_tests +from testtools.content import Content +from testtools.content_type import ContentType +from oeqa.utils.commands import get_test_layer + +import bb.utils +import oe.path + +_all__ = [ + 'ConcurrentTestSuite', + 'fork_for_tests', + 'partition_tests', +] + +# +# Patch the version from testtools to allow access to _test_start and allow +# computation of timing information and threading progress +# +class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): + + def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult): + super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) + self.threadnum = threadnum + self.totalinprocess = totalinprocess + self.totaltests = totaltests + self.buffer = True + self.outputbuf = output + self.finalresult = finalresult + self.finalresult.buffer = True + self.target = target + + def _add_result_with_semaphore(self, method, test, *args, **kwargs): + self.semaphore.acquire() + try: + if self._test_start: + self.result.starttime[test.id()] = self._test_start.timestamp() + self.result.threadprogress[self.threadnum].append(test.id()) + totalprogress = sum(len(x) for x in self.result.threadprogress.values()) + self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % ( + self.threadnum, + len(self.result.threadprogress[self.threadnum]), + self.totalinprocess, + totalprogress, + self.totaltests, + "{0:.2f}".format(time.time()-self._test_start.timestamp()), + self.target.failed_tests, + test.id()) + finally: + self.semaphore.release() + self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8")) + self.finalresult._stdout_buffer = io.StringIO() + super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) + +class ProxyTestResult: + # a very basic TestResult proxy, in order to modify add* calls + def __init__(self, target): + self.result = target + self.failed_tests = 0 + + def _addResult(self, method, test, *args, exception = False, **kwargs): + return method(test, *args, **kwargs) + + def addError(self, test, err = None, **kwargs): + self.failed_tests += 1 + self._addResult(self.result.addError, test, err, exception = True, **kwargs) + + def addFailure(self, test, err = None, **kwargs): + self.failed_tests += 1 + self._addResult(self.result.addFailure, test, err, exception = True, **kwargs) + + def addSuccess(self, test, **kwargs): + self._addResult(self.result.addSuccess, test, **kwargs) + + def addExpectedFailure(self, test, err = None, **kwargs): + self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs) + + def addUnexpectedSuccess(self, test, **kwargs): + self._addResult(self.result.addUnexpectedSuccess, test, **kwargs) + + def wasSuccessful(self): + return self.failed_tests == 0 + + def __getattr__(self, attr): + return getattr(self.result, attr) + +class ExtraResultsDecoderTestResult(ProxyTestResult): + def _addResult(self, method, test, *args, exception = False, **kwargs): + if "details" in kwargs and "extraresults" in kwargs["details"]: + if isinstance(kwargs["details"]["extraresults"], Content): + kwargs = kwargs.copy() + kwargs["details"] = kwargs["details"].copy() + extraresults = kwargs["details"]["extraresults"] + data = bytearray() + for b in extraresults.iter_bytes(): + data += b + extraresults = json.loads(data.decode()) + kwargs["details"]["extraresults"] = extraresults + return method(test, *args, **kwargs) + +class ExtraResultsEncoderTestResult(ProxyTestResult): + def _addResult(self, method, test, *args, exception = False, **kwargs): + if hasattr(test, "extraresults"): + extras = lambda : [json.dumps(test.extraresults).encode()] + kwargs = kwargs.copy() + if "details" not in kwargs: + kwargs["details"] = {} + else: + kwargs["details"] = kwargs["details"].copy() + kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras) + # if using details, need to encode any exceptions into the details obj, + # testtools does not handle "err" and "details" together. + if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None): + kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test) + args = [] + return method(test, *args, **kwargs) + +# +# We have to patch subunit since it doesn't understand how to handle addError +# outside of a running test case. This can happen if classSetUp() fails +# for a class of tests. This unfortunately has horrible internal knowledge. +# +def outSideTestaddError(self, offset, line): + """An 'error:' directive has been read.""" + test_name = line[offset:-1].decode('utf8') + self.parser._current_test = subunit.RemotedTestCase(test_name) + self.parser.current_test_description = test_name + self.parser._state = self.parser._reading_error_details + self.parser._reading_error_details.set_simple() + self.parser.subunitLineReceived(line) + +subunit._OutSideTest.addError = outSideTestaddError + +# Like outSideTestaddError above, we need an equivalent for skips +# happening at the setUpClass() level, otherwise we will see "UNKNOWN" +# as a result for concurrent tests +# +def outSideTestaddSkip(self, offset, line): + """A 'skip:' directive has been read.""" + test_name = line[offset:-1].decode('utf8') + self.parser._current_test = subunit.RemotedTestCase(test_name) + self.parser.current_test_description = test_name + self.parser._state = self.parser._reading_skip_details + self.parser._reading_skip_details.set_simple() + self.parser.subunitLineReceived(line) + +subunit._OutSideTest.addSkip = outSideTestaddSkip + +# +# A dummy structure to add to io.StringIO so that the .buffer object +# is available and accepts writes. This allows unittest with buffer=True +# to interact ok with subunit which wants to access sys.stdout.buffer. +# +class dummybuf(object): + def __init__(self, parent): + self.p = parent + def write(self, data): + self.p.write(data.decode("utf-8")) + +# +# Taken from testtools.ConncurrencyTestSuite but modified for OE use +# +class ConcurrentTestSuite(unittest.TestSuite): + + def __init__(self, suite, processes, setupfunc, removefunc, bb_vars): + super(ConcurrentTestSuite, self).__init__([suite]) + self.processes = processes + self.setupfunc = setupfunc + self.removefunc = removefunc + self.bb_vars = bb_vars + + def run(self, result): + testservers, totaltests = fork_for_tests(self.processes, self) + try: + threads = {} + queue = Queue() + semaphore = threading.Semaphore(1) + result.threadprogress = {} + for i, (testserver, testnum, output) in enumerate(testservers): + result.threadprogress[i] = [] + process_result = BBThreadsafeForwardingResult( + ExtraResultsDecoderTestResult(result), + semaphore, i, testnum, totaltests, output, result) + reader_thread = threading.Thread( + target=self._run_test, args=(testserver, process_result, queue)) + threads[testserver] = reader_thread, process_result + reader_thread.start() + while threads: + finished_test = queue.get() + threads[finished_test][0].join() + del threads[finished_test] + except: + for thread, process_result in threads.values(): + process_result.stop() + raise + finally: + for testserver in testservers: + testserver[0]._stream.close() + + def _run_test(self, testserver, process_result, queue): + try: + try: + testserver.run(process_result) + except Exception: + # The run logic itself failed + case = testtools.ErrorHolder( + "broken-runner", + error=sys.exc_info()) + case.run(process_result) + finally: + queue.put(testserver) + +def fork_for_tests(concurrency_num, suite): + testservers = [] + if 'BUILDDIR' in os.environ: + selftestdir = get_test_layer(suite.bb_vars['BBLAYERS']) + + test_blocks = partition_tests(suite, concurrency_num) + # Clear the tests from the original suite so it doesn't keep them alive + suite._tests[:] = [] + totaltests = sum(len(x) for x in test_blocks) + for process_tests in test_blocks: + numtests = len(process_tests) + process_suite = unittest.TestSuite(process_tests) + # Also clear each split list so new suite has only reference + process_tests[:] = [] + c2pread, c2pwrite = os.pipe() + # Clear buffers before fork to avoid duplicate output + sys.stdout.flush() + sys.stderr.flush() + pid = os.fork() + if pid == 0: + ourpid = os.getpid() + try: + newbuilddir = None + stream = os.fdopen(c2pwrite, 'wb') + os.close(c2pread) + + (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) + + # Leave stderr and stdout open so we can see test noise + # Close stdin so that the child goes away if it decides to + # read from stdin (otherwise its a roulette to see what + # child actually gets keystrokes for pdb etc). + newsi = os.open(os.devnull, os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + # Send stdout/stderr over the stream + os.dup2(c2pwrite, sys.stdout.fileno()) + os.dup2(c2pwrite, sys.stderr.fileno()) + + subunit_client = TestProtocolClient(stream) + subunit_result = AutoTimingTestResultDecorator(subunit_client) + unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) + if ourpid != os.getpid(): + os._exit(0) + if newbuilddir and unittest_result.wasSuccessful(): + suite.removefunc(newbuilddir) + except: + # Don't do anything with process children + if ourpid != os.getpid(): + os._exit(1) + # Try and report traceback on stream, but exit with error + # even if stream couldn't be created or something else + # goes wrong. The traceback is formatted to a string and + # written in one go to avoid interleaving lines from + # multiple failing children. + try: + stream.write(traceback.format_exc().encode('utf-8')) + except: + sys.stderr.write(traceback.format_exc()) + finally: + if newbuilddir: + suite.removefunc(newbuilddir) + stream.flush() + os._exit(1) + stream.flush() + os._exit(0) + else: + os.close(c2pwrite) + stream = os.fdopen(c2pread, 'rb') + # Collect stdout/stderr into an io buffer + output = io.BytesIO() + testserver = ProtocolTestCase(stream, passthrough=output) + testservers.append((testserver, numtests, output)) + return testservers, totaltests + +def partition_tests(suite, count): + # Keep tests from the same class together but allow tests from modules + # to go to different processes to aid parallelisation. + modules = {} + for test in iterate_tests(suite): + m = test.__module__ + "." + test.__class__.__name__ + if m not in modules: + modules[m] = [] + modules[m].append(test) + + # Simply divide the test blocks between the available processes + partitions = [list() for _ in range(count)] + for partition, m in zip(cycle(partitions), modules): + partition.extend(modules[m]) + + # No point in empty threads so drop them + return [p for p in partitions if p] + |