From 62489e58ca9975f58b48fc2bd8cf27fd22e25564 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 27 Jun 2017 13:03:41 +0200 Subject: runcmd.py: unit testing for runCmd() This covers the traditional API as well as the new output_log feature. While testing, it was noticed that killing hanging commands does not work when a shell is used to run the command(s). This might be worth fixing. Signed-off-by: Patrick Ohly Signed-off-by: Ross Burton --- meta/lib/oeqa/selftest/cases/runcmd.py | 117 +++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 meta/lib/oeqa/selftest/cases/runcmd.py (limited to 'meta/lib') diff --git a/meta/lib/oeqa/selftest/cases/runcmd.py b/meta/lib/oeqa/selftest/cases/runcmd.py new file mode 100644 index 0000000000..6c785caadc --- /dev/null +++ b/meta/lib/oeqa/selftest/cases/runcmd.py @@ -0,0 +1,117 @@ +from oeqa.selftest.case import OESelftestTestCase +from oeqa.utils.commands import runCmd +from oeqa.utils import CommandError + +import subprocess +import threading +import time +import signal + +class MemLogger(object): + def __init__(self): + self.info_msgs = [] + self.error_msgs = [] + + def info(self, msg): + self.info_msgs.append(msg) + + def error(self, msg): + self.error_msgs.append(msg) + +class RunCmdTests(OESelftestTestCase): + """ Basic tests for runCmd() utility function """ + + # The delta is intentionally smaller than the timeout, to detect cases where + # we incorrectly apply the timeout more than once. + TIMEOUT = 2 + DELTA = 1 + + def test_result_okay(self): + result = runCmd("true") + self.assertEqual(result.status, 0) + + def test_result_false(self): + result = runCmd("false", ignore_status=True) + self.assertEqual(result.status, 1) + + def test_shell(self): + # A shell is used for all string commands. + result = runCmd("false; true", ignore_status=True) + self.assertEqual(result.status, 0) + + def test_no_shell(self): + self.assertRaises(FileNotFoundError, + runCmd, "false; true", shell=False) + + def test_list_not_found(self): + self.assertRaises(FileNotFoundError, + runCmd, ["false; true"]) + + def test_list_okay(self): + result = runCmd(["true"]) + self.assertEqual(result.status, 0) + + def test_result_assertion(self): + self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar", + runCmd, "echo foobar >&2; false", shell=True) + + def test_result_exception(self): + self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar", + runCmd, "echo foobar >&2; false", shell=True, assert_error=False) + + def test_output(self): + result = runCmd("echo stdout; echo stderr >&2", shell=True) + self.assertEqual("stdout\nstderr", result.output) + self.assertEqual("", result.error) + + def test_output_split(self): + result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE) + self.assertEqual("stdout", result.output) + self.assertEqual("stderr", result.error) + + def test_timeout(self): + numthreads = threading.active_count() + start = time.time() + # Killing a hanging process only works when not using a shell?! + result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True) + self.assertEqual(result.status, -signal.SIGTERM) + end = time.time() + self.assertLess(end - start, self.TIMEOUT + self.DELTA) + self.assertEqual(numthreads, threading.active_count()) + + def test_timeout_split(self): + numthreads = threading.active_count() + start = time.time() + # Killing a hanging process only works when not using a shell?! + result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE) + self.assertEqual(result.status, -signal.SIGTERM) + end = time.time() + self.assertLess(end - start, self.TIMEOUT + self.DELTA) + self.assertEqual(numthreads, threading.active_count()) + + def test_stdin(self): + numthreads = threading.active_count() + result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT) + self.assertEqual("hello world", result.output) + self.assertEqual(numthreads, threading.active_count()) + + def test_stdin_timeout(self): + numthreads = threading.active_count() + start = time.time() + result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True) + self.assertEqual(result.status, -signal.SIGTERM) + end = time.time() + self.assertLess(end - start, self.TIMEOUT + self.DELTA) + self.assertEqual(numthreads, threading.active_count()) + + def test_log(self): + log = MemLogger() + result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log) + self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs) + self.assertEqual([], log.error_msgs) + + def test_log_split(self): + log = MemLogger() + result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE) + self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs) + self.assertEqual(["stderr"], log.error_msgs) -- cgit 1.2.3-korg