util: add pkg_resources compatibility to resource abstraction

This commit is contained in:
Shiz 2024-09-17 13:23:39 +02:00
parent ee4bf22ec2
commit ea97e620bf

View File

@ -7,6 +7,7 @@ import binascii
import errno
import hashlib
import hmac
import io
import json
import logging
import math
@ -3567,15 +3568,26 @@ except ImportError:
import importlib_resources as impresources
except ImportError:
impresources = None
try:
import pkg_resources
except ImportError:
pkg_resources = None
def _pkg_resource_exists(pkg: str, name: str) -> bool:
if not pkg_resources:
return False
try:
return pkg_resources.resource_exists(pkg, name)
except NotImplementedError:
return False
def stat_resource(E: EnvParams, name: str):
path = os.path.join(E.mod, name)
if os.path.exists(path):
return os.stat(fsenc(path))
return None
def has_resource(E: EnvParams, name: str):
if impresources:
try:
@ -3587,6 +3599,10 @@ def has_resource(E: EnvParams, name: str):
if res.is_file() or res.is_dir():
return True
if pkg_resources:
if _pkg_resource_exists(E.pkg.__name__, name):
return True
return os.path.exists(os.path.join(E.mod, name))
@ -3601,11 +3617,18 @@ def load_resource(E: EnvParams, name: str, mode="rb"):
if res.is_file():
return res.open(mode)
if pkg_resources:
if _pkg_resource_exists(E.pkg.__name__, name) and not pkg_resources.resource_isdir(E.pkg.__name__, name):
stream = pkg_resources.resource_stream(E.pkg.__name__, name)
if 'b' not in mode:
stream = io.TextIOWrapper(stream)
return stream
return open(os.path.join(E.mod, name), mode)
def walk_resources(E: EnvParams, name: str):
def do_walk(base, r):
def walk_idirs(base, r):
queue = [(base, r)]
while queue:
(b, r) = queue.pop(0)
@ -3619,42 +3642,84 @@ def walk_resources(E: EnvParams, name: str):
f.append(e.name)
yield (b, d, f)
def walk_pdirs(base):
queue = [base]
while queue:
b = queue.pop(0)
d = []
f = []
for e in pkg_resources.resource_listdir(E.pkg.__name__, b):
if pkg_resources.resource_isdir(E.pkg.__name__, e):
d.append(e)
queue.append(os.path.join(b, e))
else:
f.append(e)
yield (b, d, f)
if impresources:
try:
resources = impresources.files(E.pkg).joinpath(name)
iresources = impresources.files(E.pkg)
except ImportError:
resources = None
iresources = None
else:
resources = None
iresources = None
base_path = os.path.join(E.mod, name)
visited_root = False
for (base, dirs, files) in os.walk(base_path):
res_dirs = []
res_files = []
if resources:
def walk_single(base, dirs, files, normalize_base=False, skip_ires=False, skip_pres=False):
if normalize_base:
if base != base_path:
relbase = os.path.relpath(base, base_path)
resbase = resources.joinpath(relbase)
else:
visited_root = True
relbase = name
resbase = resources
if resbase.is_dir():
for r in resbase.iterdir():
if r.is_dir() and r.name not in dirs:
res_dirs.append(r.name)
elif r.is_file() and r.name not in files:
res_files.append(r.name)
else:
relbase = base
yield (base, dirs + res_dirs, files + res_files)
for d in res_dirs:
for f in do_walk(relbase, res_dirs):
yield f
ires_dirs = []
if not skip_ires and iresources:
iresbase = iresources.joinpath(relbase)
if iresbase.is_dir():
for ientry in iresbase.iterdir():
if ientry.is_dir() and ientry.name not in dirs:
dirs.append(ientry.name)
ires_dirs.append(ientry.name)
elif ientry.is_file() and ientry.name not in files:
files.append(ientry.name)
if resources and not visited_root:
for f in do_walk(name, resources):
yield f
pres_dirs = []
if not skip_pres and _pkg_resource_exists(E.pkg.__name__, relbase) and pkg_resources.resource_isdir(E.pkg.__name__, relbase):
for pentry in pkg_resources.resource_listdir(E.pkg.__name__, relbase):
ppath = os.path.join(relbase, pentry)
if pkg_resources.resource_isdir(E.pkg.__name__, ppath):
if pentry not in dirs:
dirs.append(pentry)
pres_dirs.append(pentry)
else:
if pentry not in files:
files.append(pentry)
yield (base, dirs + ires_dirs + pres_dirs, files)
for d in ires_dirs:
for (ibase, idirs, ifiles) in walk_idirs(os.path.join(relbase, d), iresources.joinpath(relbase, d)):
yield from walk_single(ibase, idirs, ifiles, normalize_base=False, skip_ires=True, skip_pres=skip_pres)
for d in pres_dirs:
for (pbase, pdirs, pfiles) in walk_pdirs(os.path.join(relbase, d)):
yield (pbase, pdirs, pfiles)
normalize_base = False
skip_ires = skip_pres = False
if os.path.isdir(base_path):
walker = os.walk(base_path)
normalize_base = True
elif iresources and iresources.joinpath(name).is_dir():
walker = walk_idirs(name, iresources.joinpath(name))
skip_ires = True
elif pkg_resources and _pkg_resource_exists(E.pkg.__name__, name) and pkg_resources.resource_isdir(E.pkg.__name__, name):
walker = walk_pdirs(name)
skip_pres = True
for (base, dirs, files) in walker:
yield from walk_single(base, dirs, files, normalize_base=normalize_base, skip_ires=skip_ires, skip_pres=skip_pres)
class Pebkac(Exception):