summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core')
-rw-r--r--meta/lib/oeqa/core/context.py2
-rw-r--r--meta/lib/oeqa/core/decorator/__init__.py11
-rw-r--r--meta/lib/oeqa/core/decorator/data.py76
-rw-r--r--meta/lib/oeqa/core/loader.py12
-rw-r--r--meta/lib/oeqa/core/runner.py12
-rw-r--r--meta/lib/oeqa/core/target/qemu.py42
-rw-r--r--meta/lib/oeqa/core/target/ssh.py94
-rw-r--r--meta/lib/oeqa/core/utils/concurrencytest.py15
-rw-r--r--meta/lib/oeqa/core/utils/misc.py47
9 files changed, 168 insertions, 143 deletions
diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
index 2abe353d27..9313271f58 100644
--- a/meta/lib/oeqa/core/context.py
+++ b/meta/lib/oeqa/core/context.py
@@ -81,7 +81,7 @@ class OETestContext(object):
def runTests(self, processes=None, skips=[]):
self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
- # Dinamically skip those tests specified though arguments
+ # Dynamically skip those tests specified though arguments
self.skipTests(skips)
self._run_start_time = time.time()
diff --git a/meta/lib/oeqa/core/decorator/__init__.py b/meta/lib/oeqa/core/decorator/__init__.py
index 1a82518ab6..93efd30e1d 100644
--- a/meta/lib/oeqa/core/decorator/__init__.py
+++ b/meta/lib/oeqa/core/decorator/__init__.py
@@ -5,8 +5,7 @@
#
from functools import wraps
-from abc import abstractmethod, ABCMeta
-from oeqa.core.utils.misc import strToList
+from abc import ABCMeta
decoratorClasses = set()
@@ -65,15 +64,11 @@ class OETestDiscover(OETestDecorator):
return registry['cases']
def OETestTag(*tags):
- expandedtags = []
- for tag in tags:
- expandedtags += strToList(tag)
def decorator(item):
if hasattr(item, "__oeqa_testtags"):
# do not append, create a new list (to handle classes with inheritance)
- item.__oeqa_testtags = list(item.__oeqa_testtags) + expandedtags
+ item.__oeqa_testtags = list(item.__oeqa_testtags) + list(tags)
else:
- item.__oeqa_testtags = expandedtags
+ item.__oeqa_testtags = tags
return item
return decorator
-
diff --git a/meta/lib/oeqa/core/decorator/data.py b/meta/lib/oeqa/core/decorator/data.py
index bc4939e87c..5444b2cb75 100644
--- a/meta/lib/oeqa/core/decorator/data.py
+++ b/meta/lib/oeqa/core/decorator/data.py
@@ -13,8 +13,8 @@ def has_feature(td, feature):
Checks for feature in DISTRO_FEATURES or IMAGE_FEATURES.
"""
- if (feature in td.get('DISTRO_FEATURES', '') or
- feature in td.get('IMAGE_FEATURES', '')):
+ if (feature in td.get('DISTRO_FEATURES', '').split() or
+ feature in td.get('IMAGE_FEATURES', '').split()):
return True
return False
@@ -23,18 +23,7 @@ def has_machine(td, machine):
Checks for MACHINE.
"""
- if (machine in td.get('MACHINE', '')):
- return True
- return False
-
-def is_qemu(td, qemu):
- """
- Checks if MACHINE is qemu.
- """
-
- machine = td.get('MACHINE', '')
- if (qemu in td.get('MACHINE', '') or
- machine.startswith('qemu')):
+ if (machine == td.get('MACHINE', '')):
return True
return False
@@ -189,34 +178,53 @@ class skipIfMachine(OETestDecorator):
@registerDecorator
class skipIfNotQemu(OETestDecorator):
"""
- Skip test based on MACHINE.
-
- value must be a qemu MACHINE or it will skip the test
- with msg as the reason.
+ Skip test if MACHINE is not qemu*
"""
+ def setUpDecorator(self):
+ self.logger.debug("Checking if not qemu MACHINE")
+ if not self.case.td.get('MACHINE', '').startswith('qemu'):
+ self.case.skipTest('Test only runs on qemu machines')
- attrs = ('value', 'msg')
-
+@registerDecorator
+class skipIfNotQemuUsermode(OETestDecorator):
+ """
+ Skip test if MACHINE_FEATURES does not contain qemu-usermode
+ """
def setUpDecorator(self):
- msg = ('Checking if %s is not this MACHINE' % self.value)
- self.logger.debug(msg)
- if not is_qemu(self.case.td, self.value):
- self.case.skipTest(self.msg)
+ self.logger.debug("Checking if MACHINE_FEATURES does not contain qemu-usermode")
+ if 'qemu-usermode' not in self.case.td.get('MACHINE_FEATURES', '').split():
+ self.case.skipTest('Test requires qemu-usermode in MACHINE_FEATURES')
@registerDecorator
class skipIfQemu(OETestDecorator):
"""
- Skip test based on Qemu Machine.
-
- value must not be a qemu machine or it will skip the test
- with msg as the reason.
- """
+ Skip test if MACHINE is qemu*
+ """
+ def setUpDecorator(self):
+ self.logger.debug("Checking if qemu MACHINE")
+ if self.case.td.get('MACHINE', '').startswith('qemu'):
+ self.case.skipTest('Test only runs on real hardware')
- attrs = ('value', 'msg')
+@registerDecorator
+class skipIfArch(OETestDecorator):
+ """
+ Skip test if HOST_ARCH is present in the tuple specified.
+ """
+ attrs = ('archs',)
def setUpDecorator(self):
- msg = ('Checking if %s is this MACHINE' % self.value)
- self.logger.debug(msg)
- if is_qemu(self.case.td, self.value):
- self.case.skipTest(self.msg)
+ arch = self.case.td['HOST_ARCH']
+ if arch in self.archs:
+ self.case.skipTest('Test skipped on %s' % arch)
+
+@registerDecorator
+class skipIfNotArch(OETestDecorator):
+ """
+ Skip test if HOST_ARCH is not present in the tuple specified.
+ """
+ attrs = ('archs',)
+ def setUpDecorator(self):
+ arch = self.case.td['HOST_ARCH']
+ if arch not in self.archs:
+ self.case.skipTest('Test skipped on %s' % arch)
diff --git a/meta/lib/oeqa/core/loader.py b/meta/lib/oeqa/core/loader.py
index 11978213b8..d12d5a055c 100644
--- a/meta/lib/oeqa/core/loader.py
+++ b/meta/lib/oeqa/core/loader.py
@@ -37,7 +37,7 @@ def _find_duplicated_modules(suite, directory):
if path:
raise ImportError("Duplicated %s module found in %s" % (module, path))
-def _built_modules_dict(modules):
+def _built_modules_dict(modules, logger):
modules_dict = {}
if modules == None:
@@ -48,6 +48,9 @@ def _built_modules_dict(modules):
# characters, whereas class names do
m = re.match(r'^([0-9a-z_.]+)(?:\.(\w[^.]*)(?:\.([^.]+))?)?$', module, flags=re.ASCII)
if not m:
+ logger.warn("module '%s' was skipped from selected modules, "\
+ "because it doesn't match with module name assumptions: "\
+ "package and module names do not contain upper case characters, whereas class names do" % module)
continue
module_name, class_name, test_name = m.groups()
@@ -58,6 +61,8 @@ def _built_modules_dict(modules):
modules_dict[module_name][class_name] = []
if test_name and test_name not in modules_dict[module_name][class_name]:
modules_dict[module_name][class_name].append(test_name)
+ if modules and not modules_dict:
+ raise OEQATestNotFound("All selected modules were skipped, this would trigger selftest with all tests and -r ignored.")
return modules_dict
@@ -71,7 +76,7 @@ class OETestLoader(unittest.TestLoader):
*args, **kwargs):
self.tc = tc
- self.modules = _built_modules_dict(modules)
+ self.modules = _built_modules_dict(modules, tc.logger)
self.tests = tests
self.modules_required = modules_required
@@ -311,6 +316,9 @@ class OETestLoader(unittest.TestLoader):
module_name_small in self.modules) \
else False
+ if any(c.isupper() for c in module.__name__):
+ raise SystemExit("Module '%s' contains uppercase characters and this isn't supported. Please fix the module name." % module.__name__)
+
return (load_module, load_underscore)
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index d50690ab37..a86a706bd9 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -44,6 +44,7 @@ class OETestResult(_TestResult):
self.endtime = {}
self.progressinfo = {}
self.extraresults = {}
+ self.shownmsg = []
# Inject into tc so that TestDepends decorator can see results
tc.results = self
@@ -74,6 +75,7 @@ class OETestResult(_TestResult):
for (scase, msg) in getattr(self, t):
if test.id() == scase.id():
self.tc.logger.info(str(msg))
+ self.shownmsg.append(test.id())
break
def logSummary(self, component, context_msg=''):
@@ -169,7 +171,6 @@ class OETestResult(_TestResult):
def logDetails(self, json_file_dir=None, configuration=None, result_id=None,
dump_streams=False):
- self.tc.logger.info("RESULTS:")
result = self.extraresults
logs = {}
@@ -193,6 +194,10 @@ class OETestResult(_TestResult):
report = {'status': status}
if log:
report['log'] = log
+ # Class setup failures wouldn't enter stopTest so would never display
+ if case.id() not in self.shownmsg:
+ self.tc.logger.info("Failure (%s) for %s:\n" % (status, case.id()) + log)
+
if duration:
report['duration'] = duration
@@ -215,6 +220,7 @@ class OETestResult(_TestResult):
report['stderr'] = stderr
result[case.id()] = report
+ self.tc.logger.info("RESULTS:")
for i in ['PASSED', 'SKIPPED', 'EXPECTEDFAIL', 'ERROR', 'FAILED', 'UNKNOWN']:
if i not in logs:
continue
@@ -229,6 +235,10 @@ class OETestResult(_TestResult):
# Override as we unexpected successes aren't failures for us
return (len(self.failures) == len(self.errors) == 0)
+ def hasAnyFailingTest(self):
+ # Account for expected failures
+ return not self.wasSuccessful() or len(self.expectedFailures)
+
class OEListTestsResult(object):
def wasSuccessful(self):
return True
diff --git a/meta/lib/oeqa/core/target/qemu.py b/meta/lib/oeqa/core/target/qemu.py
index 4a5df4a9a8..d93b3ac94a 100644
--- a/meta/lib/oeqa/core/target/qemu.py
+++ b/meta/lib/oeqa/core/target/qemu.py
@@ -8,19 +8,19 @@ import os
import sys
import signal
import time
+import glob
+import subprocess
from collections import defaultdict
from .ssh import OESSHTarget
from oeqa.utils.qemurunner import QemuRunner
-from oeqa.utils.dump import MonitorDumper
-from oeqa.utils.dump import TargetDumper
supported_fstypes = ['ext3', 'ext4', 'cpio.gz', 'wic']
class OEQemuTarget(OESSHTarget):
def __init__(self, logger, server_ip, timeout=300, user='root',
port=None, machine='', rootfs='', kernel='', kvm=False, slirp=False,
- dump_dir='', dump_host_cmds='', display='', bootlog='',
+ dump_dir='', display='', bootlog='',
tmpdir='', dir_image='', boottime=60, serial_ports=2,
boot_patterns = defaultdict(str), ovmf=False, tmpfsdir=None, **kwargs):
@@ -36,22 +36,15 @@ class OEQemuTarget(OESSHTarget):
self.ovmf = ovmf
self.use_slirp = slirp
self.boot_patterns = boot_patterns
+ self.dump_dir = dump_dir
+ self.bootlog = bootlog
self.runner = QemuRunner(machine=machine, rootfs=rootfs, tmpdir=tmpdir,
deploy_dir_image=dir_image, display=display,
logfile=bootlog, boottime=boottime,
- use_kvm=kvm, use_slirp=slirp, dump_dir=dump_dir,
- dump_host_cmds=dump_host_cmds, logger=logger,
+ use_kvm=kvm, use_slirp=slirp, dump_dir=dump_dir, logger=logger,
serial_ports=serial_ports, boot_patterns = boot_patterns,
use_ovmf=ovmf, tmpfsdir=tmpfsdir)
- dump_monitor_cmds = kwargs.get("testimage_dump_monitor")
- self.monitor_dumper = MonitorDumper(dump_monitor_cmds, dump_dir, self.runner)
- if self.monitor_dumper:
- self.monitor_dumper.create_dir("qmp")
-
- dump_target_cmds = kwargs.get("testimage_dump_target")
- self.target_dumper = TargetDumper(dump_target_cmds, dump_dir, self.runner)
- self.target_dumper.create_dir("qemu")
def start(self, params=None, extra_bootparams=None, runqemuparams=''):
if self.use_slirp and not self.server_ip:
@@ -74,7 +67,28 @@ class OEQemuTarget(OESSHTarget):
self.server_ip = self.runner.server_ip
else:
self.stop()
- raise RuntimeError("FAILED to start qemu - check the task log and the boot log")
+ # Display the first 20 lines of top and
+ # last 20 lines of the bootlog when the
+ # target is not being booted up.
+ topfile = glob.glob(self.dump_dir + "/*_qemu/host_*_top")
+ msg = "\n\n===== start: snippet =====\n\n"
+ for f in topfile:
+ msg += "file: %s\n\n" % f
+ with open(f) as tf:
+ for x in range(20):
+ msg += next(tf)
+ msg += "\n\n===== end: snippet =====\n\n"
+ blcmd = ["tail", "-20", self.bootlog]
+ msg += "===== start: snippet =====\n\n"
+ try:
+ out = subprocess.check_output(blcmd, stderr=subprocess.STDOUT, timeout=1).decode('utf-8')
+ msg += "file: %s\n\n" % self.bootlog
+ msg += out
+ except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as err:
+ msg += "Error running command: %s\n%s\n" % (blcmd, err)
+ msg += "\n\n===== end: snippet =====\n"
+
+ raise RuntimeError("FAILED to start qemu - check the task log and the boot log %s" % (msg))
def stop(self):
self.runner.stop()
diff --git a/meta/lib/oeqa/core/target/ssh.py b/meta/lib/oeqa/core/target/ssh.py
index 923a223b25..09cdd14c75 100644
--- a/meta/lib/oeqa/core/target/ssh.py
+++ b/meta/lib/oeqa/core/target/ssh.py
@@ -34,16 +34,20 @@ class OESSHTarget(OETarget):
self.timeout = timeout
self.user = user
ssh_options = [
+ '-o', 'ServerAliveCountMax=2',
+ '-o', 'ServerAliveInterval=30',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'StrictHostKeyChecking=no',
'-o', 'LogLevel=ERROR'
]
+ scp_options = [
+ '-r'
+ ]
self.ssh = ['ssh', '-l', self.user ] + ssh_options
- self.scp = ['scp'] + ssh_options
+ self.scp = ['scp'] + ssh_options + scp_options
if port:
self.ssh = self.ssh + [ '-p', port ]
self.scp = self.scp + [ '-P', port ]
- self._monitor_dumper = None
def start(self, **kwargs):
pass
@@ -51,15 +55,6 @@ class OESSHTarget(OETarget):
def stop(self, **kwargs):
pass
- @property
- def monitor_dumper(self):
- return self._monitor_dumper
-
- @monitor_dumper.setter
- def monitor_dumper(self, dumper):
- self._monitor_dumper = dumper
- self.monitor_dumper.dump_monitor()
-
def _run(self, command, timeout=None, ignore_status=True):
"""
Runs command in target using SSHProcess.
@@ -77,7 +72,7 @@ class OESSHTarget(OETarget):
return (status, output)
- def run(self, command, timeout=None):
+ def run(self, command, timeout=None, ignore_status=True):
"""
Runs command in target.
@@ -96,15 +91,9 @@ class OESSHTarget(OETarget):
else:
processTimeout = self.timeout
- status, output = self._run(sshCmd, processTimeout, True)
+ status, output = self._run(sshCmd, processTimeout, ignore_status)
self.logger.debug('Command: %s\nStatus: %d Output: %s\n' % (command, status, output))
- if (status == 255) and (('No route to host') in output):
- if self.monitor_dumper:
- self.monitor_dumper.dump_monitor()
- if status == 255:
- self.target_dumper.dump_target()
- if self.monitor_dumper:
- self.monitor_dumper.dump_monitor()
+
return (status, output)
def copyTo(self, localSrc, remoteDst):
@@ -222,27 +211,41 @@ def SSHCall(command, logger, timeout=None, **opts):
def run():
nonlocal output
nonlocal process
+ output_raw = b''
starttime = time.time()
process = subprocess.Popen(command, **options)
+ has_timeout = False
if timeout:
endtime = starttime + timeout
eof = False
- while time.time() < endtime and not eof:
- logger.debug('time: %s, endtime: %s' % (time.time(), endtime))
+ os.set_blocking(process.stdout.fileno(), False)
+ while not has_timeout and not eof:
try:
+ logger.debug('Waiting for process output: time: %s, endtime: %s' % (time.time(), endtime))
if select.select([process.stdout], [], [], 5)[0] != []:
- reader = codecs.getreader('utf-8')(process.stdout, 'ignore')
- data = reader.read(1024, 4096)
+ # wait a bit for more data, tries to avoid reading single characters
+ time.sleep(0.2)
+ data = process.stdout.read()
if not data:
- process.stdout.close()
eof = True
else:
- output += data
- logger.debug('Partial data from SSH call: %s' % data)
+ output_raw += data
+ # ignore errors to capture as much as possible
+ logger.debug('Partial data from SSH call:\n%s' % data.decode('utf-8', errors='ignore'))
endtime = time.time() + timeout
except InterruptedError:
+ logger.debug('InterruptedError')
+ continue
+ except BlockingIOError:
+ logger.debug('BlockingIOError')
continue
+ if time.time() >= endtime:
+ logger.debug('SSHCall has timeout! Time: %s, endtime: %s' % (time.time(), endtime))
+ has_timeout = True
+
+ process.stdout.close()
+
# process hasn't returned yet
if not eof:
process.terminate()
@@ -250,16 +253,42 @@ def SSHCall(command, logger, timeout=None, **opts):
try:
process.kill()
except OSError:
+ logger.debug('OSError when killing process')
pass
endtime = time.time() - starttime
lastline = ("\nProcess killed - no output for %d seconds. Total"
" running time: %d seconds." % (timeout, endtime))
- logger.debug('Received data from SSH call %s ' % lastline)
+ logger.debug('Received data from SSH call:\n%s ' % lastline)
output += lastline
+ process.wait()
else:
- output = process.communicate()[0].decode('utf-8', errors='ignore')
- logger.debug('Data from SSH call: %s' % output.rstrip())
+ output_raw = process.communicate()[0]
+
+ output = output_raw.decode('utf-8', errors='ignore')
+ logger.debug('Data from SSH call:\n%s' % output.rstrip())
+
+ # timout or not, make sure process exits and is not hanging
+ if process.returncode == None:
+ try:
+ process.wait(timeout=5)
+ except TimeoutExpired:
+ try:
+ process.kill()
+ except OSError:
+ logger.debug('OSError')
+ pass
+ process.wait()
+
+ if has_timeout:
+ # Version of openssh before 8.6_p1 returns error code 0 when killed
+ # by a signal, when the timeout occurs we will receive a 0 error
+ # code because the process is been terminated and it's wrong because
+ # that value means success, but the process timed out.
+ # Afterwards, from version 8.6_p1 onwards, the returned code is 255.
+ # Fix this behaviour by checking the return code
+ if process.returncode == 0:
+ process.returncode = 255
options = {
"stdout": subprocess.PIPE,
@@ -286,6 +315,9 @@ def SSHCall(command, logger, timeout=None, **opts):
# whilst running and ensure we don't leave a process behind.
if process.poll() is None:
process.kill()
+ if process.returncode == None:
+ process.wait()
logger.debug('Something went wrong, killing SSH process')
raise
- return (process.wait(), output.rstrip())
+
+ return (process.returncode, output.rstrip())
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
index 161a2f6e90..d10f8f7f04 100644
--- a/meta/lib/oeqa/core/utils/concurrencytest.py
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python3
#
+# Copyright OpenEmbedded Contributors
+#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Modified for use in OE by Richard Purdie, 2018
@@ -57,6 +59,7 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
self.outputbuf = output
self.finalresult = finalresult
self.finalresult.buffer = True
+ self.target = target
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
self.semaphore.acquire()
@@ -65,13 +68,14 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
self.result.starttime[test.id()] = self._test_start.timestamp()
self.result.threadprogress[self.threadnum].append(test.id())
totalprogress = sum(len(x) for x in self.result.threadprogress.values())
- self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
+ self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
self.threadnum,
len(self.result.threadprogress[self.threadnum]),
self.totalinprocess,
totalprogress,
self.totaltests,
"{0:.2f}".format(time.time()-self._test_start.timestamp()),
+ self.target.failed_tests,
test.id())
finally:
self.semaphore.release()
@@ -189,11 +193,12 @@ class dummybuf(object):
#
class ConcurrentTestSuite(unittest.TestSuite):
- def __init__(self, suite, processes, setupfunc, removefunc):
+ def __init__(self, suite, processes, setupfunc, removefunc, bb_vars):
super(ConcurrentTestSuite, self).__init__([suite])
self.processes = processes
self.setupfunc = setupfunc
self.removefunc = removefunc
+ self.bb_vars = bb_vars
def run(self, result):
testservers, totaltests = fork_for_tests(self.processes, self)
@@ -239,7 +244,7 @@ class ConcurrentTestSuite(unittest.TestSuite):
def fork_for_tests(concurrency_num, suite):
testservers = []
if 'BUILDDIR' in os.environ:
- selftestdir = get_test_layer()
+ selftestdir = get_test_layer(suite.bb_vars['BBLAYERS'])
test_blocks = partition_tests(suite, concurrency_num)
# Clear the tests from the original suite so it doesn't keep them alive
@@ -259,7 +264,7 @@ def fork_for_tests(concurrency_num, suite):
ourpid = os.getpid()
try:
newbuilddir = None
- stream = os.fdopen(c2pwrite, 'wb', 1)
+ stream = os.fdopen(c2pwrite, 'wb')
os.close(c2pread)
(builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
@@ -304,7 +309,7 @@ def fork_for_tests(concurrency_num, suite):
os._exit(0)
else:
os.close(c2pwrite)
- stream = os.fdopen(c2pread, 'rb', 1)
+ stream = os.fdopen(c2pread, 'rb')
# Collect stdout/stderr into an io buffer
output = io.BytesIO()
testserver = ProtocolTestCase(stream, passthrough=output)
diff --git a/meta/lib/oeqa/core/utils/misc.py b/meta/lib/oeqa/core/utils/misc.py
deleted file mode 100644
index e1a59588eb..0000000000
--- a/meta/lib/oeqa/core/utils/misc.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Copyright (C) 2016 Intel Corporation
-#
-# SPDX-License-Identifier: MIT
-#
-
-def toList(obj, obj_type, obj_name="Object"):
- if isinstance(obj, obj_type):
- return [obj]
- elif isinstance(obj, list):
- return obj
- else:
- raise TypeError("%s must be %s or list" % (obj_name, obj_type))
-
-def toSet(obj, obj_type, obj_name="Object"):
- if isinstance(obj, obj_type):
- return {obj}
- elif isinstance(obj, list):
- return set(obj)
- elif isinstance(obj, set):
- return obj
- else:
- raise TypeError("%s must be %s or set" % (obj_name, obj_type))
-
-def strToList(obj, obj_name="Object"):
- return toList(obj, str, obj_name)
-
-def strToSet(obj, obj_name="Object"):
- return toSet(obj, str, obj_name)
-
-def intToList(obj, obj_name="Object"):
- return toList(obj, int, obj_name)
-
-def dataStoteToDict(d, variables):
- data = {}
-
- for v in variables:
- data[v] = d.getVar(v)
-
- return data
-
-def updateTestData(d, td, variables):
- """
- Updates variables with values of data store to test data.
- """
- for var in variables:
- td[var] = d.getVar(var)