summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/oetest.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/oetest.py')
-rw-r--r--meta/lib/oeqa/oetest.py686
1 files changed, 551 insertions, 135 deletions
diff --git a/meta/lib/oeqa/oetest.py b/meta/lib/oeqa/oetest.py
index dfed3dea87..9c84466dd0 100644
--- a/meta/lib/oeqa/oetest.py
+++ b/meta/lib/oeqa/oetest.py
@@ -1,112 +1,85 @@
+#
# Copyright (C) 2013 Intel Corporation
#
-# Released under the MIT license (see COPYING.MIT)
+# SPDX-License-Identifier: MIT
+#
# Main unittest module used by testimage.bbclass
# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime.
# It also has some helper functions and it's responsible for actually starting the tests
-import os, re, mmap
+import os, re, sys
import unittest
import inspect
import subprocess
-import datetime
-import bb
-from oeqa.utils.decorators import LogResults
-
-def loadTests(tc, type="runtime"):
- if type == "runtime":
- # set the context object passed from the test class
- setattr(oeTest, "tc", tc)
- # set ps command to use
- setattr(oeRuntimeTest, "pscmd", "ps -ef" if oeTest.hasPackage("procps") else "ps")
- # prepare test suite, loader and runner
- suite = unittest.TestSuite()
- elif type == "sdk":
- # set the context object passed from the test class
- setattr(oeTest, "tc", tc)
- testloader = unittest.TestLoader()
- testloader.sortTestMethodsUsing = None
- suites = [testloader.loadTestsFromName(name) for name in tc.testslist]
-
- def getTests(test):
- '''Return all individual tests executed when running the suite.'''
- # Unfortunately unittest does not have an API for this, so we have
- # to rely on implementation details. This only needs to work
- # for TestSuite containing TestCase.
- method = getattr(test, '_testMethodName', None)
- if method:
- # leaf case: a TestCase
- yield test
- else:
- # Look into TestSuite.
- tests = getattr(test, '_tests', [])
- for t1 in tests:
- for t2 in getTests(t1):
- yield t2
+import signal
+import shutil
+import functools
+try:
+ import bb
+except ImportError:
+ pass
+import logging
+
+import oeqa.runtime
+# Exported test doesn't require sdkext
+try:
+ import oeqa.sdkext
+except ImportError:
+ pass
+from oeqa.utils.decorators import LogResults, gettag, getResults
+
+logger = logging.getLogger("BitBake")
+
+def getVar(obj):
+ #extend form dict, if a variable didn't exists, need find it in testcase
+ class VarDict(dict):
+ def __getitem__(self, key):
+ return gettag(obj, key)
+ return VarDict()
- # Determine dependencies between suites by looking for @skipUnlessPassed
- # method annotations. Suite A depends on suite B if any method in A
- # depends on a method on B.
- for suite in suites:
- suite.dependencies = []
- suite.depth = 0
- for test in getTests(suite):
- methodname = getattr(test, '_testMethodName', None)
- if methodname:
- method = getattr(test, methodname)
- depends_on = getattr(method, '_depends_on', None)
- if depends_on:
- for dep_suite in suites:
- if depends_on in [getattr(t, '_testMethodName', None) for t in getTests(dep_suite)]:
- if dep_suite not in suite.dependencies and \
- dep_suite is not suite:
- suite.dependencies.append(dep_suite)
- break
- else:
- bb.warn("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
- (test, depends_on))
- # Use brute-force topological sort to determine ordering. Sort by
- # depth (higher depth = must run later), with original ordering to
- # break ties.
- def set_suite_depth(suite):
- for dep in suite.dependencies:
- new_depth = set_suite_depth(dep) + 1
- if new_depth > suite.depth:
- suite.depth = new_depth
- return suite.depth
- for index, suite in enumerate(suites):
- set_suite_depth(suite)
- suite.index = index
- suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index)))
- return testloader.suiteClass(suites)
-
-def runTests(tc, type="runtime"):
-
- suite = loadTests(tc, type)
- bb.note("Test modules %s" % tc.testslist)
- bb.note("Found %s tests" % suite.countTestCases())
- runner = unittest.TextTestRunner(verbosity=2)
- result = runner.run(suite)
-
- return result
+def checkTags(tc, tagexp):
+ return eval(tagexp, None, getVar(tc))
+
+def filterByTagExp(testsuite, tagexp):
+ if not tagexp:
+ return testsuite
+ caseList = []
+ for each in testsuite:
+ if not isinstance(each, unittest.BaseTestSuite):
+ if checkTags(each, tagexp):
+ caseList.append(each)
+ else:
+ caseList.append(filterByTagExp(each, tagexp))
+ return testsuite.__class__(caseList)
@LogResults
class oeTest(unittest.TestCase):
+ pscmd = "ps"
longMessage = True
@classmethod
def hasPackage(self, pkg):
+ """
+ True if the full package name exists in the manifest, False otherwise.
+ """
+ return pkg in oeTest.tc.pkgmanifest
- if re.search(pkg, oeTest.tc.pkgmanifest):
- return True
+ @classmethod
+ def hasPackageMatch(self, match):
+ """
+ True if match exists in the manifest as a regular expression substring,
+ False otherwise.
+ """
+ for s in oeTest.tc.pkgmanifest:
+ if re.match(match, s):
+ return True
return False
@classmethod
def hasFeature(self,feature):
-
if feature in oeTest.tc.imagefeatures or \
feature in oeTest.tc.distrofeatures:
return True
@@ -118,61 +91,44 @@ class oeRuntimeTest(oeTest):
self.target = oeRuntimeTest.tc.target
super(oeRuntimeTest, self).__init__(methodName)
+ def setUp(self):
+ # Install packages in the DUT
+ self.tc.install_uninstall_packages(self.id())
+
+ # Check if test needs to run
+ if self.tc.sigterm:
+ self.fail("Got SIGTERM")
+ elif (type(self.target).__name__ == "QemuTarget"):
+ self.assertTrue(self.target.check(), msg = "Qemu not running?")
+
+ self.setUpLocal()
+
+ # a setup method before tests but after the class instantiation
+ def setUpLocal(self):
+ pass
+
def tearDown(self):
- # If a test fails or there is an exception
- if (self._resultForDoCleanups.failures or
- self._resultForDoCleanups.errors):
- self.dump_target_logs()
-
- def dump_target_logs(self):
- commands = ["top -bn1", "ps", "free", "df", "_ping", "dmesg", "netstat -a", "ifconfig -a", "_logs"]
- dump_dir = "/tmp/oe-saved-tests"
- dump_sub_dir = ("%s_%s" % (
- datetime.datetime.now().strftime('%Y%m%d%H%M'),
- self._testMethodName))
- dump_dir = os.path.join(dump_dir, dump_sub_dir)
- os.makedirs(dump_dir)
- bb.warn("%s failed: getting data from target and "
- "saving into %s" % (self._testMethodName, dump_dir))
- for command in commands:
- # This will ping the host from target
- if command == "_ping":
- comm = "ping -c3 %s" % self.target.server_ip
- # This will get all the logs from /var/log/
- elif command == "_logs":
- comm = 'find /var/log/ -type f 2>/dev/null '
- comm = '%s-exec echo "%s" \\; ' % (comm, '='*20)
- comm = '%s-exec echo {} \\; ' % comm
- comm = '%s-exec echo "%s" \\; ' % (comm, '='*20)
- comm = '%s-exec cat {} \\; -exec echo "" \\;' % comm
- else:
- comm = command
- (status, output) = self.target.run_serial(comm)
- filename = command.split()[0]
- with open(os.path.join(dump_dir, filename), 'w') as f:
- f.write(output)
-
- #TODO: use package_manager.py to install packages on any type of image
- def install_packages(self, packagelist):
- for package in packagelist:
- (status, result) = self.target.run("smart install -y "+package)
- if status != 0:
- return status
-
-class oeSDKTest(oeTest):
- def __init__(self, methodName='runTest'):
- self.sdktestdir = oeSDKTest.tc.sdktestdir
- super(oeSDKTest, self).__init__(methodName)
+ # Uninstall packages in the DUT
+ self.tc.install_uninstall_packages(self.id(), False)
- @classmethod
- def hasHostPackage(self, pkg):
+ res = getResults()
+ # If a test fails or there is an exception dump
+ # for QemuTarget only
+ if (type(self.target).__name__ == "QemuTarget" and
+ (self.id() in res.getErrorList() or
+ self.id() in res.getFailList())):
+ self.tc.host_dumper.create_dir(self._testMethodName)
+ self.tc.host_dumper.dump_host()
+ self.target.target_dumper.dump_target(
+ self.tc.host_dumper.dump_dir)
+ print ("%s dump data stored in %s" % (self._testMethodName,
+ self.tc.host_dumper.dump_dir))
- if re.search(pkg, oeTest.tc.hostpkgmanifest):
- return True
- return False
+ self.tearDownLocal()
- def _run(self, cmd):
- return subprocess.check_output(cmd, shell=True)
+ # Method to be run after tearDown and implemented by child classes
+ def tearDownLocal(self):
+ pass
def getmodule(pos=2):
# stack returns a list of tuples containg frame information
@@ -200,3 +156,463 @@ def skipModuleUnless(cond, reason):
if not cond:
skipModule(reason, 3)
+
+_buffer_logger = ""
+def custom_verbose(msg, *args, **kwargs):
+ global _buffer_logger
+ if msg[-1] != "\n":
+ _buffer_logger += msg
+ else:
+ _buffer_logger += msg
+ try:
+ bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs)
+ except NameError:
+ logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs)
+ _buffer_logger = ""
+
+class TestContext(object):
+ def __init__(self, d, exported=False):
+ self.d = d
+
+ self.testsuites = self._get_test_suites()
+
+ if exported:
+ path = [os.path.dirname(os.path.abspath(__file__))]
+ extrapath = ""
+ else:
+ path = d.getVar("BBPATH").split(':')
+ extrapath = "lib/oeqa"
+
+ self.testslist = self._get_tests_list(path, extrapath)
+ self.testsrequired = self._get_test_suites_required()
+
+ self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files")
+ self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
+ self.imagefeatures = d.getVar("IMAGE_FEATURES").split()
+ self.distrofeatures = d.getVar("DISTRO_FEATURES").split()
+
+ # get testcase list from specified file
+ # if path is a relative path, then relative to build/conf/
+ def _read_testlist(self, fpath, builddir):
+ if not os.path.isabs(fpath):
+ fpath = os.path.join(builddir, "conf", fpath)
+ if not os.path.exists(fpath):
+ bb.fatal("No such manifest file: ", fpath)
+ tcs = []
+ for line in open(fpath).readlines():
+ line = line.strip()
+ if line and not line.startswith("#"):
+ tcs.append(line)
+ return " ".join(tcs)
+
+ # return test list by type also filter if TEST_SUITES is specified
+ def _get_tests_list(self, bbpath, extrapath):
+ testslist = []
+
+ type = self._get_test_namespace()
+
+ # This relies on lib/ under each directory in BBPATH being added to sys.path
+ # (as done by default in base.bbclass)
+ for testname in self.testsuites:
+ if testname != "auto":
+ if testname.startswith("oeqa."):
+ testslist.append(testname)
+ continue
+ found = False
+ for p in bbpath:
+ if os.path.exists(os.path.join(p, extrapath, type, testname + ".py")):
+ testslist.append("oeqa." + type + "." + testname)
+ found = True
+ break
+ elif os.path.exists(os.path.join(p, extrapath, type, testname.split(".")[0] + ".py")):
+ testslist.append("oeqa." + type + "." + testname)
+ found = True
+ break
+ if not found:
+ bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname)
+
+ if "auto" in self.testsuites:
+ def add_auto_list(path):
+ files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')])
+ for f in files:
+ module = 'oeqa.' + type + '.' + f[:-3]
+ if module not in testslist:
+ testslist.append(module)
+
+ for p in bbpath:
+ testpath = os.path.join(p, 'lib', 'oeqa', type)
+ bb.debug(2, 'Searching for tests in %s' % testpath)
+ if os.path.exists(testpath):
+ add_auto_list(testpath)
+
+ return testslist
+
+ def getTestModules(self):
+ """
+ Returns all the test modules in the testlist.
+ """
+
+ import pkgutil
+
+ modules = []
+ for test in self.testslist:
+ if re.search("\w+\.\w+\.test_\S+", test):
+ test = '.'.join(t.split('.')[:3])
+ module = pkgutil.get_loader(test)
+ modules.append(module)
+
+ return modules
+
+ def getModulefromID(self, test_id):
+ """
+ Returns the test module based on a test id.
+ """
+
+ module_name = ".".join(test_id.split(".")[:3])
+ modules = self.getTestModules()
+ for module in modules:
+ if module.name == module_name:
+ return module
+
+ return None
+
+ def getTests(self, test):
+ '''Return all individual tests executed when running the suite.'''
+ # Unfortunately unittest does not have an API for this, so we have
+ # to rely on implementation details. This only needs to work
+ # for TestSuite containing TestCase.
+ method = getattr(test, '_testMethodName', None)
+ if method:
+ # leaf case: a TestCase
+ yield test
+ else:
+ # Look into TestSuite.
+ tests = getattr(test, '_tests', [])
+ for t1 in tests:
+ for t2 in self.getTests(t1):
+ yield t2
+
+ def loadTests(self):
+ setattr(oeTest, "tc", self)
+
+ testloader = unittest.TestLoader()
+ testloader.sortTestMethodsUsing = None
+ suites = [testloader.loadTestsFromName(name) for name in self.testslist]
+ suites = filterByTagExp(suites, getattr(self, "tagexp", None))
+
+ # Determine dependencies between suites by looking for @skipUnlessPassed
+ # method annotations. Suite A depends on suite B if any method in A
+ # depends on a method on B.
+ for suite in suites:
+ suite.dependencies = []
+ suite.depth = 0
+ for test in self.getTests(suite):
+ methodname = getattr(test, '_testMethodName', None)
+ if methodname:
+ method = getattr(test, methodname)
+ depends_on = getattr(method, '_depends_on', None)
+ if depends_on:
+ for dep_suite in suites:
+ if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]:
+ if dep_suite not in suite.dependencies and \
+ dep_suite is not suite:
+ suite.dependencies.append(dep_suite)
+ break
+ else:
+ logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
+ (test, depends_on))
+
+ # Use brute-force topological sort to determine ordering. Sort by
+ # depth (higher depth = must run later), with original ordering to
+ # break ties.
+ def set_suite_depth(suite):
+ for dep in suite.dependencies:
+ new_depth = set_suite_depth(dep) + 1
+ if new_depth > suite.depth:
+ suite.depth = new_depth
+ return suite.depth
+
+ for index, suite in enumerate(suites):
+ set_suite_depth(suite)
+ suite.index = index
+
+ def cmp(a, b):
+ return (a > b) - (a < b)
+
+ def cmpfunc(a, b):
+ return cmp((a.depth, a.index), (b.depth, b.index))
+
+ suites.sort(key=functools.cmp_to_key(cmpfunc))
+
+ self.suite = testloader.suiteClass(suites)
+
+ return self.suite
+
+ def runTests(self):
+ logger.info("Test modules %s" % self.testslist)
+ if hasattr(self, "tagexp") and self.tagexp:
+ logger.info("Filter test cases by tags: %s" % self.tagexp)
+ logger.info("Found %s tests" % self.suite.countTestCases())
+ runner = unittest.TextTestRunner(verbosity=2)
+ if 'bb' in sys.modules:
+ runner.stream.write = custom_verbose
+
+ return runner.run(self.suite)
+
+class RuntimeTestContext(TestContext):
+ def __init__(self, d, target, exported=False):
+ super(RuntimeTestContext, self).__init__(d, exported)
+
+ self.target = target
+
+ self.pkgmanifest = {}
+ manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"),
+ d.getVar("IMAGE_LINK_NAME") + ".manifest")
+ nomanifest = d.getVar("IMAGE_NO_MANIFEST")
+ if nomanifest is None or nomanifest != "1":
+ try:
+ with open(manifest) as f:
+ for line in f:
+ (pkg, arch, version) = line.strip().split()
+ self.pkgmanifest[pkg] = (version, arch)
+ except IOError as e:
+ bb.fatal("No package manifest file found. Did you build the image?\n%s" % e)
+
+ def _get_test_namespace(self):
+ return "runtime"
+
+ def _get_test_suites(self):
+ testsuites = []
+
+ manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split()
+ if manifests:
+ for manifest in manifests:
+ testsuites.extend(self._read_testlist(manifest,
+ self.d.getVar("TOPDIR")).split())
+
+ else:
+ testsuites = self.d.getVar("TEST_SUITES").split()
+
+ return testsuites
+
+ def _get_test_suites_required(self):
+ return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"]
+
+ def loadTests(self):
+ super(RuntimeTestContext, self).loadTests()
+ if oeTest.hasPackage("procps"):
+ oeRuntimeTest.pscmd = "ps -ef"
+
+ def extract_packages(self):
+ """
+ Find packages that will be needed during runtime.
+ """
+
+ modules = self.getTestModules()
+ bbpaths = self.d.getVar("BBPATH").split(":")
+
+ shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR"))
+ shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR"))
+ for module in modules:
+ json_file = self._getJsonFile(module)
+ if json_file:
+ needed_packages = self._getNeededPackages(json_file)
+ self._perform_package_extraction(needed_packages)
+
+ def _perform_package_extraction(self, needed_packages):
+ """
+ Extract packages that will be needed during runtime.
+ """
+
+ import oe.path
+
+ extracted_path = self.d.getVar("TEST_EXTRACTED_DIR")
+ packaged_path = self.d.getVar("TEST_PACKAGED_DIR")
+
+ for key,value in needed_packages.items():
+ packages = ()
+ if isinstance(value, dict):
+ packages = (value, )
+ elif isinstance(value, list):
+ packages = value
+ else:
+ bb.fatal("Failed to process needed packages for %s; "
+ "Value must be a dict or list" % key)
+
+ for package in packages:
+ pkg = package["pkg"]
+ rm = package.get("rm", False)
+ extract = package.get("extract", True)
+ if extract:
+ dst_dir = os.path.join(extracted_path, pkg)
+ else:
+ dst_dir = os.path.join(packaged_path)
+
+ # Extract package and copy it to TEST_EXTRACTED_DIR
+ pkg_dir = self._extract_in_tmpdir(pkg)
+ if extract:
+
+ # Same package used for more than one test,
+ # don't need to extract again.
+ if os.path.exists(dst_dir):
+ continue
+ oe.path.copytree(pkg_dir, dst_dir)
+ shutil.rmtree(pkg_dir)
+
+ # Copy package to TEST_PACKAGED_DIR
+ else:
+ self._copy_package(pkg)
+
+ def _getJsonFile(self, module):
+ """
+ Returns the path of the JSON file for a module, empty if doesn't exitst.
+ """
+
+ module_file = module.path
+ json_file = "%s.json" % module_file.rsplit(".", 1)[0]
+ if os.path.isfile(module_file) and os.path.isfile(json_file):
+ return json_file
+ else:
+ return ""
+
+ def _getNeededPackages(self, json_file, test=None):
+ """
+ Returns a dict with needed packages based on a JSON file.
+
+
+ If a test is specified it will return the dict just for that test.
+ """
+
+ import json
+
+ needed_packages = {}
+
+ with open(json_file) as f:
+ test_packages = json.load(f)
+ for key,value in test_packages.items():
+ needed_packages[key] = value
+
+ if test:
+ if test in needed_packages:
+ needed_packages = needed_packages[test]
+ else:
+ needed_packages = {}
+
+ return needed_packages
+
+ def _extract_in_tmpdir(self, pkg):
+ """"
+ Returns path to a temp directory where the package was
+ extracted without dependencies.
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
+ pm = get_package_manager(self.d, pkg_path)
+ extract_dir = pm.extract(pkg)
+ shutil.rmtree(pkg_path)
+
+ return extract_dir
+
+ def _copy_package(self, pkg):
+ """
+ Copy the RPM, DEB or IPK package to dst_dir
+ """
+
+ from oeqa.utils.package_manager import get_package_manager
+
+ pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
+ dst_dir = self.d.getVar("TEST_PACKAGED_DIR")
+ pm = get_package_manager(self.d, pkg_path)
+ pkg_info = pm.package_info(pkg)
+ file_path = pkg_info[pkg]["filepath"]
+ shutil.copy2(file_path, dst_dir)
+ shutil.rmtree(pkg_path)
+
+ def install_uninstall_packages(self, test_id, pkg_dir, install):
+ """
+ Check if the test requires a package and Install/Uninstall it in the DUT
+ """
+
+ test = test_id.split(".")[4]
+ module = self.getModulefromID(test_id)
+ json = self._getJsonFile(module)
+ if json:
+ needed_packages = self._getNeededPackages(json, test)
+ if needed_packages:
+ self._install_uninstall_packages(needed_packages, pkg_dir, install)
+
+ def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True):
+ """
+ Install/Uninstall packages in the DUT without using a package manager
+ """
+
+ if isinstance(needed_packages, dict):
+ packages = [needed_packages]
+ elif isinstance(needed_packages, list):
+ packages = needed_packages
+
+ for package in packages:
+ pkg = package["pkg"]
+ rm = package.get("rm", False)
+ extract = package.get("extract", True)
+ src_dir = os.path.join(pkg_dir, pkg)
+
+ # Install package
+ if install and extract:
+ self.target.connection.copy_dir_to(src_dir, "/")
+
+ # Uninstall package
+ elif not install and rm:
+ self.target.connection.delete_dir_structure(src_dir, "/")
+
+class ImageTestContext(RuntimeTestContext):
+ def __init__(self, d, target, host_dumper):
+ super(ImageTestContext, self).__init__(d, target)
+
+ self.tagexp = d.getVar("TEST_SUITES_TAGS")
+
+ self.host_dumper = host_dumper
+
+ self.sigterm = False
+ self.origsigtermhandler = signal.getsignal(signal.SIGTERM)
+ signal.signal(signal.SIGTERM, self._sigterm_exception)
+
+ def _sigterm_exception(self, signum, stackframe):
+ bb.warn("TestImage received SIGTERM, shutting down...")
+ self.sigterm = True
+ self.target.stop()
+
+ def install_uninstall_packages(self, test_id, install=True):
+ """
+ Check if the test requires a package and Install/Uninstall it in the DUT
+ """
+
+ pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR")
+ super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)
+
+class ExportTestContext(RuntimeTestContext):
+ def __init__(self, d, target, exported=False, parsedArgs={}):
+ """
+ This class is used when exporting tests and when are executed outside OE environment.
+
+ parsedArgs can contain the following:
+ - tag: Filter test by tag.
+ """
+ super(ExportTestContext, self).__init__(d, target, exported)
+
+ tag = parsedArgs.get("tag", None)
+ self.tagexp = tag if tag != None else d.getVar("TEST_SUITES_TAGS")
+
+ self.sigterm = None
+
+ def install_uninstall_packages(self, test_id, install=True):
+ """
+ Check if the test requires a package and Install/Uninstall it in the DUT
+ """
+
+ export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+ extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR")
+ pkg_dir = os.path.join(export_dir, extracted_dir)
+ super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)