summaryrefslogtreecommitdiffstats
path: root/meta/lib/oe/package_manager/deb/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oe/package_manager/deb/__init__.py')
-rw-r--r--meta/lib/oe/package_manager/deb/__init__.py487
1 files changed, 487 insertions, 0 deletions
diff --git a/meta/lib/oe/package_manager/deb/__init__.py b/meta/lib/oe/package_manager/deb/__init__.py
index a2094304c9..3c214e59dd 100644
--- a/meta/lib/oe/package_manager/deb/__init__.py
+++ b/meta/lib/oe/package_manager/deb/__init__.py
@@ -1,3 +1,490 @@
#
# SPDX-License-Identifier: GPL-2.0-only
#
+
+from oe.package_manager import *
+
+class DpkgIndexer(Indexer):
+ def _create_configs(self):
+ bb.utils.mkdirhier(self.apt_conf_dir)
+ bb.utils.mkdirhier(os.path.join(self.apt_conf_dir, "lists", "partial"))
+ bb.utils.mkdirhier(os.path.join(self.apt_conf_dir, "apt.conf.d"))
+ bb.utils.mkdirhier(os.path.join(self.apt_conf_dir, "preferences.d"))
+
+ with open(os.path.join(self.apt_conf_dir, "preferences"),
+ "w") as prefs_file:
+ pass
+ with open(os.path.join(self.apt_conf_dir, "sources.list"),
+ "w+") as sources_file:
+ pass
+
+ with open(self.apt_conf_file, "w") as apt_conf:
+ with open(os.path.join(self.d.expand("${STAGING_ETCDIR_NATIVE}"),
+ "apt", "apt.conf.sample")) as apt_conf_sample:
+ for line in apt_conf_sample.read().split("\n"):
+ line = re.sub(r"#ROOTFS#", "/dev/null", line)
+ line = re.sub(r"#APTCONF#", self.apt_conf_dir, line)
+ apt_conf.write(line + "\n")
+
+ def write_index(self):
+ self.apt_conf_dir = os.path.join(self.d.expand("${APTCONF_TARGET}"),
+ "apt-ftparchive")
+ self.apt_conf_file = os.path.join(self.apt_conf_dir, "apt.conf")
+ self._create_configs()
+
+ os.environ['APT_CONFIG'] = self.apt_conf_file
+
+ pkg_archs = self.d.getVar('PACKAGE_ARCHS')
+ if pkg_archs is not None:
+ arch_list = pkg_archs.split()
+ sdk_pkg_archs = self.d.getVar('SDK_PACKAGE_ARCHS')
+ if sdk_pkg_archs is not None:
+ for a in sdk_pkg_archs.split():
+ if a not in pkg_archs:
+ arch_list.append(a)
+
+ all_mlb_pkg_arch_list = (self.d.getVar('ALL_MULTILIB_PACKAGE_ARCHS') or "").split()
+ arch_list.extend(arch for arch in all_mlb_pkg_arch_list if arch not in arch_list)
+
+ apt_ftparchive = bb.utils.which(os.getenv('PATH'), "apt-ftparchive")
+ gzip = bb.utils.which(os.getenv('PATH'), "gzip")
+
+ index_cmds = []
+ deb_dirs_found = False
+ for arch in arch_list:
+ arch_dir = os.path.join(self.deploy_dir, arch)
+ if not os.path.isdir(arch_dir):
+ continue
+
+ cmd = "cd %s; PSEUDO_UNLOAD=1 %s packages . > Packages;" % (arch_dir, apt_ftparchive)
+
+ cmd += "%s -fcn Packages > Packages.gz;" % gzip
+
+ with open(os.path.join(arch_dir, "Release"), "w+") as release:
+ release.write("Label: %s\n" % arch)
+
+ cmd += "PSEUDO_UNLOAD=1 %s release . >> Release" % apt_ftparchive
+
+ index_cmds.append(cmd)
+
+ deb_dirs_found = True
+
+ if not deb_dirs_found:
+ bb.note("There are no packages in %s" % self.deploy_dir)
+ return
+
+ oe.utils.multiprocess_launch(create_index, index_cmds, self.d)
+ if self.d.getVar('PACKAGE_FEED_SIGN') == '1':
+ raise NotImplementedError('Package feed signing not implementd for dpkg')
+
+class DpkgPkgsList(PkgsList):
+
+ def list_pkgs(self):
+ cmd = [bb.utils.which(os.getenv('PATH'), "dpkg-query"),
+ "--admindir=%s/var/lib/dpkg" % self.rootfs_dir,
+ "-W"]
+
+ cmd.append("-f=Package: ${Package}\nArchitecture: ${PackageArch}\nVersion: ${Version}\nFile: ${Package}_${Version}_${Architecture}.deb\nDepends: ${Depends}\nRecommends: ${Recommends}\nProvides: ${Provides}\n\n")
+
+ try:
+ cmd_output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).strip().decode("utf-8")
+ except subprocess.CalledProcessError as e:
+ bb.fatal("Cannot get the installed packages list. Command '%s' "
+ "returned %d:\n%s" % (' '.join(cmd), e.returncode, e.output.decode("utf-8")))
+
+ return opkg_query(cmd_output)
+
+class OpkgDpkgPM(PackageManager):
+ def __init__(self, d, target_rootfs):
+ """
+ This is an abstract class. Do not instantiate this directly.
+ """
+ super(OpkgDpkgPM, self).__init__(d, target_rootfs)
+
+ def package_info(self, pkg, cmd):
+ """
+ Returns a dictionary with the package info.
+
+ This method extracts the common parts for Opkg and Dpkg
+ """
+
+ try:
+ output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).decode("utf-8")
+ except subprocess.CalledProcessError as e:
+ bb.fatal("Unable to list available packages. Command '%s' "
+ "returned %d:\n%s" % (cmd, e.returncode, e.output.decode("utf-8")))
+ return opkg_query(output)
+
+ def extract(self, pkg, pkg_info):
+ """
+ Returns the path to a tmpdir where resides the contents of a package.
+
+ Deleting the tmpdir is responsability of the caller.
+
+ This method extracts the common parts for Opkg and Dpkg
+ """
+
+ ar_cmd = bb.utils.which(os.getenv("PATH"), "ar")
+ tar_cmd = bb.utils.which(os.getenv("PATH"), "tar")
+ pkg_path = pkg_info[pkg]["filepath"]
+
+ if not os.path.isfile(pkg_path):
+ bb.fatal("Unable to extract package for '%s'."
+ "File %s doesn't exists" % (pkg, pkg_path))
+
+ tmp_dir = tempfile.mkdtemp()
+ current_dir = os.getcwd()
+ os.chdir(tmp_dir)
+ data_tar = 'data.tar.xz'
+
+ try:
+ cmd = [ar_cmd, 'x', pkg_path]
+ output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ cmd = [tar_cmd, 'xf', data_tar]
+ output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ bb.utils.remove(tmp_dir, recurse=True)
+ bb.fatal("Unable to extract %s package. Command '%s' "
+ "returned %d:\n%s" % (pkg_path, ' '.join(cmd), e.returncode, e.output.decode("utf-8")))
+ except OSError as e:
+ bb.utils.remove(tmp_dir, recurse=True)
+ bb.fatal("Unable to extract %s package. Command '%s' "
+ "returned %d:\n%s at %s" % (pkg_path, ' '.join(cmd), e.errno, e.strerror, e.filename))
+
+ bb.note("Extracted %s to %s" % (pkg_path, tmp_dir))
+ bb.utils.remove(os.path.join(tmp_dir, "debian-binary"))
+ bb.utils.remove(os.path.join(tmp_dir, "control.tar.gz"))
+ os.chdir(current_dir)
+
+ return tmp_dir
+
+ def _handle_intercept_failure(self, registered_pkgs):
+ self.mark_packages("unpacked", registered_pkgs.split())
+
+class DpkgPM(OpkgDpkgPM):
+ def __init__(self, d, target_rootfs, archs, base_archs, apt_conf_dir=None, deb_repo_workdir="oe-rootfs-repo", filterbydependencies=True):
+ super(DpkgPM, self).__init__(d, target_rootfs)
+ self.deploy_dir = oe.path.join(self.d.getVar('WORKDIR'), deb_repo_workdir)
+
+ create_packages_dir(self.d, self.deploy_dir, d.getVar("DEPLOY_DIR_DEB"), "package_write_deb", filterbydependencies)
+
+ if apt_conf_dir is None:
+ self.apt_conf_dir = self.d.expand("${APTCONF_TARGET}/apt")
+ else:
+ self.apt_conf_dir = apt_conf_dir
+ self.apt_conf_file = os.path.join(self.apt_conf_dir, "apt.conf")
+ self.apt_get_cmd = bb.utils.which(os.getenv('PATH'), "apt-get")
+ self.apt_cache_cmd = bb.utils.which(os.getenv('PATH'), "apt-cache")
+
+ self.apt_args = d.getVar("APT_ARGS")
+
+ self.all_arch_list = archs.split()
+ all_mlb_pkg_arch_list = (self.d.getVar('ALL_MULTILIB_PACKAGE_ARCHS') or "").split()
+ self.all_arch_list.extend(arch for arch in all_mlb_pkg_arch_list if arch not in self.all_arch_list)
+
+ self._create_configs(archs, base_archs)
+
+ self.indexer = DpkgIndexer(self.d, self.deploy_dir)
+
+ def mark_packages(self, status_tag, packages=None):
+ """
+ This function will change a package's status in /var/lib/dpkg/status file.
+ If 'packages' is None then the new_status will be applied to all
+ packages
+ """
+ status_file = self.target_rootfs + "/var/lib/dpkg/status"
+
+ with open(status_file, "r") as sf:
+ with open(status_file + ".tmp", "w+") as tmp_sf:
+ if packages is None:
+ tmp_sf.write(re.sub(r"Package: (.*?)\n((?:[^\n]+\n)*?)Status: (.*)(?:unpacked|installed)",
+ r"Package: \1\n\2Status: \3%s" % status_tag,
+ sf.read()))
+ else:
+ if type(packages).__name__ != "list":
+ raise TypeError("'packages' should be a list object")
+
+ status = sf.read()
+ for pkg in packages:
+ status = re.sub(r"Package: %s\n((?:[^\n]+\n)*?)Status: (.*)(?:unpacked|installed)" % pkg,
+ r"Package: %s\n\1Status: \2%s" % (pkg, status_tag),
+ status)
+
+ tmp_sf.write(status)
+
+ os.rename(status_file + ".tmp", status_file)
+
+ def run_pre_post_installs(self, package_name=None):
+ """
+ Run the pre/post installs for package "package_name". If package_name is
+ None, then run all pre/post install scriptlets.
+ """
+ info_dir = self.target_rootfs + "/var/lib/dpkg/info"
+ ControlScript = collections.namedtuple("ControlScript", ["suffix", "name", "argument"])
+ control_scripts = [
+ ControlScript(".preinst", "Preinstall", "install"),
+ ControlScript(".postinst", "Postinstall", "configure")]
+ status_file = self.target_rootfs + "/var/lib/dpkg/status"
+ installed_pkgs = []
+
+ with open(status_file, "r") as status:
+ for line in status.read().split('\n'):
+ m = re.match(r"^Package: (.*)", line)
+ if m is not None:
+ installed_pkgs.append(m.group(1))
+
+ if package_name is not None and not package_name in installed_pkgs:
+ return
+
+ os.environ['D'] = self.target_rootfs
+ os.environ['OFFLINE_ROOT'] = self.target_rootfs
+ os.environ['IPKG_OFFLINE_ROOT'] = self.target_rootfs
+ os.environ['OPKG_OFFLINE_ROOT'] = self.target_rootfs
+ os.environ['INTERCEPT_DIR'] = self.intercepts_dir
+ os.environ['NATIVE_ROOT'] = self.d.getVar('STAGING_DIR_NATIVE')
+
+ for pkg_name in installed_pkgs:
+ for control_script in control_scripts:
+ p_full = os.path.join(info_dir, pkg_name + control_script.suffix)
+ if os.path.exists(p_full):
+ try:
+ bb.note("Executing %s for package: %s ..." %
+ (control_script.name.lower(), pkg_name))
+ output = subprocess.check_output([p_full, control_script.argument],
+ stderr=subprocess.STDOUT).decode("utf-8")
+ bb.note(output)
+ except subprocess.CalledProcessError as e:
+ bb.warn("%s for package %s failed with %d:\n%s" %
+ (control_script.name, pkg_name, e.returncode,
+ e.output.decode("utf-8")))
+ failed_postinsts_abort([pkg_name], self.d.expand("${T}/log.do_${BB_CURRENTTASK}"))
+
+ def update(self):
+ os.environ['APT_CONFIG'] = self.apt_conf_file
+
+ self.deploy_dir_lock()
+
+ cmd = "%s update" % self.apt_get_cmd
+
+ try:
+ subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ bb.fatal("Unable to update the package index files. Command '%s' "
+ "returned %d:\n%s" % (e.cmd, e.returncode, e.output.decode("utf-8")))
+
+ self.deploy_dir_unlock()
+
+ def install(self, pkgs, attempt_only=False):
+ if attempt_only and len(pkgs) == 0:
+ return
+
+ os.environ['APT_CONFIG'] = self.apt_conf_file
+
+ cmd = "%s %s install --force-yes --allow-unauthenticated --no-remove %s" % \
+ (self.apt_get_cmd, self.apt_args, ' '.join(pkgs))
+
+ try:
+ bb.note("Installing the following packages: %s" % ' '.join(pkgs))
+ subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ (bb.fatal, bb.warn)[attempt_only]("Unable to install packages. "
+ "Command '%s' returned %d:\n%s" %
+ (cmd, e.returncode, e.output.decode("utf-8")))
+
+ # rename *.dpkg-new files/dirs
+ for root, dirs, files in os.walk(self.target_rootfs):
+ for dir in dirs:
+ new_dir = re.sub(r"\.dpkg-new", "", dir)
+ if dir != new_dir:
+ os.rename(os.path.join(root, dir),
+ os.path.join(root, new_dir))
+
+ for file in files:
+ new_file = re.sub(r"\.dpkg-new", "", file)
+ if file != new_file:
+ os.rename(os.path.join(root, file),
+ os.path.join(root, new_file))
+
+
+ def remove(self, pkgs, with_dependencies=True):
+ if not pkgs:
+ return
+
+ if with_dependencies:
+ os.environ['APT_CONFIG'] = self.apt_conf_file
+ cmd = "%s purge %s" % (self.apt_get_cmd, ' '.join(pkgs))
+ else:
+ cmd = "%s --admindir=%s/var/lib/dpkg --instdir=%s" \
+ " -P --force-depends %s" % \
+ (bb.utils.which(os.getenv('PATH'), "dpkg"),
+ self.target_rootfs, self.target_rootfs, ' '.join(pkgs))
+
+ try:
+ subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ bb.fatal("Unable to remove packages. Command '%s' "
+ "returned %d:\n%s" % (e.cmd, e.returncode, e.output.decode("utf-8")))
+
+ def write_index(self):
+ self.deploy_dir_lock()
+
+ result = self.indexer.write_index()
+
+ self.deploy_dir_unlock()
+
+ if result is not None:
+ bb.fatal(result)
+
+ def insert_feeds_uris(self, feed_uris, feed_base_paths, feed_archs):
+ if feed_uris == "":
+ return
+
+ sources_conf = os.path.join("%s/etc/apt/sources.list"
+ % self.target_rootfs)
+ arch_list = []
+
+ if feed_archs is None:
+ for arch in self.all_arch_list:
+ if not os.path.exists(os.path.join(self.deploy_dir, arch)):
+ continue
+ arch_list.append(arch)
+ else:
+ arch_list = feed_archs.split()
+
+ feed_uris = self.construct_uris(feed_uris.split(), feed_base_paths.split())
+
+ with open(sources_conf, "w+") as sources_file:
+ for uri in feed_uris:
+ if arch_list:
+ for arch in arch_list:
+ bb.note('Adding dpkg channel at (%s)' % uri)
+ sources_file.write("deb %s/%s ./\n" %
+ (uri, arch))
+ else:
+ bb.note('Adding dpkg channel at (%s)' % uri)
+ sources_file.write("deb %s ./\n" % uri)
+
+ def _create_configs(self, archs, base_archs):
+ base_archs = re.sub(r"_", r"-", base_archs)
+
+ if os.path.exists(self.apt_conf_dir):
+ bb.utils.remove(self.apt_conf_dir, True)
+
+ bb.utils.mkdirhier(self.apt_conf_dir)
+ bb.utils.mkdirhier(self.apt_conf_dir + "/lists/partial/")
+ bb.utils.mkdirhier(self.apt_conf_dir + "/apt.conf.d/")
+ bb.utils.mkdirhier(self.apt_conf_dir + "/preferences.d/")
+
+ arch_list = []
+ for arch in self.all_arch_list:
+ if not os.path.exists(os.path.join(self.deploy_dir, arch)):
+ continue
+ arch_list.append(arch)
+
+ with open(os.path.join(self.apt_conf_dir, "preferences"), "w+") as prefs_file:
+ priority = 801
+ for arch in arch_list:
+ prefs_file.write(
+ "Package: *\n"
+ "Pin: release l=%s\n"
+ "Pin-Priority: %d\n\n" % (arch, priority))
+
+ priority += 5
+
+ pkg_exclude = self.d.getVar('PACKAGE_EXCLUDE') or ""
+ for pkg in pkg_exclude.split():
+ prefs_file.write(
+ "Package: %s\n"
+ "Pin: release *\n"
+ "Pin-Priority: -1\n\n" % pkg)
+
+ arch_list.reverse()
+
+ with open(os.path.join(self.apt_conf_dir, "sources.list"), "w+") as sources_file:
+ for arch in arch_list:
+ sources_file.write("deb file:%s/ ./\n" %
+ os.path.join(self.deploy_dir, arch))
+
+ base_arch_list = base_archs.split()
+ multilib_variants = self.d.getVar("MULTILIB_VARIANTS");
+ for variant in multilib_variants.split():
+ localdata = bb.data.createCopy(self.d)
+ variant_tune = localdata.getVar("DEFAULTTUNE_virtclass-multilib-" + variant, False)
+ orig_arch = localdata.getVar("DPKG_ARCH")
+ localdata.setVar("DEFAULTTUNE", variant_tune)
+ variant_arch = localdata.getVar("DPKG_ARCH")
+ if variant_arch not in base_arch_list:
+ base_arch_list.append(variant_arch)
+
+ with open(self.apt_conf_file, "w+") as apt_conf:
+ with open(self.d.expand("${STAGING_ETCDIR_NATIVE}/apt/apt.conf.sample")) as apt_conf_sample:
+ for line in apt_conf_sample.read().split("\n"):
+ match_arch = re.match(r" Architecture \".*\";$", line)
+ architectures = ""
+ if match_arch:
+ for base_arch in base_arch_list:
+ architectures += "\"%s\";" % base_arch
+ apt_conf.write(" Architectures {%s};\n" % architectures);
+ apt_conf.write(" Architecture \"%s\";\n" % base_archs)
+ else:
+ line = re.sub(r"#ROOTFS#", self.target_rootfs, line)
+ line = re.sub(r"#APTCONF#", self.apt_conf_dir, line)
+ apt_conf.write(line + "\n")
+
+ target_dpkg_dir = "%s/var/lib/dpkg" % self.target_rootfs
+ bb.utils.mkdirhier(os.path.join(target_dpkg_dir, "info"))
+
+ bb.utils.mkdirhier(os.path.join(target_dpkg_dir, "updates"))
+
+ if not os.path.exists(os.path.join(target_dpkg_dir, "status")):
+ open(os.path.join(target_dpkg_dir, "status"), "w+").close()
+ if not os.path.exists(os.path.join(target_dpkg_dir, "available")):
+ open(os.path.join(target_dpkg_dir, "available"), "w+").close()
+
+ def remove_packaging_data(self):
+ bb.utils.remove(self.target_rootfs + self.d.getVar('opkglibdir'), True)
+ bb.utils.remove(self.target_rootfs + "/var/lib/dpkg/", True)
+
+ def fix_broken_dependencies(self):
+ os.environ['APT_CONFIG'] = self.apt_conf_file
+
+ cmd = "%s %s --allow-unauthenticated -f install" % (self.apt_get_cmd, self.apt_args)
+
+ try:
+ subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ bb.fatal("Cannot fix broken dependencies. Command '%s' "
+ "returned %d:\n%s" % (cmd, e.returncode, e.output.decode("utf-8")))
+
+ def list_installed(self):
+ return DpkgPkgsList(self.d, self.target_rootfs).list_pkgs()
+
+ def package_info(self, pkg):
+ """
+ Returns a dictionary with the package info.
+ """
+ cmd = "%s show %s" % (self.apt_cache_cmd, pkg)
+ pkg_info = super(DpkgPM, self).package_info(pkg, cmd)
+
+ pkg_arch = pkg_info[pkg]["pkgarch"]
+ pkg_filename = pkg_info[pkg]["filename"]
+ pkg_info[pkg]["filepath"] = \
+ os.path.join(self.deploy_dir, pkg_arch, pkg_filename)
+
+ return pkg_info
+
+ def extract(self, pkg):
+ """
+ Returns the path to a tmpdir where resides the contents of a package.
+
+ Deleting the tmpdir is responsability of the caller.
+ """
+ pkg_info = self.package_info(pkg)
+ if not pkg_info:
+ bb.fatal("Unable to get information for package '%s' while "
+ "trying to extract the package." % pkg)
+
+ tmp_dir = super(DpkgPM, self).extract(pkg, pkg_info)
+ bb.utils.remove(os.path.join(tmp_dir, "data.tar.xz"))
+
+ return tmp_dir