summaryrefslogtreecommitdiffstats
path: root/meta/lib/patchtest/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/patchtest/utils.py')
-rw-r--r--meta/lib/patchtest/utils.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/meta/lib/patchtest/utils.py b/meta/lib/patchtest/utils.py
new file mode 100644
index 0000000000..dd0abc22d9
--- /dev/null
+++ b/meta/lib/patchtest/utils.py
@@ -0,0 +1,168 @@
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# utils: common methods used by the patchtest framework
+#
+# Copyright (C) 2016 Intel Corporation
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import os
+import subprocess
+import logging
+import re
+import mailbox
+
+class CmdException(Exception):
+ """ Simple exception class where its attributes are the ones passed when instantiated """
+ def __init__(self, cmd):
+ self._cmd = cmd
+ def __getattr__(self, name):
+ value = None
+ if self._cmd.has_key(name):
+ value = self._cmd[name]
+ return value
+
+def exec_cmd(cmd, cwd, ignore_error=False, input=None, strip=True, updateenv={}):
+ """
+ Input:
+
+ cmd: dict containing the following keys:
+
+ cmd : the command itself as an array of strings
+ ignore_error: if False, no exception is raised
+ strip: indicates if strip is done on the output (stdout and stderr)
+ input: input data to the command (stdin)
+ updateenv: environment variables to be appended to the current
+ process environment variables
+
+ NOTE: keys 'ignore_error' and 'input' are optional; if not included,
+ the defaults are the ones specify in the arguments
+ cwd: directory where commands are executed
+ ignore_error: raise CmdException if command fails to execute and
+ this value is False
+ input: input data (stdin) for the command
+
+ Output: dict containing the following keys:
+
+ cmd: the same as input
+ ignore_error: the same as input
+ strip: the same as input
+ input: the same as input
+ stdout: Standard output after command's execution
+ stderr: Standard error after command's execution
+ returncode: Return code after command's execution
+
+ """
+ cmddefaults = {
+ 'cmd':'',
+ 'ignore_error':ignore_error,
+ 'strip':strip,
+ 'input':input,
+ 'updateenv':updateenv,
+ }
+
+ # update input values if necessary
+ cmddefaults.update(cmd)
+
+ _cmd = cmddefaults
+
+ if not _cmd['cmd']:
+ raise CmdException({'cmd':None, 'stderr':'no command given'})
+
+ # update the environment
+ env = os.environ
+ env.update(_cmd['updateenv'])
+
+ _command = [e for e in _cmd['cmd']]
+ p = subprocess.Popen(_command,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ cwd=cwd,
+ env=env)
+
+ # execute the command and strip output
+ (_stdout, _stderr) = p.communicate(_cmd['input'])
+ if _cmd['strip']:
+ _stdout, _stderr = map(str.strip, [_stdout, _stderr])
+
+ # generate the result
+ result = _cmd
+ result.update({'cmd':_command,'stdout':_stdout,'stderr':_stderr,'returncode':p.returncode})
+
+ # launch exception if necessary
+ if not _cmd['ignore_error'] and p.returncode:
+ raise CmdException(result)
+
+ return result
+
+def exec_cmds(cmds, cwd):
+ """ Executes commands
+
+ Input:
+ cmds: Array of commands
+ cwd: directory where commands are executed
+
+ Output: Array of output commands
+ """
+ results = []
+ _cmds = cmds
+
+ for cmd in _cmds:
+ result = exec_cmd(cmd, cwd)
+ results.append(result)
+
+ return results
+
+def logger_create(name):
+ logger = logging.getLogger(name)
+ loggerhandler = logging.StreamHandler()
+ loggerhandler.setFormatter(logging.Formatter("%(message)s"))
+ logger.addHandler(loggerhandler)
+ logger.setLevel(logging.INFO)
+ return logger
+
+def get_subject_prefix(path):
+ prefix = ""
+ mbox = mailbox.mbox(path)
+
+ if len(mbox):
+ subject = mbox[0]['subject']
+ if subject:
+ pattern = re.compile(r"(\[.*\])", re.DOTALL)
+ match = pattern.search(subject)
+ if match:
+ prefix = match.group(1)
+
+ return prefix
+
+def valid_branch(branch):
+ """ Check if branch is valid name """
+ lbranch = branch.lower()
+
+ invalid = lbranch.startswith('patch') or \
+ lbranch.startswith('rfc') or \
+ lbranch.startswith('resend') or \
+ re.search(r'^v\d+', lbranch) or \
+ re.search(r'^\d+/\d+', lbranch)
+
+ return not invalid
+
+def get_branch(path):
+ """ Get the branch name from mbox """
+ fullprefix = get_subject_prefix(path)
+ branch, branches, valid_branches = None, [], []
+
+ if fullprefix:
+ prefix = fullprefix.strip('[]')
+ branches = [ b.strip() for b in prefix.split(',')]
+ valid_branches = [b for b in branches if valid_branch(b)]
+
+ if len(valid_branches):
+ branch = valid_branches[0]
+
+ return branch
+