diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2023-08-18 07:27:51 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2023-08-24 16:49:34 +0100 |
commit | 538011256964d0253f8e3ab7ff1d6fd62c7c2f89 (patch) | |
tree | 6e29f83e90b666867727b70292b7fd8f92747869 | |
parent | 21c17968f801e406ef7f328656587fadd9ef7f5d (diff) | |
download | bitbake-contrib-538011256964d0253f8e3ab7ff1d6fd62c7c2f89.tar.gz |
lib/bb: Add xattr and acl libraries
Adds Python wrappers around the xattr API from libc and the ACL API from
libacl.
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-x | lib/bb/acl.py | 215 | ||||
-rwxr-xr-x | lib/bb/xattr.py | 126 |
2 files changed, 341 insertions, 0 deletions
diff --git a/lib/bb/acl.py b/lib/bb/acl.py new file mode 100755 index 000000000..0f41b275c --- /dev/null +++ b/lib/bb/acl.py @@ -0,0 +1,215 @@ +#! /usr/bin/env python3 +# +# Copyright 2023 by Garmin Ltd. or its subsidiaries +# +# SPDX-License-Identifier: MIT + + +import sys +import ctypes +import os +import errno +import pwd +import grp + +libacl = ctypes.CDLL("libacl.so.1", use_errno=True) + + +ACL_TYPE_ACCESS = 0x8000 +ACL_TYPE_DEFAULT = 0x4000 + +ACL_FIRST_ENTRY = 0 +ACL_NEXT_ENTRY = 1 + +ACL_UNDEFINED_TAG = 0x00 +ACL_USER_OBJ = 0x01 +ACL_USER = 0x02 +ACL_GROUP_OBJ = 0x04 +ACL_GROUP = 0x08 +ACL_MASK = 0x10 +ACL_OTHER = 0x20 + +ACL_READ = 0x04 +ACL_WRITE = 0x02 +ACL_EXECUTE = 0x01 + +acl_t = ctypes.c_void_p +acl_entry_t = ctypes.c_void_p +acl_permset_t = ctypes.c_void_p +acl_perm_t = ctypes.c_uint + +acl_tag_t = ctypes.c_int + +libacl.acl_free.argtypes = [acl_t] + + +def acl_free(acl): + libacl.acl_free(acl) + + +libacl.acl_get_file.restype = acl_t +libacl.acl_get_file.argtypes = [ctypes.c_char_p, ctypes.c_uint] + + +def acl_get_file(path, typ): + acl = libacl.acl_get_file(os.fsencode(path), typ) + if acl is None: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err), str(path)) + + return acl + + +libacl.acl_get_entry.argtypes = [acl_t, ctypes.c_int, ctypes.c_void_p] + + +def acl_get_entry(acl, entry_id): + entry = acl_entry_t() + ret = libacl.acl_get_entry(acl, entry_id, ctypes.byref(entry)) + if ret < 0: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err)) + + if ret == 0: + return None + + return entry + + +libacl.acl_get_tag_type.argtypes = [acl_entry_t, ctypes.c_void_p] + + +def acl_get_tag_type(entry_d): + tag = acl_tag_t() + ret = libacl.acl_get_tag_type(entry_d, ctypes.byref(tag)) + if ret < 0: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err)) + return tag.value + + +libacl.acl_get_qualifier.restype = ctypes.c_void_p +libacl.acl_get_qualifier.argtypes = [acl_entry_t] + + +def acl_get_qualifier(entry_d): + ret = libacl.acl_get_qualifier(entry_d) + if ret is None: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err)) + return ctypes.c_void_p(ret) + + +libacl.acl_get_permset.argtypes = [acl_entry_t, ctypes.c_void_p] + + +def acl_get_permset(entry_d): + permset = acl_permset_t() + ret = libacl.acl_get_permset(entry_d, ctypes.byref(permset)) + if ret < 0: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err)) + + return permset + + +libacl.acl_get_perm.argtypes = [acl_permset_t, acl_perm_t] + + +def acl_get_perm(permset_d, perm): + ret = libacl.acl_get_perm(permset_d, perm) + if ret < 0: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err)) + return bool(ret) + + +class Entry(object): + def __init__(self, tag, qualifier, mode): + self.tag = tag + self.qualifier = qualifier + self.mode = mode + + def __str__(self): + typ = "" + qual = "" + if self.tag == ACL_USER: + typ = "user" + qual = pwd.getpwuid(self.qualifier).pw_name + elif self.tag == ACL_GROUP: + typ = "group" + qual = grp.getgrgid(self.qualifier).gr_name + elif self.tag == ACL_USER_OBJ: + typ = "user" + elif self.tag == ACL_GROUP_OBJ: + typ = "group" + elif self.tag == ACL_MASK: + typ = "mask" + elif self.tag == ACL_OTHER: + typ = "other" + + r = "r" if self.mode & ACL_READ else "-" + w = "w" if self.mode & ACL_WRITE else "-" + x = "x" if self.mode & ACL_EXECUTE else "-" + + return f"{typ}:{qual}:{r}{w}{x}" + + +class ACL(object): + def __init__(self, acl): + self.acl = acl + + def __del__(self): + acl_free(self.acl) + + def entries(self): + entry_id = ACL_FIRST_ENTRY + while True: + entry = acl_get_entry(self.acl, entry_id) + if entry is None: + break + + permset = acl_get_permset(entry) + + mode = 0 + for m in (ACL_READ, ACL_WRITE, ACL_EXECUTE): + if acl_get_perm(permset, m): + mode |= m + + qualifier = None + tag = acl_get_tag_type(entry) + + if tag == ACL_USER or tag == ACL_GROUP: + qual = acl_get_qualifier(entry) + qualifier = ctypes.cast(qual, ctypes.POINTER(ctypes.c_int))[0] + + yield Entry(tag, qualifier, mode) + + entry_id = ACL_NEXT_ENTRY + + @classmethod + def from_path(cls, path, typ): + acl = acl_get_file(path, typ) + return cls(acl) + + +def main(): + import argparse + import pwd + import grp + from pathlib import Path + + parser = argparse.ArgumentParser() + parser.add_argument("path", help="File Path", type=Path) + + args = parser.parse_args() + + acl = ACL.from_path(args.path, ACL_TYPE_ACCESS) + for entry in acl.entries(): + print(str(entry)) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/lib/bb/xattr.py b/lib/bb/xattr.py new file mode 100755 index 000000000..7b634944a --- /dev/null +++ b/lib/bb/xattr.py @@ -0,0 +1,126 @@ +#! /usr/bin/env python3 +# +# Copyright 2023 by Garmin Ltd. or its subsidiaries +# +# SPDX-License-Identifier: MIT + +import sys +import ctypes +import os +import errno + +libc = ctypes.CDLL("libc.so.6", use_errno=True) +fsencoding = sys.getfilesystemencoding() + + +libc.listxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t] +libc.llistxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t] + + +def listxattr(path, follow=True): + func = libc.listxattr if follow else libc.llistxattr + + os_path = os.fsencode(path) + + while True: + length = func(os_path, None, 0) + + if length < 0: + err = ctypes.get_errno() + raise OSError(err, os.strerror(err), str(path)) + + if length == 0: + return [] + + arr = ctypes.create_string_buffer(length) + + read_length = func(os_path, arr, length) + if read_length != length: + # Race! + continue + + return [a.decode(fsencoding) for a in arr.raw.split(b"\x00") if a] + + +libc.getxattr.argtypes = [ + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_size_t, +] +libc.lgetxattr.argtypes = [ + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_size_t, +] + + +def getxattr(path, name, follow=True): + func = libc.getxattr if follow else libc.lgetxattr + + os_path = os.fsencode(path) + os_name = os.fsencode(name) + + while True: + length = func(os_path, os_name, None, 0) + + if length < 0: + err = ctypes.get_errno() + if err == errno.ENODATA: + return None + raise OSError(err, os.strerror(err), str(path)) + + if length == 0: + return "" + + arr = ctypes.create_string_buffer(length) + + read_length = func(os_path, os_name, arr, length) + if read_length != length: + # Race! + continue + + return arr.raw + + +def get_all_xattr(path, follow=True): + attrs = {} + + names = listxattr(path, follow) + + for name in names: + value = getxattr(path, name, follow) + if value is None: + # This can happen if a value is erased after listxattr is called, + # so ignore it + continue + attrs[name] = value + + return attrs + + +def main(): + import argparse + from pathlib import Path + + parser = argparse.ArgumentParser() + parser.add_argument("path", help="File Path", type=Path) + + args = parser.parse_args() + + attrs = get_all_xattr(args.path) + + for name, value in attrs.items(): + try: + value = value.decode(fsencoding) + except UnicodeDecodeError: + pass + + print(f"{name} = {value}") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) |