util: add helper functions to abstract embedded resource access
This commit is contained in:
parent
de9069ef1d
commit
b484c1b99b
@ -30,7 +30,17 @@ from collections import Counter
|
|||||||
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
|
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
from .__init__ import ANYWIN, EXE, MACOS, PY2, PY36, TYPE_CHECKING, VT100, WINDOWS
|
from .__init__ import (
|
||||||
|
ANYWIN,
|
||||||
|
EXE,
|
||||||
|
MACOS,
|
||||||
|
PY2,
|
||||||
|
PY36,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
VT100,
|
||||||
|
WINDOWS,
|
||||||
|
EnvParams,
|
||||||
|
)
|
||||||
from .__version__ import S_BUILD_DT, S_VERSION
|
from .__version__ import S_BUILD_DT, S_VERSION
|
||||||
from .stolen import surrogateescape
|
from .stolen import surrogateescape
|
||||||
|
|
||||||
@ -3545,6 +3555,100 @@ def hidedir(dp) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
import importlib.resources as impresources
|
||||||
|
except ImportError:
|
||||||
|
impresources = None
|
||||||
|
|
||||||
|
|
||||||
|
def stat_resource(E: EnvParams, name: str):
|
||||||
|
path = os.path.join(E.mod, name)
|
||||||
|
if os.path.exists(path):
|
||||||
|
return os.stat(fsenc(path))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def has_resource(E: EnvParams, name: str):
|
||||||
|
if impresources:
|
||||||
|
try:
|
||||||
|
resources = impresources.files("copyparty")
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
res = resources.joinpath(name)
|
||||||
|
if res.is_file() or res.is_dir():
|
||||||
|
return True
|
||||||
|
|
||||||
|
return os.path.exists(os.path.join(E.mod, name))
|
||||||
|
|
||||||
|
|
||||||
|
def load_resource(E: EnvParams, name: str, mode="rb"):
|
||||||
|
if impresources:
|
||||||
|
try:
|
||||||
|
resources = impresources.files("copyparty")
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
res = resources.joinpath(name)
|
||||||
|
if res.is_file():
|
||||||
|
return res.open(mode)
|
||||||
|
|
||||||
|
return open(os.path.join(E.mod, name), mode)
|
||||||
|
|
||||||
|
|
||||||
|
def walk_resources(E: EnvParams, name: str):
|
||||||
|
def do_walk(base, r):
|
||||||
|
queue = [(base, r)]
|
||||||
|
while queue:
|
||||||
|
(b, r) = queue.pop(0)
|
||||||
|
d = []
|
||||||
|
f = []
|
||||||
|
for e in r.iterdir():
|
||||||
|
if e.is_dir():
|
||||||
|
d.append(e.name)
|
||||||
|
queue.append((os.path.join(b, e.name), e))
|
||||||
|
elif e.is_file():
|
||||||
|
f.append(e.name)
|
||||||
|
yield (b, d, f)
|
||||||
|
|
||||||
|
if impresources:
|
||||||
|
try:
|
||||||
|
resources = impresources.files("copyparty").joinpath(name)
|
||||||
|
except ImportError:
|
||||||
|
resources = None
|
||||||
|
else:
|
||||||
|
resources = None
|
||||||
|
|
||||||
|
base_path = os.path.join(E.mod, name)
|
||||||
|
visited_root = False
|
||||||
|
for (base, dirs, files) in os.walk(base_path):
|
||||||
|
res_dirs = []
|
||||||
|
res_files = []
|
||||||
|
if resources:
|
||||||
|
if base != base_path:
|
||||||
|
relbase = os.path.relpath(base, base_path)
|
||||||
|
resbase = resources.joinpath(relbase)
|
||||||
|
else:
|
||||||
|
visited_root = True
|
||||||
|
relbase = name
|
||||||
|
resbase = resources
|
||||||
|
if resbase.is_dir():
|
||||||
|
for r in resbase.iterdir():
|
||||||
|
if r.is_dir() and r.name not in dirs:
|
||||||
|
res_dirs.append(r.name)
|
||||||
|
elif r.is_file() and r.name not in files:
|
||||||
|
res_files.append(r.name)
|
||||||
|
|
||||||
|
yield (base, dirs + res_dirs, files + res_files)
|
||||||
|
for d in res_dirs:
|
||||||
|
for f in do_walk(relbase, res_dirs):
|
||||||
|
yield f
|
||||||
|
|
||||||
|
if resources and not visited_root:
|
||||||
|
for f in do_walk(name, resources):
|
||||||
|
yield f
|
||||||
|
|
||||||
|
|
||||||
class Pebkac(Exception):
|
class Pebkac(Exception):
|
||||||
def __init__(
|
def __init__(
|
||||||
self, code: int, msg: Optional[str] = None, log: Optional[str] = None
|
self, code: int, msg: Optional[str] = None, log: Optional[str] = None
|
||||||
|
Loading…
Reference in New Issue
Block a user