Compare commits
1 Commits
hovudstrau
...
no-broker
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7781e0529d |
@ -841,7 +841,6 @@ def add_general(ap, nc, srvname):
|
||||
ap2 = ap.add_argument_group('general options')
|
||||
ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file")
|
||||
ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients")
|
||||
ap2.add_argument("-j", metavar="CORES", type=int, default=1, help="max num cpu cores, 0=all")
|
||||
ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]")
|
||||
ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts")
|
||||
ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]")
|
||||
@ -881,7 +880,7 @@ def add_upload(ap):
|
||||
ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)")
|
||||
ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600")
|
||||
ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)")
|
||||
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (multiprocessing, filesystems lacking sparse-files support, ...)")
|
||||
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (filesystems lacking sparse-files support, ...)")
|
||||
ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)")
|
||||
ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)")
|
||||
ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)")
|
||||
|
@ -44,9 +44,7 @@ if True: # pylint: disable=using-constant-test
|
||||
from .util import NamedLogger, RootLogger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .broker_mp import BrokerMp
|
||||
from .broker_thr import BrokerThr
|
||||
from .broker_util import BrokerCli
|
||||
from .svchub import SvcHub
|
||||
|
||||
# Vflags: TypeAlias = dict[str, str | bool | float | list[str]]
|
||||
# Vflags: TypeAlias = dict[str, Any]
|
||||
@ -141,9 +139,9 @@ class Lim(object):
|
||||
sz: int,
|
||||
ptop: str,
|
||||
abspath: str,
|
||||
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]] = None,
|
||||
hub: Optional["SvcHub"] = None,
|
||||
reg: Optional[dict[str, dict[str, Any]]] = None,
|
||||
volgetter: str = "up2k.get_volsize",
|
||||
volgetter: str = "get_volsize",
|
||||
) -> tuple[str, str]:
|
||||
if reg is not None and self.reg is None:
|
||||
self.reg = reg
|
||||
@ -154,7 +152,7 @@ class Lim(object):
|
||||
self.chk_rem(rem)
|
||||
if sz != -1:
|
||||
self.chk_sz(sz)
|
||||
self.chk_vsz(broker, ptop, sz, volgetter)
|
||||
self.chk_vsz(hub, ptop, sz, volgetter)
|
||||
self.chk_df(abspath, sz) # side effects; keep last-ish
|
||||
|
||||
ap2, vp2 = self.rot(abspath)
|
||||
@ -172,16 +170,15 @@ class Lim(object):
|
||||
|
||||
def chk_vsz(
|
||||
self,
|
||||
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]],
|
||||
hub: Optional["SvcHub"],
|
||||
ptop: str,
|
||||
sz: int,
|
||||
volgetter: str = "up2k.get_volsize",
|
||||
) -> None:
|
||||
if not broker or not self.vbmax + self.vnmax:
|
||||
if not hub or not self.vbmax + self.vnmax:
|
||||
return
|
||||
|
||||
x = broker.ask(volgetter, ptop)
|
||||
nbytes, nfiles = x.get()
|
||||
nbytes, nfiles = hub.up2k.getattr(volgetter)(ptop)
|
||||
|
||||
if self.vbmax and self.vbmax < nbytes + sz:
|
||||
raise Pebkac(400, "volume has exceeded max size")
|
||||
@ -815,9 +812,7 @@ class AuthSrv(object):
|
||||
|
||||
yield prev, True
|
||||
|
||||
def idp_checkin(
|
||||
self, broker: Optional["BrokerCli"], uname: str, gname: str
|
||||
) -> bool:
|
||||
def idp_checkin(self, hub: Optional["SvcHub"], uname: str, gname: str) -> bool:
|
||||
if uname in self.acct:
|
||||
return False
|
||||
|
||||
@ -837,12 +832,12 @@ class AuthSrv(object):
|
||||
t = "reinitializing due to new user from IdP: [%s:%s]"
|
||||
self.log(t % (uname, gnames), 3)
|
||||
|
||||
if not broker:
|
||||
if not hub:
|
||||
# only true for tests
|
||||
self._reload()
|
||||
return True
|
||||
|
||||
broker.ask("_reload_blocking", False).get()
|
||||
hub._reload_blocking(False)
|
||||
return True
|
||||
|
||||
def _map_volume_idp(
|
||||
|
@ -1,141 +0,0 @@
|
||||
# coding: utf-8
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import queue
|
||||
|
||||
from .__init__ import CORES, TYPE_CHECKING
|
||||
from .broker_mpw import MpWorker
|
||||
from .broker_util import ExceptionalQueue, try_exec
|
||||
from .util import Daemon, mp
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .svchub import SvcHub
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
from typing import Any
|
||||
|
||||
|
||||
class MProcess(mp.Process):
|
||||
def __init__(
|
||||
self,
|
||||
q_pend: queue.Queue[tuple[int, str, list[Any]]],
|
||||
q_yield: queue.Queue[tuple[int, str, list[Any]]],
|
||||
target: Any,
|
||||
args: Any,
|
||||
) -> None:
|
||||
super(MProcess, self).__init__(target=target, args=args)
|
||||
self.q_pend = q_pend
|
||||
self.q_yield = q_yield
|
||||
|
||||
|
||||
class BrokerMp(object):
|
||||
"""external api; manages MpWorkers"""
|
||||
|
||||
def __init__(self, hub: "SvcHub") -> None:
|
||||
self.hub = hub
|
||||
self.log = hub.log
|
||||
self.args = hub.args
|
||||
|
||||
self.procs = []
|
||||
self.mutex = threading.Lock()
|
||||
|
||||
self.num_workers = self.args.j or CORES
|
||||
self.log("broker", "booting {} subprocesses".format(self.num_workers))
|
||||
for n in range(1, self.num_workers + 1):
|
||||
q_pend: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(1) # type: ignore
|
||||
q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) # type: ignore
|
||||
|
||||
proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n))
|
||||
Daemon(self.collector, "mp-sink-{}".format(n), (proc,))
|
||||
self.procs.append(proc)
|
||||
proc.start()
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self.log("broker", "shutting down")
|
||||
for n, proc in enumerate(self.procs):
|
||||
thr = threading.Thread(
|
||||
target=proc.q_pend.put((0, "shutdown", [])),
|
||||
name="mp-shutdown-{}-{}".format(n, len(self.procs)),
|
||||
)
|
||||
thr.start()
|
||||
|
||||
with self.mutex:
|
||||
procs = self.procs
|
||||
self.procs = []
|
||||
|
||||
while procs:
|
||||
if procs[-1].is_alive():
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
|
||||
procs.pop()
|
||||
|
||||
def reload(self) -> None:
|
||||
self.log("broker", "reloading")
|
||||
for _, proc in enumerate(self.procs):
|
||||
proc.q_pend.put((0, "reload", []))
|
||||
|
||||
def collector(self, proc: MProcess) -> None:
|
||||
"""receive message from hub in other process"""
|
||||
while True:
|
||||
msg = proc.q_yield.get()
|
||||
retq_id, dest, args = msg
|
||||
|
||||
if dest == "log":
|
||||
self.log(*args)
|
||||
|
||||
elif dest == "retq":
|
||||
# response from previous ipc call
|
||||
raise Exception("invalid broker_mp usage")
|
||||
|
||||
else:
|
||||
# new ipc invoking managed service in hub
|
||||
try:
|
||||
obj = self.hub
|
||||
for node in dest.split("."):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
# TODO will deadlock if dest performs another ipc
|
||||
rv = try_exec(retq_id, obj, *args)
|
||||
except:
|
||||
rv = ["exception", "stack", traceback.format_exc()]
|
||||
|
||||
if retq_id:
|
||||
proc.q_pend.put((retq_id, "retq", rv))
|
||||
|
||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
||||
|
||||
# new non-ipc invoking managed service in hub
|
||||
obj = self.hub
|
||||
for node in dest.split("."):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
rv = try_exec(True, obj, *args)
|
||||
|
||||
retq = ExceptionalQueue(1)
|
||||
retq.put(rv)
|
||||
return retq
|
||||
|
||||
def say(self, dest: str, *args: Any) -> None:
|
||||
"""
|
||||
send message to non-hub component in other process,
|
||||
returns a Queue object which eventually contains the response if want_retval
|
||||
(not-impl here since nothing uses it yet)
|
||||
"""
|
||||
if dest == "listen":
|
||||
for p in self.procs:
|
||||
p.q_pend.put((0, dest, [args[0], len(self.procs)]))
|
||||
|
||||
elif dest == "set_netdevs":
|
||||
for p in self.procs:
|
||||
p.q_pend.put((0, dest, list(args)))
|
||||
|
||||
elif dest == "cb_httpsrv_up":
|
||||
self.hub.cb_httpsrv_up()
|
||||
|
||||
else:
|
||||
raise Exception("what is " + str(dest))
|
@ -1,123 +0,0 @@
|
||||
# coding: utf-8
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import queue
|
||||
|
||||
from .__init__ import ANYWIN
|
||||
from .authsrv import AuthSrv
|
||||
from .broker_util import BrokerCli, ExceptionalQueue
|
||||
from .httpsrv import HttpSrv
|
||||
from .util import FAKE_MP, Daemon, HMaccas
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
from types import FrameType
|
||||
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
|
||||
class MpWorker(BrokerCli):
|
||||
"""one single mp instance"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
q_pend: queue.Queue[tuple[int, str, list[Any]]],
|
||||
q_yield: queue.Queue[tuple[int, str, list[Any]]],
|
||||
args: argparse.Namespace,
|
||||
n: int,
|
||||
) -> None:
|
||||
super(MpWorker, self).__init__()
|
||||
|
||||
self.q_pend = q_pend
|
||||
self.q_yield = q_yield
|
||||
self.args = args
|
||||
self.n = n
|
||||
|
||||
self.log = self._log_disabled if args.q and not args.lo else self._log_enabled
|
||||
|
||||
self.retpend: dict[int, Any] = {}
|
||||
self.retpend_mutex = threading.Lock()
|
||||
self.mutex = threading.Lock()
|
||||
|
||||
# we inherited signal_handler from parent,
|
||||
# replace it with something harmless
|
||||
if not FAKE_MP:
|
||||
sigs = [signal.SIGINT, signal.SIGTERM]
|
||||
if not ANYWIN:
|
||||
sigs.append(signal.SIGUSR1)
|
||||
|
||||
for sig in sigs:
|
||||
signal.signal(sig, self.signal_handler)
|
||||
|
||||
# starting to look like a good idea
|
||||
self.asrv = AuthSrv(args, None, False)
|
||||
|
||||
# instantiate all services here (TODO: inheritance?)
|
||||
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
|
||||
self.httpsrv = HttpSrv(self, n)
|
||||
|
||||
# on winxp and some other platforms,
|
||||
# use thr.join() to block all signals
|
||||
Daemon(self.main, "mpw-main").join()
|
||||
|
||||
def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None:
|
||||
# print('k')
|
||||
pass
|
||||
|
||||
def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
|
||||
self.q_yield.put((0, "log", [src, msg, c]))
|
||||
|
||||
def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
|
||||
pass
|
||||
|
||||
def logw(self, msg: str, c: Union[int, str] = 0) -> None:
|
||||
self.log("mp%d" % (self.n,), msg, c)
|
||||
|
||||
def main(self) -> None:
|
||||
while True:
|
||||
retq_id, dest, args = self.q_pend.get()
|
||||
|
||||
# self.logw("work: [{}]".format(d[0]))
|
||||
if dest == "shutdown":
|
||||
self.httpsrv.shutdown()
|
||||
self.logw("ok bye")
|
||||
sys.exit(0)
|
||||
return
|
||||
|
||||
elif dest == "reload":
|
||||
self.logw("mpw.asrv reloading")
|
||||
self.asrv.reload()
|
||||
self.logw("mpw.asrv reloaded")
|
||||
|
||||
elif dest == "listen":
|
||||
self.httpsrv.listen(args[0], args[1])
|
||||
|
||||
elif dest == "set_netdevs":
|
||||
self.httpsrv.set_netdevs(args[0])
|
||||
|
||||
elif dest == "retq":
|
||||
# response from previous ipc call
|
||||
with self.retpend_mutex:
|
||||
retq = self.retpend.pop(retq_id)
|
||||
|
||||
retq.put(args)
|
||||
|
||||
else:
|
||||
raise Exception("what is " + str(dest))
|
||||
|
||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
||||
retq = ExceptionalQueue(1)
|
||||
retq_id = id(retq)
|
||||
with self.retpend_mutex:
|
||||
self.retpend[retq_id] = retq
|
||||
|
||||
self.q_yield.put((retq_id, dest, list(args)))
|
||||
return retq
|
||||
|
||||
def say(self, dest: str, *args: Any) -> None:
|
||||
self.q_yield.put((0, dest, list(args)))
|
@ -1,73 +0,0 @@
|
||||
# coding: utf-8
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
import os
|
||||
import threading
|
||||
|
||||
from .__init__ import TYPE_CHECKING
|
||||
from .broker_util import BrokerCli, ExceptionalQueue, try_exec
|
||||
from .httpsrv import HttpSrv
|
||||
from .util import HMaccas
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .svchub import SvcHub
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
from typing import Any
|
||||
|
||||
|
||||
class BrokerThr(BrokerCli):
|
||||
"""external api; behaves like BrokerMP but using plain threads"""
|
||||
|
||||
def __init__(self, hub: "SvcHub") -> None:
|
||||
super(BrokerThr, self).__init__()
|
||||
|
||||
self.hub = hub
|
||||
self.log = hub.log
|
||||
self.args = hub.args
|
||||
self.asrv = hub.asrv
|
||||
|
||||
self.mutex = threading.Lock()
|
||||
self.num_workers = 1
|
||||
|
||||
# instantiate all services here (TODO: inheritance?)
|
||||
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
|
||||
self.httpsrv = HttpSrv(self, None)
|
||||
self.reload = self.noop
|
||||
|
||||
def shutdown(self) -> None:
|
||||
# self.log("broker", "shutting down")
|
||||
self.httpsrv.shutdown()
|
||||
|
||||
def noop(self) -> None:
|
||||
pass
|
||||
|
||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
||||
|
||||
# new ipc invoking managed service in hub
|
||||
obj = self.hub
|
||||
for node in dest.split("."):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
rv = try_exec(True, obj, *args)
|
||||
|
||||
# pretend we're broker_mp
|
||||
retq = ExceptionalQueue(1)
|
||||
retq.put(rv)
|
||||
return retq
|
||||
|
||||
def say(self, dest: str, *args: Any) -> None:
|
||||
if dest == "listen":
|
||||
self.httpsrv.listen(args[0], 1)
|
||||
return
|
||||
|
||||
if dest == "set_netdevs":
|
||||
self.httpsrv.set_netdevs(args[0])
|
||||
return
|
||||
|
||||
# new ipc invoking managed service in hub
|
||||
obj = self.hub
|
||||
for node in dest.split("."):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
try_exec(False, obj, *args)
|
@ -1,72 +0,0 @@
|
||||
# coding: utf-8
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
import argparse
|
||||
import traceback
|
||||
|
||||
from queue import Queue
|
||||
|
||||
from .__init__ import TYPE_CHECKING
|
||||
from .authsrv import AuthSrv
|
||||
from .util import HMaccas, Pebkac
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from .util import RootLogger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .httpsrv import HttpSrv
|
||||
|
||||
|
||||
class ExceptionalQueue(Queue, object):
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
|
||||
rv = super(ExceptionalQueue, self).get(block, timeout)
|
||||
|
||||
if isinstance(rv, list):
|
||||
if rv[0] == "exception":
|
||||
if rv[1] == "pebkac":
|
||||
raise Pebkac(*rv[2:])
|
||||
else:
|
||||
raise Exception(rv[2])
|
||||
|
||||
return rv
|
||||
|
||||
|
||||
class BrokerCli(object):
|
||||
"""
|
||||
helps mypy understand httpsrv.broker but still fails a few levels deeper,
|
||||
for example resolving httpconn.* in httpcli -- see lines tagged #mypy404
|
||||
"""
|
||||
|
||||
log: "RootLogger"
|
||||
args: argparse.Namespace
|
||||
asrv: AuthSrv
|
||||
httpsrv: "HttpSrv"
|
||||
iphash: HMaccas
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
||||
return ExceptionalQueue(1)
|
||||
|
||||
def say(self, dest: str, *args: Any) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def try_exec(want_retval: Union[bool, int], func: Any, *args: list[Any]) -> Any:
|
||||
try:
|
||||
return func(*args)
|
||||
|
||||
except Pebkac as ex:
|
||||
if not want_retval:
|
||||
raise
|
||||
|
||||
return ["exception", "pebkac", ex.code, str(ex)]
|
||||
|
||||
except:
|
||||
if not want_retval:
|
||||
raise
|
||||
|
||||
return ["exception", "stack", traceback.format_exc()]
|
@ -88,12 +88,8 @@ class FtpAuth(DummyAuthorizer):
|
||||
if bonk:
|
||||
logging.warning("client banned: invalid passwords")
|
||||
bans[ip] = bonk
|
||||
try:
|
||||
# only possible if multiprocessing disabled
|
||||
self.hub.broker.httpsrv.bans[ip] = bonk # type: ignore
|
||||
self.hub.broker.httpsrv.nban += 1 # type: ignore
|
||||
except:
|
||||
pass
|
||||
self.hub.httpsrv.bans[ip] = bonk
|
||||
self.hub.httpsrv.nban += 1
|
||||
|
||||
raise AuthenticationFailed("Authentication failed.")
|
||||
|
||||
|
@ -115,6 +115,7 @@ class HttpCli(object):
|
||||
|
||||
self.t0 = time.time()
|
||||
self.conn = conn
|
||||
self.hub = conn.hsrv.hub
|
||||
self.u2mutex = conn.u2mutex # mypy404
|
||||
self.s = conn.s
|
||||
self.sr = conn.sr
|
||||
@ -475,7 +476,7 @@ class HttpCli(object):
|
||||
) or self.args.idp_h_key in self.headers
|
||||
|
||||
if trusted_key and trusted_xff:
|
||||
self.asrv.idp_checkin(self.conn.hsrv.broker, idp_usr, idp_grp)
|
||||
self.asrv.idp_checkin(self.hub, idp_usr, idp_grp)
|
||||
else:
|
||||
if not trusted_key:
|
||||
t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"'
|
||||
@ -626,7 +627,7 @@ class HttpCli(object):
|
||||
msg += "hint: %s\r\n" % (self.hint,)
|
||||
|
||||
if "database is locked" in em:
|
||||
self.conn.hsrv.broker.say("log_stacks")
|
||||
self.hub.log_stacks()
|
||||
msg += "hint: important info in the server log\r\n"
|
||||
|
||||
zb = b"<pre>" + html_escape(msg).encode("utf-8", "replace")
|
||||
@ -1629,9 +1630,7 @@ class HttpCli(object):
|
||||
lim = vfs.get_dbv(rem)[0].lim
|
||||
fdir = vfs.canonical(rem)
|
||||
if lim:
|
||||
fdir, rem = lim.all(
|
||||
self.ip, rem, remains, vfs.realpath, fdir, self.conn.hsrv.broker
|
||||
)
|
||||
fdir, rem = lim.all(self.ip, rem, remains, vfs.realpath, fdir, self.hub)
|
||||
|
||||
fn = None
|
||||
if rem and not self.trailing_slash and not bos.path.isdir(fdir):
|
||||
@ -1764,7 +1763,7 @@ class HttpCli(object):
|
||||
lim.bup(self.ip, post_sz)
|
||||
try:
|
||||
lim.chk_sz(post_sz)
|
||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, post_sz)
|
||||
lim.chk_vsz(self.hub, vfs.realpath, post_sz)
|
||||
except:
|
||||
wunlink(self.log, path, vfs.flags)
|
||||
raise
|
||||
@ -1823,8 +1822,7 @@ class HttpCli(object):
|
||||
raise Pebkac(403, t)
|
||||
|
||||
vfs, rem = vfs.get_dbv(rem)
|
||||
self.conn.hsrv.broker.say(
|
||||
"up2k.hash_file",
|
||||
self.hub.up2k.hash_file(
|
||||
vfs.realpath,
|
||||
vfs.vpath,
|
||||
vfs.flags,
|
||||
@ -2049,8 +2047,7 @@ class HttpCli(object):
|
||||
|
||||
# not to protect u2fh, but to prevent handshakes while files are closing
|
||||
with self.u2mutex:
|
||||
x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
|
||||
ret = x.get()
|
||||
ret = self.hub.up2k.handle_json(body, self.u2fh.aps)
|
||||
|
||||
if self.is_vproxied:
|
||||
if "purl" in ret:
|
||||
@ -2138,7 +2135,7 @@ class HttpCli(object):
|
||||
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
|
||||
ptop = (vfs.dbv or vfs).realpath
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
|
||||
x = self.hub.up2k.handle_chunk(ptop, wark, chash)
|
||||
response = x.get()
|
||||
chunksize, cstart, path, lastmod, sprs = response
|
||||
|
||||
@ -2207,11 +2204,9 @@ class HttpCli(object):
|
||||
f.close()
|
||||
raise
|
||||
finally:
|
||||
x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
|
||||
x.get() # block client until released
|
||||
self.hub.up2k.release_chunk(ptop, wark, chash)
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
|
||||
ztis = x.get()
|
||||
ztis = self.hub.up2k.confirm_chunk(ptop, wark, chash)
|
||||
try:
|
||||
num_left, fin_path = ztis
|
||||
except:
|
||||
@ -2223,9 +2218,7 @@ class HttpCli(object):
|
||||
self.u2fh.close(path)
|
||||
|
||||
if not num_left and not self.args.nw:
|
||||
self.conn.hsrv.broker.ask(
|
||||
"up2k.finish_upload", ptop, wark, self.u2fh.aps
|
||||
).get()
|
||||
self.hub.up2k.finish_upload(ptop, wark, self.u2fh.aps)
|
||||
|
||||
cinf = self.headers.get("x-up2k-stat", "")
|
||||
|
||||
@ -2403,7 +2396,7 @@ class HttpCli(object):
|
||||
fdir_base = vfs.canonical(rem)
|
||||
if lim:
|
||||
fdir_base, rem = lim.all(
|
||||
self.ip, rem, -1, vfs.realpath, fdir_base, self.conn.hsrv.broker
|
||||
self.ip, rem, -1, vfs.realpath, fdir_base, self.hub
|
||||
)
|
||||
upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
|
||||
if not nullwrite:
|
||||
@ -2511,7 +2504,7 @@ class HttpCli(object):
|
||||
try:
|
||||
lim.chk_df(tabspath, sz, True)
|
||||
lim.chk_sz(sz)
|
||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
|
||||
lim.chk_vsz(self.hub, vfs.realpath, sz)
|
||||
lim.chk_bup(self.ip)
|
||||
lim.chk_nup(self.ip)
|
||||
except:
|
||||
@ -2549,8 +2542,7 @@ class HttpCli(object):
|
||||
raise Pebkac(403, t)
|
||||
|
||||
dbv, vrem = vfs.get_dbv(rem)
|
||||
self.conn.hsrv.broker.say(
|
||||
"up2k.hash_file",
|
||||
self.hub.up2k.hash_file(
|
||||
dbv.realpath,
|
||||
vfs.vpath,
|
||||
dbv.flags,
|
||||
@ -2697,7 +2689,7 @@ class HttpCli(object):
|
||||
fp = vfs.canonical(rp)
|
||||
lim = vfs.get_dbv(rem)[0].lim
|
||||
if lim:
|
||||
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.conn.hsrv.broker)
|
||||
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.hub)
|
||||
bos.makedirs(fp)
|
||||
|
||||
fp = os.path.join(fp, fn)
|
||||
@ -2799,7 +2791,7 @@ class HttpCli(object):
|
||||
lim.bup(self.ip, sz)
|
||||
try:
|
||||
lim.chk_sz(sz)
|
||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
|
||||
lim.chk_vsz(self.hub, vfs.realpath, sz)
|
||||
except:
|
||||
wunlink(self.log, fp, vfs.flags)
|
||||
raise
|
||||
@ -2828,8 +2820,7 @@ class HttpCli(object):
|
||||
raise Pebkac(403, t)
|
||||
|
||||
vfs, rem = vfs.get_dbv(rem)
|
||||
self.conn.hsrv.broker.say(
|
||||
"up2k.hash_file",
|
||||
self.hub.up2k.hash_file(
|
||||
vfs.realpath,
|
||||
vfs.vpath,
|
||||
vfs.flags,
|
||||
@ -3363,8 +3354,8 @@ class HttpCli(object):
|
||||
]
|
||||
|
||||
if self.avol and not self.args.no_rescan:
|
||||
x = self.conn.hsrv.broker.ask("up2k.get_state")
|
||||
vs = json.loads(x.get())
|
||||
zs = self.hub.up2k.get_state()
|
||||
vs = json.loads(zs)
|
||||
vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
|
||||
else:
|
||||
vstate = {}
|
||||
@ -3508,10 +3499,8 @@ class HttpCli(object):
|
||||
|
||||
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
|
||||
|
||||
args = [self.asrv.vfs.all_vols, [vn.vpath], False, True]
|
||||
err = self.hub.up2k.rescan(self.asrv.vfs.all_vols, [vn.vpath], False, True)
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.rescan", *args)
|
||||
err = x.get()
|
||||
if not err:
|
||||
self.redirect("", "?h")
|
||||
return True
|
||||
@ -3529,8 +3518,8 @@ class HttpCli(object):
|
||||
if self.args.no_reload:
|
||||
raise Pebkac(403, "the reload feature is disabled in server config")
|
||||
|
||||
x = self.conn.hsrv.broker.ask("reload")
|
||||
return self.redirect("", "?h", x.get(), "return to", False)
|
||||
zs = self.hub.reload()
|
||||
return self.redirect("", "?h", zs, "return to", False)
|
||||
|
||||
def tx_stack(self) -> bool:
|
||||
if not self.avol and not [x for x in self.wvol if x in self.rvol]:
|
||||
@ -3632,10 +3621,7 @@ class HttpCli(object):
|
||||
and (self.uname in vol.axs.uread or self.uname in vol.axs.upget)
|
||||
}
|
||||
|
||||
x = self.conn.hsrv.broker.ask(
|
||||
"up2k.get_unfinished_by_user", self.uname, self.ip
|
||||
)
|
||||
uret = x.get()
|
||||
uret = self.hub.up2k.get_unfinished_by_user(self.uname, self.ip)
|
||||
|
||||
if not self.args.unpost:
|
||||
allvols = []
|
||||
@ -3721,10 +3707,8 @@ class HttpCli(object):
|
||||
nlim = int(self.uparam.get("lim") or 0)
|
||||
lim = [nlim, nlim] if nlim else []
|
||||
|
||||
x = self.conn.hsrv.broker.ask(
|
||||
"up2k.handle_rm", self.uname, self.ip, req, lim, False, unpost
|
||||
)
|
||||
self.loud_reply(x.get())
|
||||
zs = self.hub.up2k.handle_rm(self.uname, self.ip, req, lim, False, unpost)
|
||||
self.loud_reply(zs)
|
||||
return True
|
||||
|
||||
def handle_mv(self) -> bool:
|
||||
@ -3746,8 +3730,8 @@ class HttpCli(object):
|
||||
if self.args.no_mv:
|
||||
raise Pebkac(403, "the rename/move feature is disabled in server config")
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, vsrc, vdst)
|
||||
self.loud_reply(x.get(), status=201)
|
||||
zs = self.hub.up2k.handle_mv(self.uname, vsrc, vdst)
|
||||
self.loud_reply(zs, status=201)
|
||||
return True
|
||||
|
||||
def tx_ls(self, ls: dict[str, Any]) -> bool:
|
||||
|
@ -58,7 +58,7 @@ class HttpConn(object):
|
||||
self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm
|
||||
self.xff_nm: Optional[NetMap] = hsrv.xff_nm
|
||||
self.xff_lan: NetMap = hsrv.xff_lan # type: ignore
|
||||
self.iphash: HMaccas = hsrv.broker.iphash
|
||||
self.iphash: HMaccas = hsrv.hub.iphash
|
||||
self.bans: dict[str, int] = hsrv.bans
|
||||
self.aclose: dict[str, int] = hsrv.aclose
|
||||
|
||||
|
@ -77,7 +77,7 @@ from .util import (
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .broker_util import BrokerCli
|
||||
from .svchub import SvcHub
|
||||
from .ssdp import SSDPr
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
@ -90,16 +90,13 @@ class HttpSrv(object):
|
||||
relying on MpSrv for performance (HttpSrv is just plain threads)
|
||||
"""
|
||||
|
||||
def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None:
|
||||
self.broker = broker
|
||||
def __init__(self, hub: "SvcHub", nid: Optional[int]) -> None:
|
||||
self.hub = hub
|
||||
self.nid = nid
|
||||
self.args = broker.args
|
||||
self.args = hub.args
|
||||
self.E: EnvParams = self.args.E
|
||||
self.log = broker.log
|
||||
self.asrv = broker.asrv
|
||||
|
||||
# redefine in case of multiprocessing
|
||||
socket.setdefaulttimeout(120)
|
||||
self.log = hub.log
|
||||
self.asrv = hub.asrv
|
||||
|
||||
self.t0 = time.time()
|
||||
nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
|
||||
@ -169,7 +166,7 @@ class HttpSrv(object):
|
||||
if self.args.zs:
|
||||
from .ssdp import SSDPr
|
||||
|
||||
self.ssdp = SSDPr(broker)
|
||||
self.ssdp = SSDPr(hub)
|
||||
|
||||
if self.tp_q:
|
||||
self.start_threads(4)
|
||||
@ -186,8 +183,7 @@ class HttpSrv(object):
|
||||
|
||||
def post_init(self) -> None:
|
||||
try:
|
||||
x = self.broker.ask("thumbsrv.getcfg")
|
||||
self.th_cfg = x.get()
|
||||
self.th_cfg = self.hub.thumbsrv.getcfg()
|
||||
except:
|
||||
pass
|
||||
|
||||
@ -237,19 +233,11 @@ class HttpSrv(object):
|
||||
self.t_periodic = None
|
||||
return
|
||||
|
||||
def listen(self, sck: socket.socket, nlisteners: int) -> None:
|
||||
if self.args.j != 1:
|
||||
# lost in the pickle; redefine
|
||||
if not ANYWIN or self.args.reuseaddr:
|
||||
sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
sck.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
sck.settimeout(None) # < does not inherit, ^ opts above do
|
||||
|
||||
def listen(self, sck: socket.socket) -> None:
|
||||
ip, port = sck.getsockname()[:2]
|
||||
self.srvs.append(sck)
|
||||
self.bound.add((ip, port))
|
||||
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
|
||||
self.nclimax = self.args.nc
|
||||
Daemon(
|
||||
self.thr_listen,
|
||||
"httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
|
||||
@ -265,7 +253,7 @@ class HttpSrv(object):
|
||||
self.log(self.name, msg)
|
||||
|
||||
def fun() -> None:
|
||||
self.broker.say("cb_httpsrv_up")
|
||||
self.hub.cb_httpsrv_up()
|
||||
|
||||
threading.Thread(target=fun, name="sig-hsrv-up1").start()
|
||||
|
||||
|
@ -15,6 +15,7 @@ if TYPE_CHECKING:
|
||||
class Metrics(object):
|
||||
def __init__(self, hsrv: "HttpSrv") -> None:
|
||||
self.hsrv = hsrv
|
||||
self.hub = hsrv.hub
|
||||
|
||||
def tx(self, cli: "HttpCli") -> bool:
|
||||
if not cli.avol:
|
||||
@ -88,8 +89,8 @@ class Metrics(object):
|
||||
addg("cpp_total_bans", str(self.hsrv.nban), t)
|
||||
|
||||
if not args.nos_vst:
|
||||
x = self.hsrv.broker.ask("up2k.get_state")
|
||||
vs = json.loads(x.get())
|
||||
zs = self.hub.up2k.get_state()
|
||||
vs = json.loads(zs.get())
|
||||
|
||||
nvidle = 0
|
||||
nvbusy = 0
|
||||
@ -146,8 +147,7 @@ class Metrics(object):
|
||||
volsizes = []
|
||||
try:
|
||||
ptops = [x.realpath for _, x in allvols]
|
||||
x = self.hsrv.broker.ask("up2k.get_volsizes", ptops)
|
||||
volsizes = x.get()
|
||||
volsizes = self.hub.up2k.get_volsizes(ptops)
|
||||
except Exception as ex:
|
||||
cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
|
||||
|
||||
@ -204,8 +204,7 @@ class Metrics(object):
|
||||
tnbytes = 0
|
||||
tnfiles = 0
|
||||
try:
|
||||
x = self.hsrv.broker.ask("up2k.get_unfinished")
|
||||
xs = x.get()
|
||||
xs = self.hub.up2k.get_unfinished()
|
||||
if not xs:
|
||||
raise Exception("up2k mutex acquisition timed out")
|
||||
|
||||
|
@ -12,7 +12,6 @@ from .multicast import MC_Sck, MCast
|
||||
from .util import CachedSet, html_escape, min_ex
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .broker_util import BrokerCli
|
||||
from .httpcli import HttpCli
|
||||
from .svchub import SvcHub
|
||||
|
||||
@ -32,9 +31,9 @@ class SSDP_Sck(MC_Sck):
|
||||
class SSDPr(object):
|
||||
"""generates http responses for httpcli"""
|
||||
|
||||
def __init__(self, broker: "BrokerCli") -> None:
|
||||
self.broker = broker
|
||||
self.args = broker.args
|
||||
def __init__(self, hub: "SvcHub") -> None:
|
||||
self.hub = hub
|
||||
self.args = hub.args
|
||||
|
||||
def reply(self, hc: "HttpCli") -> bool:
|
||||
if hc.vpath.endswith("device.xml"):
|
||||
|
@ -28,9 +28,10 @@ if True: # pylint: disable=using-constant-test
|
||||
import typing
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, E, EnvParams, unicode
|
||||
from .__init__ import ANYWIN, EXE, TYPE_CHECKING, E, EnvParams, unicode
|
||||
from .authsrv import BAD_CFG, AuthSrv
|
||||
from .cert import ensure_cert
|
||||
from .httpsrv import HttpSrv
|
||||
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE
|
||||
from .tcpsrv import TcpSrv
|
||||
from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
|
||||
@ -51,7 +52,6 @@ from .util import (
|
||||
ansi_re,
|
||||
build_netmap,
|
||||
min_ex,
|
||||
mp,
|
||||
odfusion,
|
||||
pybin,
|
||||
start_log_thrs,
|
||||
@ -67,16 +67,6 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class SvcHub(object):
|
||||
"""
|
||||
Hosts all services which cannot be parallelized due to reliance on monolithic resources.
|
||||
Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
|
||||
hub.broker.<say|ask>(destination, args_list).
|
||||
|
||||
Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
|
||||
Nothing is returned synchronously; if you want any value returned from the call,
|
||||
put() can return a queue (if want_reply=True) which has a blocking get() with the response.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
args: argparse.Namespace,
|
||||
@ -163,16 +153,6 @@ class SvcHub(object):
|
||||
if args.log_thrs:
|
||||
start_log_thrs(self.log, args.log_thrs, 0)
|
||||
|
||||
if not args.use_fpool and args.j != 1:
|
||||
args.no_fpool = True
|
||||
t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems"
|
||||
self.log("root", t.format(args.j))
|
||||
|
||||
if not args.no_fpool and args.j != 1:
|
||||
t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
|
||||
self.log("root", t.format(args.j), c=3)
|
||||
args.no_fpool = True
|
||||
|
||||
for name, arg in (
|
||||
("iobuf", "iobuf"),
|
||||
("s-rd-sz", "s_rd_sz"),
|
||||
@ -316,13 +296,7 @@ class SvcHub(object):
|
||||
self.mdns: Optional["MDNS"] = None
|
||||
self.ssdp: Optional["SSDPd"] = None
|
||||
|
||||
# decide which worker impl to use
|
||||
if self.check_mp_enable():
|
||||
from .broker_mp import BrokerMp as Broker
|
||||
else:
|
||||
from .broker_thr import BrokerThr as Broker # type: ignore
|
||||
|
||||
self.broker = Broker(self)
|
||||
self.httpsrv = HttpSrv(self, None)
|
||||
|
||||
def start_ftpd(self) -> None:
|
||||
time.sleep(30)
|
||||
@ -361,15 +335,14 @@ class SvcHub(object):
|
||||
|
||||
def thr_httpsrv_up(self) -> None:
|
||||
time.sleep(1 if self.args.ign_ebind_all else 5)
|
||||
expected = self.broker.num_workers * self.tcpsrv.nsrv
|
||||
expected = self.tcpsrv.nsrv
|
||||
failed = expected - self.httpsrv_up
|
||||
if not failed:
|
||||
return
|
||||
|
||||
if self.args.ign_ebind_all:
|
||||
if not self.tcpsrv.srv:
|
||||
for _ in range(self.broker.num_workers):
|
||||
self.broker.say("cb_httpsrv_up")
|
||||
self.cb_httpsrv_up()
|
||||
return
|
||||
|
||||
if self.args.ign_ebind and self.tcpsrv.srv:
|
||||
@ -387,8 +360,6 @@ class SvcHub(object):
|
||||
|
||||
def cb_httpsrv_up(self) -> None:
|
||||
self.httpsrv_up += 1
|
||||
if self.httpsrv_up != self.broker.num_workers:
|
||||
return
|
||||
|
||||
ar = self.args
|
||||
for _ in range(10 if ar.ftp or ar.ftps else 0):
|
||||
@ -723,7 +694,6 @@ class SvcHub(object):
|
||||
self.log("root", "reloading config")
|
||||
self.asrv.reload()
|
||||
self.up2k.reload(rescan_all_vols)
|
||||
self.broker.reload()
|
||||
self.reloading = 0
|
||||
|
||||
def _reload_blocking(self, rescan_all_vols: bool = True) -> None:
|
||||
@ -808,7 +778,7 @@ class SvcHub(object):
|
||||
tasks.append(Daemon(self.ssdp.stop, "ssdp"))
|
||||
slp = time.time() + 0.5
|
||||
|
||||
self.broker.shutdown()
|
||||
self.httpsrv.shutdown()
|
||||
self.tcpsrv.shutdown()
|
||||
self.up2k.shutdown()
|
||||
|
||||
@ -970,48 +940,6 @@ class SvcHub(object):
|
||||
if ex.errno != errno.EPIPE:
|
||||
raise
|
||||
|
||||
def check_mp_support(self) -> str:
|
||||
if MACOS:
|
||||
return "multiprocessing is wonky on mac osx;"
|
||||
elif sys.version_info < (3, 3):
|
||||
return "need python 3.3 or newer for multiprocessing;"
|
||||
|
||||
try:
|
||||
x: mp.Queue[tuple[str, str]] = mp.Queue(1)
|
||||
x.put(("foo", "bar"))
|
||||
if x.get()[0] != "foo":
|
||||
raise Exception()
|
||||
except:
|
||||
return "multiprocessing is not supported on your platform;"
|
||||
|
||||
return ""
|
||||
|
||||
def check_mp_enable(self) -> bool:
|
||||
if self.args.j == 1:
|
||||
return False
|
||||
|
||||
try:
|
||||
if mp.cpu_count() <= 1:
|
||||
raise Exception()
|
||||
except:
|
||||
self.log("svchub", "only one CPU detected; multiprocessing disabled")
|
||||
return False
|
||||
|
||||
try:
|
||||
# support vscode debugger (bonus: same behavior as on windows)
|
||||
mp.set_start_method("spawn", True)
|
||||
except AttributeError:
|
||||
# py2.7 probably, anyways dontcare
|
||||
pass
|
||||
|
||||
err = self.check_mp_support()
|
||||
if not err:
|
||||
return True
|
||||
else:
|
||||
self.log("svchub", err)
|
||||
self.log("svchub", "cannot efficiently use multiple CPU cores")
|
||||
return False
|
||||
|
||||
def sd_notify(self) -> None:
|
||||
try:
|
||||
zb = os.getenv("NOTIFY_SOCKET")
|
||||
|
@ -297,7 +297,7 @@ class TcpSrv(object):
|
||||
if self.args.q:
|
||||
print(msg)
|
||||
|
||||
self.hub.broker.say("listen", srv)
|
||||
self.hub.httpsrv.listen(srv)
|
||||
|
||||
self.srv = srvs
|
||||
self.bound = bound
|
||||
@ -305,7 +305,7 @@ class TcpSrv(object):
|
||||
self._distribute_netdevs()
|
||||
|
||||
def _distribute_netdevs(self):
|
||||
self.hub.broker.say("set_netdevs", self.netdevs)
|
||||
self.hub.httpsrv.set_netdevs(self.netdevs)
|
||||
self.hub.start_zeroconf()
|
||||
gencert(self.log, self.args, self.netdevs)
|
||||
self.hub.restart_ftpd()
|
||||
|
@ -7,7 +7,6 @@ from .__init__ import TYPE_CHECKING
|
||||
from .authsrv import VFS
|
||||
from .bos import bos
|
||||
from .th_srv import HAVE_WEBP, thumb_path
|
||||
from .util import Cooldown
|
||||
|
||||
if True: # pylint: disable=using-constant-test
|
||||
from typing import Optional, Union
|
||||
@ -18,14 +17,11 @@ if TYPE_CHECKING:
|
||||
|
||||
class ThumbCli(object):
|
||||
def __init__(self, hsrv: "HttpSrv") -> None:
|
||||
self.broker = hsrv.broker
|
||||
self.hub = hsrv.hub
|
||||
self.log_func = hsrv.log
|
||||
self.args = hsrv.args
|
||||
self.asrv = hsrv.asrv
|
||||
|
||||
# cache on both sides for less broker spam
|
||||
self.cooldown = Cooldown(self.args.th_poke)
|
||||
|
||||
try:
|
||||
c = hsrv.th_cfg
|
||||
if not c:
|
||||
@ -134,13 +130,11 @@ class ThumbCli(object):
|
||||
|
||||
if ret:
|
||||
tdir = os.path.dirname(tpath)
|
||||
if self.cooldown.poke(tdir):
|
||||
self.broker.say("thumbsrv.poke", tdir)
|
||||
self.hub.thumbsrv.poke(tdir)
|
||||
|
||||
if want_opus:
|
||||
# audio files expire individually
|
||||
if self.cooldown.poke(tpath):
|
||||
self.broker.say("thumbsrv.poke", tpath)
|
||||
self.hub.thumbsrv.poke(tpath)
|
||||
|
||||
return ret
|
||||
|
||||
@ -150,5 +144,4 @@ class ThumbCli(object):
|
||||
if not bos.path.getsize(os.path.join(ptop, rem)):
|
||||
return None
|
||||
|
||||
x = self.broker.ask("thumbsrv.get", ptop, rem, mtime, fmt)
|
||||
return x.get() # type: ignore
|
||||
return self.hub.thumbsrv.get(ptop, rem, mtime, fmt)
|
||||
|
@ -2745,9 +2745,9 @@ class Up2k(object):
|
||||
cj["size"],
|
||||
cj["ptop"],
|
||||
ap1,
|
||||
self.hub.broker,
|
||||
self.hub,
|
||||
reg,
|
||||
"up2k._get_volsize",
|
||||
"_get_volsize",
|
||||
)
|
||||
bos.makedirs(ap2)
|
||||
vfs.lim.nup(cj["addr"])
|
||||
|
@ -69,8 +69,6 @@ sed -ri s/copyparty.exe/copyparty$esuf.exe/ loader.rc2
|
||||
|
||||
excl=(
|
||||
asyncio
|
||||
copyparty.broker_mp
|
||||
copyparty.broker_mpw
|
||||
copyparty.smbd
|
||||
ctypes.macholib
|
||||
curses
|
||||
|
@ -7,10 +7,6 @@ copyparty/bos,
|
||||
copyparty/bos/__init__.py,
|
||||
copyparty/bos/bos.py,
|
||||
copyparty/bos/path.py,
|
||||
copyparty/broker_mp.py,
|
||||
copyparty/broker_mpw.py,
|
||||
copyparty/broker_thr.py,
|
||||
copyparty/broker_util.py,
|
||||
copyparty/cert.py,
|
||||
copyparty/cfg.py,
|
||||
copyparty/dxml.py,
|
||||
|
@ -170,12 +170,14 @@ class Cfg(Namespace):
|
||||
)
|
||||
|
||||
|
||||
class NullBroker(object):
|
||||
def say(self, *args):
|
||||
class NullUp2k(object):
|
||||
def hash_file(*a):
|
||||
pass
|
||||
|
||||
def ask(self, *args):
|
||||
pass
|
||||
|
||||
class NullHub(object):
|
||||
def __init__(self):
|
||||
self.up2k = NullUp2k()
|
||||
|
||||
|
||||
class VSock(object):
|
||||
@ -206,7 +208,7 @@ class VHttpSrv(object):
|
||||
self.asrv = asrv
|
||||
self.log = log
|
||||
|
||||
self.broker = NullBroker()
|
||||
self.hub = NullHub()
|
||||
self.prism = None
|
||||
self.bans = {}
|
||||
self.nreq = 0
|
||||
|
Loading…
Reference in New Issue
Block a user