summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/selftest/cases/runcmd.py
blob: fa6113d7faa9404d4643a2f545e0f0a3ba718234 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#
# SPDX-License-Identifier: MIT
#

from oeqa.selftest.case import OESelftestTestCase
from oeqa.utils.commands import runCmd
from oeqa.utils import CommandError

import subprocess
import threading
import time
import signal

class MemLogger(object):
    """Minimal logger stand-in that records messages in memory.

    Messages passed to info() and error() are kept, in call order, in the
    ``info_msgs`` and ``error_msgs`` lists so they can be inspected later.
    """

    def __init__(self):
        # Two independent lists, one per severity.
        self.info_msgs, self.error_msgs = [], []

    def info(self, msg):
        """Record ``msg`` as an info-level message."""
        recorded = self.info_msgs
        recorded.append(msg)

    def error(self, msg):
        """Record ``msg`` as an error-level message."""
        recorded = self.error_msgs
        recorded.append(msg)

class RunCmdTests(OESelftestTestCase):
    """Basic tests for the runCmd() utility function.

    Covers exit status reporting, shell vs. non-shell invocation, error
    reporting, stdout/stderr capture, timeouts, stdin data and output logging.
    """

    # The delta is intentionally smaller than the timeout, to detect cases where
    # we incorrectly apply the timeout more than once.
    TIMEOUT = 5
    DELTA = 3

    def _assert_thread_count(self, expected):
        """Fail unless the number of active threads equals ``expected``."""
        self.assertEqual(expected, threading.active_count(), msg="Thread counts were not equal before (%s) and after (%s), active threads: %s" % (expected, threading.active_count(), threading.enumerate()))

    def _check_timeout(self, **kwargs):
        """Run a long ``sleep`` under TIMEOUT and verify it is cut short.

        Extra keyword arguments are forwarded to runCmd(). The check expects
        an exit status of -SIGTERM, an elapsed time below TIMEOUT + DELTA and
        an unchanged active thread count afterwards.
        """
        numthreads = threading.active_count()
        # A monotonic clock keeps the elapsed-time check immune to wall-clock
        # adjustments while the command is running.
        start = time.monotonic()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, sync=False, **kwargs)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.monotonic()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self._assert_thread_count(numthreads)

    def test_result_okay(self):
        """A succeeding command reports status 0."""
        result = runCmd("true")
        self.assertEqual(result.status, 0)

    def test_result_false(self):
        """A failing command reports status 1 when ignore_status is set."""
        result = runCmd("false", ignore_status=True)
        self.assertEqual(result.status, 1)

    def test_shell(self):
        """A string command containing shell syntax yields the last status."""
        # A shell is used for all string commands.
        result = runCmd("false; true", ignore_status=True)
        self.assertEqual(result.status, 0)

    def test_no_shell(self):
        """With shell=False the same string raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError,
                          runCmd, "false; true", shell=False)

    def test_list_not_found(self):
        """A list command naming a nonexistent program raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError,
                          runCmd, ["false; true"])

    def test_list_okay(self):
        """A list command naming an existing program reports status 0."""
        result = runCmd(["true"])
        self.assertEqual(result.status, 0)

    def test_result_assertion(self):
        """By default a failing command raises AssertionError with its output."""
        # assertRaisesRegex replaces the deprecated assertRaisesRegexp alias,
        # which no longer exists in Python 3.12+.
        self.assertRaisesRegex(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
                               runCmd, "echo foobar >&2; false", shell=True)

    def test_result_exception(self):
        """With assert_error=False a failing command raises CommandError."""
        self.assertRaisesRegex(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
                               runCmd, "echo foobar >&2; false", shell=True, assert_error=False)

    def test_output(self):
        """Without a separate stderr, both streams end up in result.output."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True, sync=False)
        self.assertEqual("stdout\nstderr", result.output)
        self.assertEqual("", result.error)

    def test_output_split(self):
        """With stderr=subprocess.PIPE the streams are reported separately."""
        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE, sync=False)
        self.assertEqual("stdout", result.output)
        self.assertEqual("stderr", result.error)

    def test_timeout(self):
        """A hanging command is stopped once the timeout expires."""
        self._check_timeout()

    def test_timeout_split(self):
        """The timeout also applies when stderr is captured separately."""
        self._check_timeout(stderr=subprocess.PIPE)

    def test_stdin(self):
        """Data passed via ``data`` reaches the command's stdin."""
        numthreads = threading.active_count()
        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT, sync=False)
        self.assertEqual("hello world", result.output)
        self._assert_thread_count(numthreads)
        self.assertEqual(numthreads, 1)

    def test_stdin_timeout(self):
        """The timeout also applies when stdin data is supplied."""
        self._check_timeout(data=b"hello world")

    def test_log(self):
        """With merged streams, all output lines are logged via info()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
        self.assertEqual([], log.error_msgs)

    def test_log_split(self):
        """With split streams, stderr lines are logged via error()."""
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE, sync=False)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
        self.assertEqual(["stderr"], log.error_msgs)