summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core/utils/concurrencytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r--meta/lib/oeqa/core/utils/concurrencytest.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
new file mode 100644
index 0000000000..d10f8f7f04
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -0,0 +1,336 @@
+#!/usr/bin/env python3
+#
+# Copyright OpenEmbedded Contributors
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Modified for use in OE by Richard Purdie, 2018
+#
+# Modified by: Corey Goldberg, 2013
+# License: GPLv2+
+#
+# Original code from:
+# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
+# Copyright (C) 2005-2011 Canonical Ltd
+# License: GPLv2+
+
+import os
+import sys
+import traceback
+import unittest
+import subprocess
+import testtools
+import threading
+import time
+import io
+import json
+import subunit
+
+from queue import Queue
+from itertools import cycle
+from subunit import ProtocolTestCase, TestProtocolClient
+from subunit.test_results import AutoTimingTestResultDecorator
+from testtools import ThreadsafeForwardingResult, iterate_tests
+from testtools.content import Content
+from testtools.content_type import ContentType
+from oeqa.utils.commands import get_test_layer
+
+import bb.utils
+import oe.path
+
+_all__ = [
+ 'ConcurrentTestSuite',
+ 'fork_for_tests',
+ 'partition_tests',
+]
+
+#
+# Patch the version from testtools to allow access to _test_start and allow
+# computation of timing information and threading progress
+#
+class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
+
+ def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
+ super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
+ self.threadnum = threadnum
+ self.totalinprocess = totalinprocess
+ self.totaltests = totaltests
+ self.buffer = True
+ self.outputbuf = output
+ self.finalresult = finalresult
+ self.finalresult.buffer = True
+ self.target = target
+
+ def _add_result_with_semaphore(self, method, test, *args, **kwargs):
+ self.semaphore.acquire()
+ try:
+ if self._test_start:
+ self.result.starttime[test.id()] = self._test_start.timestamp()
+ self.result.threadprogress[self.threadnum].append(test.id())
+ totalprogress = sum(len(x) for x in self.result.threadprogress.values())
+ self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
+ self.threadnum,
+ len(self.result.threadprogress[self.threadnum]),
+ self.totalinprocess,
+ totalprogress,
+ self.totaltests,
+ "{0:.2f}".format(time.time()-self._test_start.timestamp()),
+ self.target.failed_tests,
+ test.id())
+ finally:
+ self.semaphore.release()
+ self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
+ self.finalresult._stdout_buffer = io.StringIO()
+ super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
+
+class ProxyTestResult:
+ # a very basic TestResult proxy, in order to modify add* calls
+ def __init__(self, target):
+ self.result = target
+ self.failed_tests = 0
+
+ def _addResult(self, method, test, *args, exception = False, **kwargs):
+ return method(test, *args, **kwargs)
+
+ def addError(self, test, err = None, **kwargs):
+ self.failed_tests += 1
+ self._addResult(self.result.addError, test, err, exception = True, **kwargs)
+
+ def addFailure(self, test, err = None, **kwargs):
+ self.failed_tests += 1
+ self._addResult(self.result.addFailure, test, err, exception = True, **kwargs)
+
+ def addSuccess(self, test, **kwargs):
+ self._addResult(self.result.addSuccess, test, **kwargs)
+
+ def addExpectedFailure(self, test, err = None, **kwargs):
+ self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs)
+
+ def addUnexpectedSuccess(self, test, **kwargs):
+ self._addResult(self.result.addUnexpectedSuccess, test, **kwargs)
+
+ def wasSuccessful(self):
+ return self.failed_tests == 0
+
+ def __getattr__(self, attr):
+ return getattr(self.result, attr)
+
+class ExtraResultsDecoderTestResult(ProxyTestResult):
+ def _addResult(self, method, test, *args, exception = False, **kwargs):
+ if "details" in kwargs and "extraresults" in kwargs["details"]:
+ if isinstance(kwargs["details"]["extraresults"], Content):
+ kwargs = kwargs.copy()
+ kwargs["details"] = kwargs["details"].copy()
+ extraresults = kwargs["details"]["extraresults"]
+ data = bytearray()
+ for b in extraresults.iter_bytes():
+ data += b
+ extraresults = json.loads(data.decode())
+ kwargs["details"]["extraresults"] = extraresults
+ return method(test, *args, **kwargs)
+
+class ExtraResultsEncoderTestResult(ProxyTestResult):
+ def _addResult(self, method, test, *args, exception = False, **kwargs):
+ if hasattr(test, "extraresults"):
+ extras = lambda : [json.dumps(test.extraresults).encode()]
+ kwargs = kwargs.copy()
+ if "details" not in kwargs:
+ kwargs["details"] = {}
+ else:
+ kwargs["details"] = kwargs["details"].copy()
+ kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras)
+ # if using details, need to encode any exceptions into the details obj,
+ # testtools does not handle "err" and "details" together.
+ if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None):
+ kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test)
+ args = []
+ return method(test, *args, **kwargs)
+
+#
+# We have to patch subunit since it doesn't understand how to handle addError
+# outside of a running test case. This can happen if classSetUp() fails
+# for a class of tests. This unfortunately has horrible internal knowledge.
+#
+def outSideTestaddError(self, offset, line):
+ """An 'error:' directive has been read."""
+ test_name = line[offset:-1].decode('utf8')
+ self.parser._current_test = subunit.RemotedTestCase(test_name)
+ self.parser.current_test_description = test_name
+ self.parser._state = self.parser._reading_error_details
+ self.parser._reading_error_details.set_simple()
+ self.parser.subunitLineReceived(line)
+
+subunit._OutSideTest.addError = outSideTestaddError
+
+# Like outSideTestaddError above, we need an equivalent for skips
+# happening at the setUpClass() level, otherwise we will see "UNKNOWN"
+# as a result for concurrent tests
+#
+def outSideTestaddSkip(self, offset, line):
+ """A 'skip:' directive has been read."""
+ test_name = line[offset:-1].decode('utf8')
+ self.parser._current_test = subunit.RemotedTestCase(test_name)
+ self.parser.current_test_description = test_name
+ self.parser._state = self.parser._reading_skip_details
+ self.parser._reading_skip_details.set_simple()
+ self.parser.subunitLineReceived(line)
+
+subunit._OutSideTest.addSkip = outSideTestaddSkip
+
+#
+# A dummy structure to add to io.StringIO so that the .buffer object
+# is available and accepts writes. This allows unittest with buffer=True
+# to interact ok with subunit which wants to access sys.stdout.buffer.
+#
+class dummybuf(object):
+ def __init__(self, parent):
+ self.p = parent
+ def write(self, data):
+ self.p.write(data.decode("utf-8"))
+
+#
+# Taken from testtools.ConncurrencyTestSuite but modified for OE use
+#
+class ConcurrentTestSuite(unittest.TestSuite):
+
+ def __init__(self, suite, processes, setupfunc, removefunc, bb_vars):
+ super(ConcurrentTestSuite, self).__init__([suite])
+ self.processes = processes
+ self.setupfunc = setupfunc
+ self.removefunc = removefunc
+ self.bb_vars = bb_vars
+
+ def run(self, result):
+ testservers, totaltests = fork_for_tests(self.processes, self)
+ try:
+ threads = {}
+ queue = Queue()
+ semaphore = threading.Semaphore(1)
+ result.threadprogress = {}
+ for i, (testserver, testnum, output) in enumerate(testservers):
+ result.threadprogress[i] = []
+ process_result = BBThreadsafeForwardingResult(
+ ExtraResultsDecoderTestResult(result),
+ semaphore, i, testnum, totaltests, output, result)
+ reader_thread = threading.Thread(
+ target=self._run_test, args=(testserver, process_result, queue))
+ threads[testserver] = reader_thread, process_result
+ reader_thread.start()
+ while threads:
+ finished_test = queue.get()
+ threads[finished_test][0].join()
+ del threads[finished_test]
+ except:
+ for thread, process_result in threads.values():
+ process_result.stop()
+ raise
+ finally:
+ for testserver in testservers:
+ testserver[0]._stream.close()
+
+ def _run_test(self, testserver, process_result, queue):
+ try:
+ try:
+ testserver.run(process_result)
+ except Exception:
+ # The run logic itself failed
+ case = testtools.ErrorHolder(
+ "broken-runner",
+ error=sys.exc_info())
+ case.run(process_result)
+ finally:
+ queue.put(testserver)
+
+def fork_for_tests(concurrency_num, suite):
+ testservers = []
+ if 'BUILDDIR' in os.environ:
+ selftestdir = get_test_layer(suite.bb_vars['BBLAYERS'])
+
+ test_blocks = partition_tests(suite, concurrency_num)
+ # Clear the tests from the original suite so it doesn't keep them alive
+ suite._tests[:] = []
+ totaltests = sum(len(x) for x in test_blocks)
+ for process_tests in test_blocks:
+ numtests = len(process_tests)
+ process_suite = unittest.TestSuite(process_tests)
+ # Also clear each split list so new suite has only reference
+ process_tests[:] = []
+ c2pread, c2pwrite = os.pipe()
+ # Clear buffers before fork to avoid duplicate output
+ sys.stdout.flush()
+ sys.stderr.flush()
+ pid = os.fork()
+ if pid == 0:
+ ourpid = os.getpid()
+ try:
+ newbuilddir = None
+ stream = os.fdopen(c2pwrite, 'wb')
+ os.close(c2pread)
+
+ (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
+
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides to
+ # read from stdin (otherwise its a roulette to see what
+ # child actually gets keystrokes for pdb etc).
+ newsi = os.open(os.devnull, os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
+
+ # Send stdout/stderr over the stream
+ os.dup2(c2pwrite, sys.stdout.fileno())
+ os.dup2(c2pwrite, sys.stderr.fileno())
+
+ subunit_client = TestProtocolClient(stream)
+ subunit_result = AutoTimingTestResultDecorator(subunit_client)
+ unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
+ if ourpid != os.getpid():
+ os._exit(0)
+ if newbuilddir and unittest_result.wasSuccessful():
+ suite.removefunc(newbuilddir)
+ except:
+ # Don't do anything with process children
+ if ourpid != os.getpid():
+ os._exit(1)
+ # Try and report traceback on stream, but exit with error
+ # even if stream couldn't be created or something else
+ # goes wrong. The traceback is formatted to a string and
+ # written in one go to avoid interleaving lines from
+ # multiple failing children.
+ try:
+ stream.write(traceback.format_exc().encode('utf-8'))
+ except:
+ sys.stderr.write(traceback.format_exc())
+ finally:
+ if newbuilddir:
+ suite.removefunc(newbuilddir)
+ stream.flush()
+ os._exit(1)
+ stream.flush()
+ os._exit(0)
+ else:
+ os.close(c2pwrite)
+ stream = os.fdopen(c2pread, 'rb')
+ # Collect stdout/stderr into an io buffer
+ output = io.BytesIO()
+ testserver = ProtocolTestCase(stream, passthrough=output)
+ testservers.append((testserver, numtests, output))
+ return testservers, totaltests
+
+def partition_tests(suite, count):
+ # Keep tests from the same class together but allow tests from modules
+ # to go to different processes to aid parallelisation.
+ modules = {}
+ for test in iterate_tests(suite):
+ m = test.__module__ + "." + test.__class__.__name__
+ if m not in modules:
+ modules[m] = []
+ modules[m].append(test)
+
+ # Simply divide the test blocks between the available processes
+ partitions = [list() for _ in range(count)]
+ for partition, m in zip(cycle(partitions), modules):
+ partition.extend(modules[m])
+
+ # No point in empty threads so drop them
+ return [p for p in partitions if p]
+