diff --git a/copyparty/util.py b/copyparty/util.py index bebcc57b..aab818ee 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -30,7 +30,17 @@ from collections import Counter from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network from queue import Queue -from .__init__ import ANYWIN, EXE, MACOS, PY2, PY36, TYPE_CHECKING, VT100, WINDOWS +from .__init__ import ( + ANYWIN, + EXE, + MACOS, + PY2, + PY36, + TYPE_CHECKING, + VT100, + WINDOWS, + EnvParams, +) from .__version__ import S_BUILD_DT, S_VERSION from .stolen import surrogateescape @@ -3545,6 +3555,100 @@ def hidedir(dp) -> None: pass +try: + import importlib.resources as impresources +except ImportError: + impresources = None + + +def stat_resource(E: EnvParams, name: str): + path = os.path.join(E.mod, name) + if os.path.exists(path): + return os.stat(fsenc(path)) + return None + + +def has_resource(E: EnvParams, name: str): + if impresources: + try: + resources = impresources.files("copyparty") + except ImportError: + pass + else: + res = resources.joinpath(name) + if res.is_file() or res.is_dir(): + return True + + return os.path.exists(os.path.join(E.mod, name)) + + +def load_resource(E: EnvParams, name: str, mode="rb"): + if impresources: + try: + resources = impresources.files("copyparty") + except ImportError: + pass + else: + res = resources.joinpath(name) + if res.is_file(): + return res.open(mode) + + return open(os.path.join(E.mod, name), mode) + + +def walk_resources(E: EnvParams, name: str): + def do_walk(base, r): + queue = [(base, r)] + while queue: + (b, r) = queue.pop(0) + d = [] + f = [] + for e in r.iterdir(): + if e.is_dir(): + d.append(e.name) + queue.append((os.path.join(b, e.name), e)) + elif e.is_file(): + f.append(e.name) + yield (b, d, f) + + if impresources: + try: + resources = impresources.files("copyparty").joinpath(name) + except ImportError: + resources = None + else: + resources = None + + base_path = os.path.join(E.mod, name) + visited_root = False + for (base, dirs, files) in os.walk(base_path): + res_dirs = [] + res_files = [] + if resources: + if base != base_path: + relbase = os.path.relpath(base, base_path) + resbase = resources.joinpath(relbase) + else: + visited_root = True + relbase = name + resbase = resources + if resbase.is_dir(): + for r in resbase.iterdir(): + if r.is_dir() and r.name not in dirs: + res_dirs.append(r.name) + elif r.is_file() and r.name not in files: + res_files.append(r.name) + + yield (base, dirs + res_dirs, files + res_files) + for d in res_dirs: + for f in do_walk(relbase, res_dirs): + yield f + + if resources and not visited_root: + for f in do_walk(name, resources): + yield f + + class Pebkac(Exception): def __init__( self, code: int, msg: Optional[str] = None, log: Optional[str] = None