Compare commits
1 Commits
hovudstrau
...
no-broker
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7781e0529d |
@ -841,7 +841,6 @@ def add_general(ap, nc, srvname):
|
|||||||
ap2 = ap.add_argument_group('general options')
|
ap2 = ap.add_argument_group('general options')
|
||||||
ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file")
|
ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file")
|
||||||
ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients")
|
ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients")
|
||||||
ap2.add_argument("-j", metavar="CORES", type=int, default=1, help="max num cpu cores, 0=all")
|
|
||||||
ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]")
|
ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]")
|
||||||
ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts")
|
ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts")
|
||||||
ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]")
|
ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]")
|
||||||
@ -881,7 +880,7 @@ def add_upload(ap):
|
|||||||
ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)")
|
ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)")
|
||||||
ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600")
|
ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600")
|
||||||
ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)")
|
ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)")
|
||||||
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (multiprocessing, filesystems lacking sparse-files support, ...)")
|
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (filesystems lacking sparse-files support, ...)")
|
||||||
ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)")
|
ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)")
|
||||||
ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)")
|
ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)")
|
||||||
ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)")
|
ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)")
|
||||||
|
@ -44,9 +44,7 @@ if True: # pylint: disable=using-constant-test
|
|||||||
from .util import NamedLogger, RootLogger
|
from .util import NamedLogger, RootLogger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .broker_mp import BrokerMp
|
from .svchub import SvcHub
|
||||||
from .broker_thr import BrokerThr
|
|
||||||
from .broker_util import BrokerCli
|
|
||||||
|
|
||||||
# Vflags: TypeAlias = dict[str, str | bool | float | list[str]]
|
# Vflags: TypeAlias = dict[str, str | bool | float | list[str]]
|
||||||
# Vflags: TypeAlias = dict[str, Any]
|
# Vflags: TypeAlias = dict[str, Any]
|
||||||
@ -141,9 +139,9 @@ class Lim(object):
|
|||||||
sz: int,
|
sz: int,
|
||||||
ptop: str,
|
ptop: str,
|
||||||
abspath: str,
|
abspath: str,
|
||||||
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]] = None,
|
hub: Optional["SvcHub"] = None,
|
||||||
reg: Optional[dict[str, dict[str, Any]]] = None,
|
reg: Optional[dict[str, dict[str, Any]]] = None,
|
||||||
volgetter: str = "up2k.get_volsize",
|
volgetter: str = "get_volsize",
|
||||||
) -> tuple[str, str]:
|
) -> tuple[str, str]:
|
||||||
if reg is not None and self.reg is None:
|
if reg is not None and self.reg is None:
|
||||||
self.reg = reg
|
self.reg = reg
|
||||||
@ -154,7 +152,7 @@ class Lim(object):
|
|||||||
self.chk_rem(rem)
|
self.chk_rem(rem)
|
||||||
if sz != -1:
|
if sz != -1:
|
||||||
self.chk_sz(sz)
|
self.chk_sz(sz)
|
||||||
self.chk_vsz(broker, ptop, sz, volgetter)
|
self.chk_vsz(hub, ptop, sz, volgetter)
|
||||||
self.chk_df(abspath, sz) # side effects; keep last-ish
|
self.chk_df(abspath, sz) # side effects; keep last-ish
|
||||||
|
|
||||||
ap2, vp2 = self.rot(abspath)
|
ap2, vp2 = self.rot(abspath)
|
||||||
@ -172,16 +170,15 @@ class Lim(object):
|
|||||||
|
|
||||||
def chk_vsz(
|
def chk_vsz(
|
||||||
self,
|
self,
|
||||||
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]],
|
hub: Optional["SvcHub"],
|
||||||
ptop: str,
|
ptop: str,
|
||||||
sz: int,
|
sz: int,
|
||||||
volgetter: str = "up2k.get_volsize",
|
volgetter: str = "up2k.get_volsize",
|
||||||
) -> None:
|
) -> None:
|
||||||
if not broker or not self.vbmax + self.vnmax:
|
if not hub or not self.vbmax + self.vnmax:
|
||||||
return
|
return
|
||||||
|
|
||||||
x = broker.ask(volgetter, ptop)
|
nbytes, nfiles = hub.up2k.getattr(volgetter)(ptop)
|
||||||
nbytes, nfiles = x.get()
|
|
||||||
|
|
||||||
if self.vbmax and self.vbmax < nbytes + sz:
|
if self.vbmax and self.vbmax < nbytes + sz:
|
||||||
raise Pebkac(400, "volume has exceeded max size")
|
raise Pebkac(400, "volume has exceeded max size")
|
||||||
@ -815,9 +812,7 @@ class AuthSrv(object):
|
|||||||
|
|
||||||
yield prev, True
|
yield prev, True
|
||||||
|
|
||||||
def idp_checkin(
|
def idp_checkin(self, hub: Optional["SvcHub"], uname: str, gname: str) -> bool:
|
||||||
self, broker: Optional["BrokerCli"], uname: str, gname: str
|
|
||||||
) -> bool:
|
|
||||||
if uname in self.acct:
|
if uname in self.acct:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -837,12 +832,12 @@ class AuthSrv(object):
|
|||||||
t = "reinitializing due to new user from IdP: [%s:%s]"
|
t = "reinitializing due to new user from IdP: [%s:%s]"
|
||||||
self.log(t % (uname, gnames), 3)
|
self.log(t % (uname, gnames), 3)
|
||||||
|
|
||||||
if not broker:
|
if not hub:
|
||||||
# only true for tests
|
# only true for tests
|
||||||
self._reload()
|
self._reload()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
broker.ask("_reload_blocking", False).get()
|
hub._reload_blocking(False)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _map_volume_idp(
|
def _map_volume_idp(
|
||||||
|
@ -1,141 +0,0 @@
|
|||||||
# coding: utf-8
|
|
||||||
from __future__ import print_function, unicode_literals
|
|
||||||
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
import traceback
|
|
||||||
|
|
||||||
import queue
|
|
||||||
|
|
||||||
from .__init__ import CORES, TYPE_CHECKING
|
|
||||||
from .broker_mpw import MpWorker
|
|
||||||
from .broker_util import ExceptionalQueue, try_exec
|
|
||||||
from .util import Daemon, mp
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .svchub import SvcHub
|
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
|
|
||||||
class MProcess(mp.Process):
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
q_pend: queue.Queue[tuple[int, str, list[Any]]],
|
|
||||||
q_yield: queue.Queue[tuple[int, str, list[Any]]],
|
|
||||||
target: Any,
|
|
||||||
args: Any,
|
|
||||||
) -> None:
|
|
||||||
super(MProcess, self).__init__(target=target, args=args)
|
|
||||||
self.q_pend = q_pend
|
|
||||||
self.q_yield = q_yield
|
|
||||||
|
|
||||||
|
|
||||||
class BrokerMp(object):
|
|
||||||
"""external api; manages MpWorkers"""
|
|
||||||
|
|
||||||
def __init__(self, hub: "SvcHub") -> None:
|
|
||||||
self.hub = hub
|
|
||||||
self.log = hub.log
|
|
||||||
self.args = hub.args
|
|
||||||
|
|
||||||
self.procs = []
|
|
||||||
self.mutex = threading.Lock()
|
|
||||||
|
|
||||||
self.num_workers = self.args.j or CORES
|
|
||||||
self.log("broker", "booting {} subprocesses".format(self.num_workers))
|
|
||||||
for n in range(1, self.num_workers + 1):
|
|
||||||
q_pend: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(1) # type: ignore
|
|
||||||
q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) # type: ignore
|
|
||||||
|
|
||||||
proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n))
|
|
||||||
Daemon(self.collector, "mp-sink-{}".format(n), (proc,))
|
|
||||||
self.procs.append(proc)
|
|
||||||
proc.start()
|
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
|
||||||
self.log("broker", "shutting down")
|
|
||||||
for n, proc in enumerate(self.procs):
|
|
||||||
thr = threading.Thread(
|
|
||||||
target=proc.q_pend.put((0, "shutdown", [])),
|
|
||||||
name="mp-shutdown-{}-{}".format(n, len(self.procs)),
|
|
||||||
)
|
|
||||||
thr.start()
|
|
||||||
|
|
||||||
with self.mutex:
|
|
||||||
procs = self.procs
|
|
||||||
self.procs = []
|
|
||||||
|
|
||||||
while procs:
|
|
||||||
if procs[-1].is_alive():
|
|
||||||
time.sleep(0.05)
|
|
||||||
continue
|
|
||||||
|
|
||||||
procs.pop()
|
|
||||||
|
|
||||||
def reload(self) -> None:
|
|
||||||
self.log("broker", "reloading")
|
|
||||||
for _, proc in enumerate(self.procs):
|
|
||||||
proc.q_pend.put((0, "reload", []))
|
|
||||||
|
|
||||||
def collector(self, proc: MProcess) -> None:
|
|
||||||
"""receive message from hub in other process"""
|
|
||||||
while True:
|
|
||||||
msg = proc.q_yield.get()
|
|
||||||
retq_id, dest, args = msg
|
|
||||||
|
|
||||||
if dest == "log":
|
|
||||||
self.log(*args)
|
|
||||||
|
|
||||||
elif dest == "retq":
|
|
||||||
# response from previous ipc call
|
|
||||||
raise Exception("invalid broker_mp usage")
|
|
||||||
|
|
||||||
else:
|
|
||||||
# new ipc invoking managed service in hub
|
|
||||||
try:
|
|
||||||
obj = self.hub
|
|
||||||
for node in dest.split("."):
|
|
||||||
obj = getattr(obj, node)
|
|
||||||
|
|
||||||
# TODO will deadlock if dest performs another ipc
|
|
||||||
rv = try_exec(retq_id, obj, *args)
|
|
||||||
except:
|
|
||||||
rv = ["exception", "stack", traceback.format_exc()]
|
|
||||||
|
|
||||||
if retq_id:
|
|
||||||
proc.q_pend.put((retq_id, "retq", rv))
|
|
||||||
|
|
||||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
|
||||||
|
|
||||||
# new non-ipc invoking managed service in hub
|
|
||||||
obj = self.hub
|
|
||||||
for node in dest.split("."):
|
|
||||||
obj = getattr(obj, node)
|
|
||||||
|
|
||||||
rv = try_exec(True, obj, *args)
|
|
||||||
|
|
||||||
retq = ExceptionalQueue(1)
|
|
||||||
retq.put(rv)
|
|
||||||
return retq
|
|
||||||
|
|
||||||
def say(self, dest: str, *args: Any) -> None:
|
|
||||||
"""
|
|
||||||
send message to non-hub component in other process,
|
|
||||||
returns a Queue object which eventually contains the response if want_retval
|
|
||||||
(not-impl here since nothing uses it yet)
|
|
||||||
"""
|
|
||||||
if dest == "listen":
|
|
||||||
for p in self.procs:
|
|
||||||
p.q_pend.put((0, dest, [args[0], len(self.procs)]))
|
|
||||||
|
|
||||||
elif dest == "set_netdevs":
|
|
||||||
for p in self.procs:
|
|
||||||
p.q_pend.put((0, dest, list(args)))
|
|
||||||
|
|
||||||
elif dest == "cb_httpsrv_up":
|
|
||||||
self.hub.cb_httpsrv_up()
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise Exception("what is " + str(dest))
|
|
@ -1,123 +0,0 @@
|
|||||||
# coding: utf-8
|
|
||||||
from __future__ import print_function, unicode_literals
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
|
|
||||||
import queue
|
|
||||||
|
|
||||||
from .__init__ import ANYWIN
|
|
||||||
from .authsrv import AuthSrv
|
|
||||||
from .broker_util import BrokerCli, ExceptionalQueue
|
|
||||||
from .httpsrv import HttpSrv
|
|
||||||
from .util import FAKE_MP, Daemon, HMaccas
|
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
|
||||||
from types import FrameType
|
|
||||||
|
|
||||||
from typing import Any, Optional, Union
|
|
||||||
|
|
||||||
|
|
||||||
class MpWorker(BrokerCli):
|
|
||||||
"""one single mp instance"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
q_pend: queue.Queue[tuple[int, str, list[Any]]],
|
|
||||||
q_yield: queue.Queue[tuple[int, str, list[Any]]],
|
|
||||||
args: argparse.Namespace,
|
|
||||||
n: int,
|
|
||||||
) -> None:
|
|
||||||
super(MpWorker, self).__init__()
|
|
||||||
|
|
||||||
self.q_pend = q_pend
|
|
||||||
self.q_yield = q_yield
|
|
||||||
self.args = args
|
|
||||||
self.n = n
|
|
||||||
|
|
||||||
self.log = self._log_disabled if args.q and not args.lo else self._log_enabled
|
|
||||||
|
|
||||||
self.retpend: dict[int, Any] = {}
|
|
||||||
self.retpend_mutex = threading.Lock()
|
|
||||||
self.mutex = threading.Lock()
|
|
||||||
|
|
||||||
# we inherited signal_handler from parent,
|
|
||||||
# replace it with something harmless
|
|
||||||
if not FAKE_MP:
|
|
||||||
sigs = [signal.SIGINT, signal.SIGTERM]
|
|
||||||
if not ANYWIN:
|
|
||||||
sigs.append(signal.SIGUSR1)
|
|
||||||
|
|
||||||
for sig in sigs:
|
|
||||||
signal.signal(sig, self.signal_handler)
|
|
||||||
|
|
||||||
# starting to look like a good idea
|
|
||||||
self.asrv = AuthSrv(args, None, False)
|
|
||||||
|
|
||||||
# instantiate all services here (TODO: inheritance?)
|
|
||||||
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
|
|
||||||
self.httpsrv = HttpSrv(self, n)
|
|
||||||
|
|
||||||
# on winxp and some other platforms,
|
|
||||||
# use thr.join() to block all signals
|
|
||||||
Daemon(self.main, "mpw-main").join()
|
|
||||||
|
|
||||||
def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None:
|
|
||||||
# print('k')
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
|
|
||||||
self.q_yield.put((0, "log", [src, msg, c]))
|
|
||||||
|
|
||||||
def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def logw(self, msg: str, c: Union[int, str] = 0) -> None:
|
|
||||||
self.log("mp%d" % (self.n,), msg, c)
|
|
||||||
|
|
||||||
def main(self) -> None:
|
|
||||||
while True:
|
|
||||||
retq_id, dest, args = self.q_pend.get()
|
|
||||||
|
|
||||||
# self.logw("work: [{}]".format(d[0]))
|
|
||||||
if dest == "shutdown":
|
|
||||||
self.httpsrv.shutdown()
|
|
||||||
self.logw("ok bye")
|
|
||||||
sys.exit(0)
|
|
||||||
return
|
|
||||||
|
|
||||||
elif dest == "reload":
|
|
||||||
self.logw("mpw.asrv reloading")
|
|
||||||
self.asrv.reload()
|
|
||||||
self.logw("mpw.asrv reloaded")
|
|
||||||
|
|
||||||
elif dest == "listen":
|
|
||||||
self.httpsrv.listen(args[0], args[1])
|
|
||||||
|
|
||||||
elif dest == "set_netdevs":
|
|
||||||
self.httpsrv.set_netdevs(args[0])
|
|
||||||
|
|
||||||
elif dest == "retq":
|
|
||||||
# response from previous ipc call
|
|
||||||
with self.retpend_mutex:
|
|
||||||
retq = self.retpend.pop(retq_id)
|
|
||||||
|
|
||||||
retq.put(args)
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise Exception("what is " + str(dest))
|
|
||||||
|
|
||||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
|
||||||
retq = ExceptionalQueue(1)
|
|
||||||
retq_id = id(retq)
|
|
||||||
with self.retpend_mutex:
|
|
||||||
self.retpend[retq_id] = retq
|
|
||||||
|
|
||||||
self.q_yield.put((retq_id, dest, list(args)))
|
|
||||||
return retq
|
|
||||||
|
|
||||||
def say(self, dest: str, *args: Any) -> None:
|
|
||||||
self.q_yield.put((0, dest, list(args)))
|
|
@ -1,73 +0,0 @@
|
|||||||
# coding: utf-8
|
|
||||||
from __future__ import print_function, unicode_literals
|
|
||||||
|
|
||||||
import os
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from .__init__ import TYPE_CHECKING
|
|
||||||
from .broker_util import BrokerCli, ExceptionalQueue, try_exec
|
|
||||||
from .httpsrv import HttpSrv
|
|
||||||
from .util import HMaccas
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .svchub import SvcHub
|
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
|
|
||||||
class BrokerThr(BrokerCli):
|
|
||||||
"""external api; behaves like BrokerMP but using plain threads"""
|
|
||||||
|
|
||||||
def __init__(self, hub: "SvcHub") -> None:
|
|
||||||
super(BrokerThr, self).__init__()
|
|
||||||
|
|
||||||
self.hub = hub
|
|
||||||
self.log = hub.log
|
|
||||||
self.args = hub.args
|
|
||||||
self.asrv = hub.asrv
|
|
||||||
|
|
||||||
self.mutex = threading.Lock()
|
|
||||||
self.num_workers = 1
|
|
||||||
|
|
||||||
# instantiate all services here (TODO: inheritance?)
|
|
||||||
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
|
|
||||||
self.httpsrv = HttpSrv(self, None)
|
|
||||||
self.reload = self.noop
|
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
|
||||||
# self.log("broker", "shutting down")
|
|
||||||
self.httpsrv.shutdown()
|
|
||||||
|
|
||||||
def noop(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
|
||||||
|
|
||||||
# new ipc invoking managed service in hub
|
|
||||||
obj = self.hub
|
|
||||||
for node in dest.split("."):
|
|
||||||
obj = getattr(obj, node)
|
|
||||||
|
|
||||||
rv = try_exec(True, obj, *args)
|
|
||||||
|
|
||||||
# pretend we're broker_mp
|
|
||||||
retq = ExceptionalQueue(1)
|
|
||||||
retq.put(rv)
|
|
||||||
return retq
|
|
||||||
|
|
||||||
def say(self, dest: str, *args: Any) -> None:
|
|
||||||
if dest == "listen":
|
|
||||||
self.httpsrv.listen(args[0], 1)
|
|
||||||
return
|
|
||||||
|
|
||||||
if dest == "set_netdevs":
|
|
||||||
self.httpsrv.set_netdevs(args[0])
|
|
||||||
return
|
|
||||||
|
|
||||||
# new ipc invoking managed service in hub
|
|
||||||
obj = self.hub
|
|
||||||
for node in dest.split("."):
|
|
||||||
obj = getattr(obj, node)
|
|
||||||
|
|
||||||
try_exec(False, obj, *args)
|
|
@ -1,72 +0,0 @@
|
|||||||
# coding: utf-8
|
|
||||||
from __future__ import print_function, unicode_literals
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import traceback
|
|
||||||
|
|
||||||
from queue import Queue
|
|
||||||
|
|
||||||
from .__init__ import TYPE_CHECKING
|
|
||||||
from .authsrv import AuthSrv
|
|
||||||
from .util import HMaccas, Pebkac
|
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
|
||||||
from typing import Any, Optional, Union
|
|
||||||
|
|
||||||
from .util import RootLogger
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .httpsrv import HttpSrv
|
|
||||||
|
|
||||||
|
|
||||||
class ExceptionalQueue(Queue, object):
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
|
|
||||||
rv = super(ExceptionalQueue, self).get(block, timeout)
|
|
||||||
|
|
||||||
if isinstance(rv, list):
|
|
||||||
if rv[0] == "exception":
|
|
||||||
if rv[1] == "pebkac":
|
|
||||||
raise Pebkac(*rv[2:])
|
|
||||||
else:
|
|
||||||
raise Exception(rv[2])
|
|
||||||
|
|
||||||
return rv
|
|
||||||
|
|
||||||
|
|
||||||
class BrokerCli(object):
|
|
||||||
"""
|
|
||||||
helps mypy understand httpsrv.broker but still fails a few levels deeper,
|
|
||||||
for example resolving httpconn.* in httpcli -- see lines tagged #mypy404
|
|
||||||
"""
|
|
||||||
|
|
||||||
log: "RootLogger"
|
|
||||||
args: argparse.Namespace
|
|
||||||
asrv: AuthSrv
|
|
||||||
httpsrv: "HttpSrv"
|
|
||||||
iphash: HMaccas
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
|
|
||||||
return ExceptionalQueue(1)
|
|
||||||
|
|
||||||
def say(self, dest: str, *args: Any) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def try_exec(want_retval: Union[bool, int], func: Any, *args: list[Any]) -> Any:
|
|
||||||
try:
|
|
||||||
return func(*args)
|
|
||||||
|
|
||||||
except Pebkac as ex:
|
|
||||||
if not want_retval:
|
|
||||||
raise
|
|
||||||
|
|
||||||
return ["exception", "pebkac", ex.code, str(ex)]
|
|
||||||
|
|
||||||
except:
|
|
||||||
if not want_retval:
|
|
||||||
raise
|
|
||||||
|
|
||||||
return ["exception", "stack", traceback.format_exc()]
|
|
@ -88,12 +88,8 @@ class FtpAuth(DummyAuthorizer):
|
|||||||
if bonk:
|
if bonk:
|
||||||
logging.warning("client banned: invalid passwords")
|
logging.warning("client banned: invalid passwords")
|
||||||
bans[ip] = bonk
|
bans[ip] = bonk
|
||||||
try:
|
self.hub.httpsrv.bans[ip] = bonk
|
||||||
# only possible if multiprocessing disabled
|
self.hub.httpsrv.nban += 1
|
||||||
self.hub.broker.httpsrv.bans[ip] = bonk # type: ignore
|
|
||||||
self.hub.broker.httpsrv.nban += 1 # type: ignore
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
raise AuthenticationFailed("Authentication failed.")
|
raise AuthenticationFailed("Authentication failed.")
|
||||||
|
|
||||||
|
@ -115,6 +115,7 @@ class HttpCli(object):
|
|||||||
|
|
||||||
self.t0 = time.time()
|
self.t0 = time.time()
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
|
self.hub = conn.hsrv.hub
|
||||||
self.u2mutex = conn.u2mutex # mypy404
|
self.u2mutex = conn.u2mutex # mypy404
|
||||||
self.s = conn.s
|
self.s = conn.s
|
||||||
self.sr = conn.sr
|
self.sr = conn.sr
|
||||||
@ -475,7 +476,7 @@ class HttpCli(object):
|
|||||||
) or self.args.idp_h_key in self.headers
|
) or self.args.idp_h_key in self.headers
|
||||||
|
|
||||||
if trusted_key and trusted_xff:
|
if trusted_key and trusted_xff:
|
||||||
self.asrv.idp_checkin(self.conn.hsrv.broker, idp_usr, idp_grp)
|
self.asrv.idp_checkin(self.hub, idp_usr, idp_grp)
|
||||||
else:
|
else:
|
||||||
if not trusted_key:
|
if not trusted_key:
|
||||||
t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"'
|
t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"'
|
||||||
@ -626,7 +627,7 @@ class HttpCli(object):
|
|||||||
msg += "hint: %s\r\n" % (self.hint,)
|
msg += "hint: %s\r\n" % (self.hint,)
|
||||||
|
|
||||||
if "database is locked" in em:
|
if "database is locked" in em:
|
||||||
self.conn.hsrv.broker.say("log_stacks")
|
self.hub.log_stacks()
|
||||||
msg += "hint: important info in the server log\r\n"
|
msg += "hint: important info in the server log\r\n"
|
||||||
|
|
||||||
zb = b"<pre>" + html_escape(msg).encode("utf-8", "replace")
|
zb = b"<pre>" + html_escape(msg).encode("utf-8", "replace")
|
||||||
@ -1629,9 +1630,7 @@ class HttpCli(object):
|
|||||||
lim = vfs.get_dbv(rem)[0].lim
|
lim = vfs.get_dbv(rem)[0].lim
|
||||||
fdir = vfs.canonical(rem)
|
fdir = vfs.canonical(rem)
|
||||||
if lim:
|
if lim:
|
||||||
fdir, rem = lim.all(
|
fdir, rem = lim.all(self.ip, rem, remains, vfs.realpath, fdir, self.hub)
|
||||||
self.ip, rem, remains, vfs.realpath, fdir, self.conn.hsrv.broker
|
|
||||||
)
|
|
||||||
|
|
||||||
fn = None
|
fn = None
|
||||||
if rem and not self.trailing_slash and not bos.path.isdir(fdir):
|
if rem and not self.trailing_slash and not bos.path.isdir(fdir):
|
||||||
@ -1764,7 +1763,7 @@ class HttpCli(object):
|
|||||||
lim.bup(self.ip, post_sz)
|
lim.bup(self.ip, post_sz)
|
||||||
try:
|
try:
|
||||||
lim.chk_sz(post_sz)
|
lim.chk_sz(post_sz)
|
||||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, post_sz)
|
lim.chk_vsz(self.hub, vfs.realpath, post_sz)
|
||||||
except:
|
except:
|
||||||
wunlink(self.log, path, vfs.flags)
|
wunlink(self.log, path, vfs.flags)
|
||||||
raise
|
raise
|
||||||
@ -1823,8 +1822,7 @@ class HttpCli(object):
|
|||||||
raise Pebkac(403, t)
|
raise Pebkac(403, t)
|
||||||
|
|
||||||
vfs, rem = vfs.get_dbv(rem)
|
vfs, rem = vfs.get_dbv(rem)
|
||||||
self.conn.hsrv.broker.say(
|
self.hub.up2k.hash_file(
|
||||||
"up2k.hash_file",
|
|
||||||
vfs.realpath,
|
vfs.realpath,
|
||||||
vfs.vpath,
|
vfs.vpath,
|
||||||
vfs.flags,
|
vfs.flags,
|
||||||
@ -2049,8 +2047,7 @@ class HttpCli(object):
|
|||||||
|
|
||||||
# not to protect u2fh, but to prevent handshakes while files are closing
|
# not to protect u2fh, but to prevent handshakes while files are closing
|
||||||
with self.u2mutex:
|
with self.u2mutex:
|
||||||
x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
|
ret = self.hub.up2k.handle_json(body, self.u2fh.aps)
|
||||||
ret = x.get()
|
|
||||||
|
|
||||||
if self.is_vproxied:
|
if self.is_vproxied:
|
||||||
if "purl" in ret:
|
if "purl" in ret:
|
||||||
@ -2138,7 +2135,7 @@ class HttpCli(object):
|
|||||||
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
|
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
|
||||||
ptop = (vfs.dbv or vfs).realpath
|
ptop = (vfs.dbv or vfs).realpath
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
|
x = self.hub.up2k.handle_chunk(ptop, wark, chash)
|
||||||
response = x.get()
|
response = x.get()
|
||||||
chunksize, cstart, path, lastmod, sprs = response
|
chunksize, cstart, path, lastmod, sprs = response
|
||||||
|
|
||||||
@ -2207,11 +2204,9 @@ class HttpCli(object):
|
|||||||
f.close()
|
f.close()
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
|
self.hub.up2k.release_chunk(ptop, wark, chash)
|
||||||
x.get() # block client until released
|
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
|
ztis = self.hub.up2k.confirm_chunk(ptop, wark, chash)
|
||||||
ztis = x.get()
|
|
||||||
try:
|
try:
|
||||||
num_left, fin_path = ztis
|
num_left, fin_path = ztis
|
||||||
except:
|
except:
|
||||||
@ -2223,9 +2218,7 @@ class HttpCli(object):
|
|||||||
self.u2fh.close(path)
|
self.u2fh.close(path)
|
||||||
|
|
||||||
if not num_left and not self.args.nw:
|
if not num_left and not self.args.nw:
|
||||||
self.conn.hsrv.broker.ask(
|
self.hub.up2k.finish_upload(ptop, wark, self.u2fh.aps)
|
||||||
"up2k.finish_upload", ptop, wark, self.u2fh.aps
|
|
||||||
).get()
|
|
||||||
|
|
||||||
cinf = self.headers.get("x-up2k-stat", "")
|
cinf = self.headers.get("x-up2k-stat", "")
|
||||||
|
|
||||||
@ -2403,7 +2396,7 @@ class HttpCli(object):
|
|||||||
fdir_base = vfs.canonical(rem)
|
fdir_base = vfs.canonical(rem)
|
||||||
if lim:
|
if lim:
|
||||||
fdir_base, rem = lim.all(
|
fdir_base, rem = lim.all(
|
||||||
self.ip, rem, -1, vfs.realpath, fdir_base, self.conn.hsrv.broker
|
self.ip, rem, -1, vfs.realpath, fdir_base, self.hub
|
||||||
)
|
)
|
||||||
upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
|
upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
|
||||||
if not nullwrite:
|
if not nullwrite:
|
||||||
@ -2511,7 +2504,7 @@ class HttpCli(object):
|
|||||||
try:
|
try:
|
||||||
lim.chk_df(tabspath, sz, True)
|
lim.chk_df(tabspath, sz, True)
|
||||||
lim.chk_sz(sz)
|
lim.chk_sz(sz)
|
||||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
|
lim.chk_vsz(self.hub, vfs.realpath, sz)
|
||||||
lim.chk_bup(self.ip)
|
lim.chk_bup(self.ip)
|
||||||
lim.chk_nup(self.ip)
|
lim.chk_nup(self.ip)
|
||||||
except:
|
except:
|
||||||
@ -2549,8 +2542,7 @@ class HttpCli(object):
|
|||||||
raise Pebkac(403, t)
|
raise Pebkac(403, t)
|
||||||
|
|
||||||
dbv, vrem = vfs.get_dbv(rem)
|
dbv, vrem = vfs.get_dbv(rem)
|
||||||
self.conn.hsrv.broker.say(
|
self.hub.up2k.hash_file(
|
||||||
"up2k.hash_file",
|
|
||||||
dbv.realpath,
|
dbv.realpath,
|
||||||
vfs.vpath,
|
vfs.vpath,
|
||||||
dbv.flags,
|
dbv.flags,
|
||||||
@ -2697,7 +2689,7 @@ class HttpCli(object):
|
|||||||
fp = vfs.canonical(rp)
|
fp = vfs.canonical(rp)
|
||||||
lim = vfs.get_dbv(rem)[0].lim
|
lim = vfs.get_dbv(rem)[0].lim
|
||||||
if lim:
|
if lim:
|
||||||
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.conn.hsrv.broker)
|
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.hub)
|
||||||
bos.makedirs(fp)
|
bos.makedirs(fp)
|
||||||
|
|
||||||
fp = os.path.join(fp, fn)
|
fp = os.path.join(fp, fn)
|
||||||
@ -2799,7 +2791,7 @@ class HttpCli(object):
|
|||||||
lim.bup(self.ip, sz)
|
lim.bup(self.ip, sz)
|
||||||
try:
|
try:
|
||||||
lim.chk_sz(sz)
|
lim.chk_sz(sz)
|
||||||
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
|
lim.chk_vsz(self.hub, vfs.realpath, sz)
|
||||||
except:
|
except:
|
||||||
wunlink(self.log, fp, vfs.flags)
|
wunlink(self.log, fp, vfs.flags)
|
||||||
raise
|
raise
|
||||||
@ -2828,8 +2820,7 @@ class HttpCli(object):
|
|||||||
raise Pebkac(403, t)
|
raise Pebkac(403, t)
|
||||||
|
|
||||||
vfs, rem = vfs.get_dbv(rem)
|
vfs, rem = vfs.get_dbv(rem)
|
||||||
self.conn.hsrv.broker.say(
|
self.hub.up2k.hash_file(
|
||||||
"up2k.hash_file",
|
|
||||||
vfs.realpath,
|
vfs.realpath,
|
||||||
vfs.vpath,
|
vfs.vpath,
|
||||||
vfs.flags,
|
vfs.flags,
|
||||||
@ -3363,8 +3354,8 @@ class HttpCli(object):
|
|||||||
]
|
]
|
||||||
|
|
||||||
if self.avol and not self.args.no_rescan:
|
if self.avol and not self.args.no_rescan:
|
||||||
x = self.conn.hsrv.broker.ask("up2k.get_state")
|
zs = self.hub.up2k.get_state()
|
||||||
vs = json.loads(x.get())
|
vs = json.loads(zs)
|
||||||
vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
|
vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
|
||||||
else:
|
else:
|
||||||
vstate = {}
|
vstate = {}
|
||||||
@ -3508,10 +3499,8 @@ class HttpCli(object):
|
|||||||
|
|
||||||
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
|
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
|
||||||
|
|
||||||
args = [self.asrv.vfs.all_vols, [vn.vpath], False, True]
|
err = self.hub.up2k.rescan(self.asrv.vfs.all_vols, [vn.vpath], False, True)
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask("up2k.rescan", *args)
|
|
||||||
err = x.get()
|
|
||||||
if not err:
|
if not err:
|
||||||
self.redirect("", "?h")
|
self.redirect("", "?h")
|
||||||
return True
|
return True
|
||||||
@ -3529,8 +3518,8 @@ class HttpCli(object):
|
|||||||
if self.args.no_reload:
|
if self.args.no_reload:
|
||||||
raise Pebkac(403, "the reload feature is disabled in server config")
|
raise Pebkac(403, "the reload feature is disabled in server config")
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask("reload")
|
zs = self.hub.reload()
|
||||||
return self.redirect("", "?h", x.get(), "return to", False)
|
return self.redirect("", "?h", zs, "return to", False)
|
||||||
|
|
||||||
def tx_stack(self) -> bool:
|
def tx_stack(self) -> bool:
|
||||||
if not self.avol and not [x for x in self.wvol if x in self.rvol]:
|
if not self.avol and not [x for x in self.wvol if x in self.rvol]:
|
||||||
@ -3632,10 +3621,7 @@ class HttpCli(object):
|
|||||||
and (self.uname in vol.axs.uread or self.uname in vol.axs.upget)
|
and (self.uname in vol.axs.uread or self.uname in vol.axs.upget)
|
||||||
}
|
}
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask(
|
uret = self.hub.up2k.get_unfinished_by_user(self.uname, self.ip)
|
||||||
"up2k.get_unfinished_by_user", self.uname, self.ip
|
|
||||||
)
|
|
||||||
uret = x.get()
|
|
||||||
|
|
||||||
if not self.args.unpost:
|
if not self.args.unpost:
|
||||||
allvols = []
|
allvols = []
|
||||||
@ -3721,10 +3707,8 @@ class HttpCli(object):
|
|||||||
nlim = int(self.uparam.get("lim") or 0)
|
nlim = int(self.uparam.get("lim") or 0)
|
||||||
lim = [nlim, nlim] if nlim else []
|
lim = [nlim, nlim] if nlim else []
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask(
|
zs = self.hub.up2k.handle_rm(self.uname, self.ip, req, lim, False, unpost)
|
||||||
"up2k.handle_rm", self.uname, self.ip, req, lim, False, unpost
|
self.loud_reply(zs)
|
||||||
)
|
|
||||||
self.loud_reply(x.get())
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def handle_mv(self) -> bool:
|
def handle_mv(self) -> bool:
|
||||||
@ -3746,8 +3730,8 @@ class HttpCli(object):
|
|||||||
if self.args.no_mv:
|
if self.args.no_mv:
|
||||||
raise Pebkac(403, "the rename/move feature is disabled in server config")
|
raise Pebkac(403, "the rename/move feature is disabled in server config")
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, vsrc, vdst)
|
zs = self.hub.up2k.handle_mv(self.uname, vsrc, vdst)
|
||||||
self.loud_reply(x.get(), status=201)
|
self.loud_reply(zs, status=201)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def tx_ls(self, ls: dict[str, Any]) -> bool:
|
def tx_ls(self, ls: dict[str, Any]) -> bool:
|
||||||
|
@ -58,7 +58,7 @@ class HttpConn(object):
|
|||||||
self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm
|
self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm
|
||||||
self.xff_nm: Optional[NetMap] = hsrv.xff_nm
|
self.xff_nm: Optional[NetMap] = hsrv.xff_nm
|
||||||
self.xff_lan: NetMap = hsrv.xff_lan # type: ignore
|
self.xff_lan: NetMap = hsrv.xff_lan # type: ignore
|
||||||
self.iphash: HMaccas = hsrv.broker.iphash
|
self.iphash: HMaccas = hsrv.hub.iphash
|
||||||
self.bans: dict[str, int] = hsrv.bans
|
self.bans: dict[str, int] = hsrv.bans
|
||||||
self.aclose: dict[str, int] = hsrv.aclose
|
self.aclose: dict[str, int] = hsrv.aclose
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ from .util import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .broker_util import BrokerCli
|
from .svchub import SvcHub
|
||||||
from .ssdp import SSDPr
|
from .ssdp import SSDPr
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
if True: # pylint: disable=using-constant-test
|
||||||
@ -90,16 +90,13 @@ class HttpSrv(object):
|
|||||||
relying on MpSrv for performance (HttpSrv is just plain threads)
|
relying on MpSrv for performance (HttpSrv is just plain threads)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None:
|
def __init__(self, hub: "SvcHub", nid: Optional[int]) -> None:
|
||||||
self.broker = broker
|
self.hub = hub
|
||||||
self.nid = nid
|
self.nid = nid
|
||||||
self.args = broker.args
|
self.args = hub.args
|
||||||
self.E: EnvParams = self.args.E
|
self.E: EnvParams = self.args.E
|
||||||
self.log = broker.log
|
self.log = hub.log
|
||||||
self.asrv = broker.asrv
|
self.asrv = hub.asrv
|
||||||
|
|
||||||
# redefine in case of multiprocessing
|
|
||||||
socket.setdefaulttimeout(120)
|
|
||||||
|
|
||||||
self.t0 = time.time()
|
self.t0 = time.time()
|
||||||
nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
|
nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
|
||||||
@ -169,7 +166,7 @@ class HttpSrv(object):
|
|||||||
if self.args.zs:
|
if self.args.zs:
|
||||||
from .ssdp import SSDPr
|
from .ssdp import SSDPr
|
||||||
|
|
||||||
self.ssdp = SSDPr(broker)
|
self.ssdp = SSDPr(hub)
|
||||||
|
|
||||||
if self.tp_q:
|
if self.tp_q:
|
||||||
self.start_threads(4)
|
self.start_threads(4)
|
||||||
@ -186,8 +183,7 @@ class HttpSrv(object):
|
|||||||
|
|
||||||
def post_init(self) -> None:
|
def post_init(self) -> None:
|
||||||
try:
|
try:
|
||||||
x = self.broker.ask("thumbsrv.getcfg")
|
self.th_cfg = self.hub.thumbsrv.getcfg()
|
||||||
self.th_cfg = x.get()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -237,19 +233,11 @@ class HttpSrv(object):
|
|||||||
self.t_periodic = None
|
self.t_periodic = None
|
||||||
return
|
return
|
||||||
|
|
||||||
def listen(self, sck: socket.socket, nlisteners: int) -> None:
|
def listen(self, sck: socket.socket) -> None:
|
||||||
if self.args.j != 1:
|
|
||||||
# lost in the pickle; redefine
|
|
||||||
if not ANYWIN or self.args.reuseaddr:
|
|
||||||
sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
|
|
||||||
sck.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
||||||
sck.settimeout(None) # < does not inherit, ^ opts above do
|
|
||||||
|
|
||||||
ip, port = sck.getsockname()[:2]
|
ip, port = sck.getsockname()[:2]
|
||||||
self.srvs.append(sck)
|
self.srvs.append(sck)
|
||||||
self.bound.add((ip, port))
|
self.bound.add((ip, port))
|
||||||
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
|
self.nclimax = self.args.nc
|
||||||
Daemon(
|
Daemon(
|
||||||
self.thr_listen,
|
self.thr_listen,
|
||||||
"httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
|
"httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
|
||||||
@ -265,7 +253,7 @@ class HttpSrv(object):
|
|||||||
self.log(self.name, msg)
|
self.log(self.name, msg)
|
||||||
|
|
||||||
def fun() -> None:
|
def fun() -> None:
|
||||||
self.broker.say("cb_httpsrv_up")
|
self.hub.cb_httpsrv_up()
|
||||||
|
|
||||||
threading.Thread(target=fun, name="sig-hsrv-up1").start()
|
threading.Thread(target=fun, name="sig-hsrv-up1").start()
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ if TYPE_CHECKING:
|
|||||||
class Metrics(object):
|
class Metrics(object):
|
||||||
def __init__(self, hsrv: "HttpSrv") -> None:
|
def __init__(self, hsrv: "HttpSrv") -> None:
|
||||||
self.hsrv = hsrv
|
self.hsrv = hsrv
|
||||||
|
self.hub = hsrv.hub
|
||||||
|
|
||||||
def tx(self, cli: "HttpCli") -> bool:
|
def tx(self, cli: "HttpCli") -> bool:
|
||||||
if not cli.avol:
|
if not cli.avol:
|
||||||
@ -88,8 +89,8 @@ class Metrics(object):
|
|||||||
addg("cpp_total_bans", str(self.hsrv.nban), t)
|
addg("cpp_total_bans", str(self.hsrv.nban), t)
|
||||||
|
|
||||||
if not args.nos_vst:
|
if not args.nos_vst:
|
||||||
x = self.hsrv.broker.ask("up2k.get_state")
|
zs = self.hub.up2k.get_state()
|
||||||
vs = json.loads(x.get())
|
vs = json.loads(zs.get())
|
||||||
|
|
||||||
nvidle = 0
|
nvidle = 0
|
||||||
nvbusy = 0
|
nvbusy = 0
|
||||||
@ -146,8 +147,7 @@ class Metrics(object):
|
|||||||
volsizes = []
|
volsizes = []
|
||||||
try:
|
try:
|
||||||
ptops = [x.realpath for _, x in allvols]
|
ptops = [x.realpath for _, x in allvols]
|
||||||
x = self.hsrv.broker.ask("up2k.get_volsizes", ptops)
|
volsizes = self.hub.up2k.get_volsizes(ptops)
|
||||||
volsizes = x.get()
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
|
cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
|
||||||
|
|
||||||
@ -204,8 +204,7 @@ class Metrics(object):
|
|||||||
tnbytes = 0
|
tnbytes = 0
|
||||||
tnfiles = 0
|
tnfiles = 0
|
||||||
try:
|
try:
|
||||||
x = self.hsrv.broker.ask("up2k.get_unfinished")
|
xs = self.hub.up2k.get_unfinished()
|
||||||
xs = x.get()
|
|
||||||
if not xs:
|
if not xs:
|
||||||
raise Exception("up2k mutex acquisition timed out")
|
raise Exception("up2k mutex acquisition timed out")
|
||||||
|
|
||||||
|
@ -12,7 +12,6 @@ from .multicast import MC_Sck, MCast
|
|||||||
from .util import CachedSet, html_escape, min_ex
|
from .util import CachedSet, html_escape, min_ex
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .broker_util import BrokerCli
|
|
||||||
from .httpcli import HttpCli
|
from .httpcli import HttpCli
|
||||||
from .svchub import SvcHub
|
from .svchub import SvcHub
|
||||||
|
|
||||||
@ -32,9 +31,9 @@ class SSDP_Sck(MC_Sck):
|
|||||||
class SSDPr(object):
|
class SSDPr(object):
|
||||||
"""generates http responses for httpcli"""
|
"""generates http responses for httpcli"""
|
||||||
|
|
||||||
def __init__(self, broker: "BrokerCli") -> None:
|
def __init__(self, hub: "SvcHub") -> None:
|
||||||
self.broker = broker
|
self.hub = hub
|
||||||
self.args = broker.args
|
self.args = hub.args
|
||||||
|
|
||||||
def reply(self, hc: "HttpCli") -> bool:
|
def reply(self, hc: "HttpCli") -> bool:
|
||||||
if hc.vpath.endswith("device.xml"):
|
if hc.vpath.endswith("device.xml"):
|
||||||
|
@ -28,9 +28,10 @@ if True: # pylint: disable=using-constant-test
|
|||||||
import typing
|
import typing
|
||||||
from typing import Any, Optional, Union
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, E, EnvParams, unicode
|
from .__init__ import ANYWIN, EXE, TYPE_CHECKING, E, EnvParams, unicode
|
||||||
from .authsrv import BAD_CFG, AuthSrv
|
from .authsrv import BAD_CFG, AuthSrv
|
||||||
from .cert import ensure_cert
|
from .cert import ensure_cert
|
||||||
|
from .httpsrv import HttpSrv
|
||||||
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE
|
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE
|
||||||
from .tcpsrv import TcpSrv
|
from .tcpsrv import TcpSrv
|
||||||
from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
|
from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
|
||||||
@ -51,7 +52,6 @@ from .util import (
|
|||||||
ansi_re,
|
ansi_re,
|
||||||
build_netmap,
|
build_netmap,
|
||||||
min_ex,
|
min_ex,
|
||||||
mp,
|
|
||||||
odfusion,
|
odfusion,
|
||||||
pybin,
|
pybin,
|
||||||
start_log_thrs,
|
start_log_thrs,
|
||||||
@ -67,16 +67,6 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
class SvcHub(object):
|
class SvcHub(object):
|
||||||
"""
|
|
||||||
Hosts all services which cannot be parallelized due to reliance on monolithic resources.
|
|
||||||
Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
|
|
||||||
hub.broker.<say|ask>(destination, args_list).
|
|
||||||
|
|
||||||
Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
|
|
||||||
Nothing is returned synchronously; if you want any value returned from the call,
|
|
||||||
put() can return a queue (if want_reply=True) which has a blocking get() with the response.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
args: argparse.Namespace,
|
args: argparse.Namespace,
|
||||||
@ -163,16 +153,6 @@ class SvcHub(object):
|
|||||||
if args.log_thrs:
|
if args.log_thrs:
|
||||||
start_log_thrs(self.log, args.log_thrs, 0)
|
start_log_thrs(self.log, args.log_thrs, 0)
|
||||||
|
|
||||||
if not args.use_fpool and args.j != 1:
|
|
||||||
args.no_fpool = True
|
|
||||||
t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems"
|
|
||||||
self.log("root", t.format(args.j))
|
|
||||||
|
|
||||||
if not args.no_fpool and args.j != 1:
|
|
||||||
t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
|
|
||||||
self.log("root", t.format(args.j), c=3)
|
|
||||||
args.no_fpool = True
|
|
||||||
|
|
||||||
for name, arg in (
|
for name, arg in (
|
||||||
("iobuf", "iobuf"),
|
("iobuf", "iobuf"),
|
||||||
("s-rd-sz", "s_rd_sz"),
|
("s-rd-sz", "s_rd_sz"),
|
||||||
@ -316,13 +296,7 @@ class SvcHub(object):
|
|||||||
self.mdns: Optional["MDNS"] = None
|
self.mdns: Optional["MDNS"] = None
|
||||||
self.ssdp: Optional["SSDPd"] = None
|
self.ssdp: Optional["SSDPd"] = None
|
||||||
|
|
||||||
# decide which worker impl to use
|
self.httpsrv = HttpSrv(self, None)
|
||||||
if self.check_mp_enable():
|
|
||||||
from .broker_mp import BrokerMp as Broker
|
|
||||||
else:
|
|
||||||
from .broker_thr import BrokerThr as Broker # type: ignore
|
|
||||||
|
|
||||||
self.broker = Broker(self)
|
|
||||||
|
|
||||||
def start_ftpd(self) -> None:
|
def start_ftpd(self) -> None:
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
@ -361,15 +335,14 @@ class SvcHub(object):
|
|||||||
|
|
||||||
def thr_httpsrv_up(self) -> None:
|
def thr_httpsrv_up(self) -> None:
|
||||||
time.sleep(1 if self.args.ign_ebind_all else 5)
|
time.sleep(1 if self.args.ign_ebind_all else 5)
|
||||||
expected = self.broker.num_workers * self.tcpsrv.nsrv
|
expected = self.tcpsrv.nsrv
|
||||||
failed = expected - self.httpsrv_up
|
failed = expected - self.httpsrv_up
|
||||||
if not failed:
|
if not failed:
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.args.ign_ebind_all:
|
if self.args.ign_ebind_all:
|
||||||
if not self.tcpsrv.srv:
|
if not self.tcpsrv.srv:
|
||||||
for _ in range(self.broker.num_workers):
|
self.cb_httpsrv_up()
|
||||||
self.broker.say("cb_httpsrv_up")
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.args.ign_ebind and self.tcpsrv.srv:
|
if self.args.ign_ebind and self.tcpsrv.srv:
|
||||||
@ -387,8 +360,6 @@ class SvcHub(object):
|
|||||||
|
|
||||||
def cb_httpsrv_up(self) -> None:
|
def cb_httpsrv_up(self) -> None:
|
||||||
self.httpsrv_up += 1
|
self.httpsrv_up += 1
|
||||||
if self.httpsrv_up != self.broker.num_workers:
|
|
||||||
return
|
|
||||||
|
|
||||||
ar = self.args
|
ar = self.args
|
||||||
for _ in range(10 if ar.ftp or ar.ftps else 0):
|
for _ in range(10 if ar.ftp or ar.ftps else 0):
|
||||||
@ -723,7 +694,6 @@ class SvcHub(object):
|
|||||||
self.log("root", "reloading config")
|
self.log("root", "reloading config")
|
||||||
self.asrv.reload()
|
self.asrv.reload()
|
||||||
self.up2k.reload(rescan_all_vols)
|
self.up2k.reload(rescan_all_vols)
|
||||||
self.broker.reload()
|
|
||||||
self.reloading = 0
|
self.reloading = 0
|
||||||
|
|
||||||
def _reload_blocking(self, rescan_all_vols: bool = True) -> None:
|
def _reload_blocking(self, rescan_all_vols: bool = True) -> None:
|
||||||
@ -808,7 +778,7 @@ class SvcHub(object):
|
|||||||
tasks.append(Daemon(self.ssdp.stop, "ssdp"))
|
tasks.append(Daemon(self.ssdp.stop, "ssdp"))
|
||||||
slp = time.time() + 0.5
|
slp = time.time() + 0.5
|
||||||
|
|
||||||
self.broker.shutdown()
|
self.httpsrv.shutdown()
|
||||||
self.tcpsrv.shutdown()
|
self.tcpsrv.shutdown()
|
||||||
self.up2k.shutdown()
|
self.up2k.shutdown()
|
||||||
|
|
||||||
@ -970,48 +940,6 @@ class SvcHub(object):
|
|||||||
if ex.errno != errno.EPIPE:
|
if ex.errno != errno.EPIPE:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def check_mp_support(self) -> str:
|
|
||||||
if MACOS:
|
|
||||||
return "multiprocessing is wonky on mac osx;"
|
|
||||||
elif sys.version_info < (3, 3):
|
|
||||||
return "need python 3.3 or newer for multiprocessing;"
|
|
||||||
|
|
||||||
try:
|
|
||||||
x: mp.Queue[tuple[str, str]] = mp.Queue(1)
|
|
||||||
x.put(("foo", "bar"))
|
|
||||||
if x.get()[0] != "foo":
|
|
||||||
raise Exception()
|
|
||||||
except:
|
|
||||||
return "multiprocessing is not supported on your platform;"
|
|
||||||
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def check_mp_enable(self) -> bool:
|
|
||||||
if self.args.j == 1:
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
if mp.cpu_count() <= 1:
|
|
||||||
raise Exception()
|
|
||||||
except:
|
|
||||||
self.log("svchub", "only one CPU detected; multiprocessing disabled")
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
# support vscode debugger (bonus: same behavior as on windows)
|
|
||||||
mp.set_start_method("spawn", True)
|
|
||||||
except AttributeError:
|
|
||||||
# py2.7 probably, anyways dontcare
|
|
||||||
pass
|
|
||||||
|
|
||||||
err = self.check_mp_support()
|
|
||||||
if not err:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
self.log("svchub", err)
|
|
||||||
self.log("svchub", "cannot efficiently use multiple CPU cores")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def sd_notify(self) -> None:
|
def sd_notify(self) -> None:
|
||||||
try:
|
try:
|
||||||
zb = os.getenv("NOTIFY_SOCKET")
|
zb = os.getenv("NOTIFY_SOCKET")
|
||||||
|
@ -297,7 +297,7 @@ class TcpSrv(object):
|
|||||||
if self.args.q:
|
if self.args.q:
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
self.hub.broker.say("listen", srv)
|
self.hub.httpsrv.listen(srv)
|
||||||
|
|
||||||
self.srv = srvs
|
self.srv = srvs
|
||||||
self.bound = bound
|
self.bound = bound
|
||||||
@ -305,7 +305,7 @@ class TcpSrv(object):
|
|||||||
self._distribute_netdevs()
|
self._distribute_netdevs()
|
||||||
|
|
||||||
def _distribute_netdevs(self):
|
def _distribute_netdevs(self):
|
||||||
self.hub.broker.say("set_netdevs", self.netdevs)
|
self.hub.httpsrv.set_netdevs(self.netdevs)
|
||||||
self.hub.start_zeroconf()
|
self.hub.start_zeroconf()
|
||||||
gencert(self.log, self.args, self.netdevs)
|
gencert(self.log, self.args, self.netdevs)
|
||||||
self.hub.restart_ftpd()
|
self.hub.restart_ftpd()
|
||||||
|
@ -7,7 +7,6 @@ from .__init__ import TYPE_CHECKING
|
|||||||
from .authsrv import VFS
|
from .authsrv import VFS
|
||||||
from .bos import bos
|
from .bos import bos
|
||||||
from .th_srv import HAVE_WEBP, thumb_path
|
from .th_srv import HAVE_WEBP, thumb_path
|
||||||
from .util import Cooldown
|
|
||||||
|
|
||||||
if True: # pylint: disable=using-constant-test
|
if True: # pylint: disable=using-constant-test
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union
|
||||||
@ -18,14 +17,11 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
class ThumbCli(object):
|
class ThumbCli(object):
|
||||||
def __init__(self, hsrv: "HttpSrv") -> None:
|
def __init__(self, hsrv: "HttpSrv") -> None:
|
||||||
self.broker = hsrv.broker
|
self.hub = hsrv.hub
|
||||||
self.log_func = hsrv.log
|
self.log_func = hsrv.log
|
||||||
self.args = hsrv.args
|
self.args = hsrv.args
|
||||||
self.asrv = hsrv.asrv
|
self.asrv = hsrv.asrv
|
||||||
|
|
||||||
# cache on both sides for less broker spam
|
|
||||||
self.cooldown = Cooldown(self.args.th_poke)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
c = hsrv.th_cfg
|
c = hsrv.th_cfg
|
||||||
if not c:
|
if not c:
|
||||||
@ -134,13 +130,11 @@ class ThumbCli(object):
|
|||||||
|
|
||||||
if ret:
|
if ret:
|
||||||
tdir = os.path.dirname(tpath)
|
tdir = os.path.dirname(tpath)
|
||||||
if self.cooldown.poke(tdir):
|
self.hub.thumbsrv.poke(tdir)
|
||||||
self.broker.say("thumbsrv.poke", tdir)
|
|
||||||
|
|
||||||
if want_opus:
|
if want_opus:
|
||||||
# audio files expire individually
|
# audio files expire individually
|
||||||
if self.cooldown.poke(tpath):
|
self.hub.thumbsrv.poke(tpath)
|
||||||
self.broker.say("thumbsrv.poke", tpath)
|
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -150,5 +144,4 @@ class ThumbCli(object):
|
|||||||
if not bos.path.getsize(os.path.join(ptop, rem)):
|
if not bos.path.getsize(os.path.join(ptop, rem)):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
x = self.broker.ask("thumbsrv.get", ptop, rem, mtime, fmt)
|
return self.hub.thumbsrv.get(ptop, rem, mtime, fmt)
|
||||||
return x.get() # type: ignore
|
|
||||||
|
@ -2745,9 +2745,9 @@ class Up2k(object):
|
|||||||
cj["size"],
|
cj["size"],
|
||||||
cj["ptop"],
|
cj["ptop"],
|
||||||
ap1,
|
ap1,
|
||||||
self.hub.broker,
|
self.hub,
|
||||||
reg,
|
reg,
|
||||||
"up2k._get_volsize",
|
"_get_volsize",
|
||||||
)
|
)
|
||||||
bos.makedirs(ap2)
|
bos.makedirs(ap2)
|
||||||
vfs.lim.nup(cj["addr"])
|
vfs.lim.nup(cj["addr"])
|
||||||
|
@ -69,8 +69,6 @@ sed -ri s/copyparty.exe/copyparty$esuf.exe/ loader.rc2
|
|||||||
|
|
||||||
excl=(
|
excl=(
|
||||||
asyncio
|
asyncio
|
||||||
copyparty.broker_mp
|
|
||||||
copyparty.broker_mpw
|
|
||||||
copyparty.smbd
|
copyparty.smbd
|
||||||
ctypes.macholib
|
ctypes.macholib
|
||||||
curses
|
curses
|
||||||
|
@ -7,10 +7,6 @@ copyparty/bos,
|
|||||||
copyparty/bos/__init__.py,
|
copyparty/bos/__init__.py,
|
||||||
copyparty/bos/bos.py,
|
copyparty/bos/bos.py,
|
||||||
copyparty/bos/path.py,
|
copyparty/bos/path.py,
|
||||||
copyparty/broker_mp.py,
|
|
||||||
copyparty/broker_mpw.py,
|
|
||||||
copyparty/broker_thr.py,
|
|
||||||
copyparty/broker_util.py,
|
|
||||||
copyparty/cert.py,
|
copyparty/cert.py,
|
||||||
copyparty/cfg.py,
|
copyparty/cfg.py,
|
||||||
copyparty/dxml.py,
|
copyparty/dxml.py,
|
||||||
|
@ -170,12 +170,14 @@ class Cfg(Namespace):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class NullBroker(object):
|
class NullUp2k(object):
|
||||||
def say(self, *args):
|
def hash_file(*a):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def ask(self, *args):
|
|
||||||
pass
|
class NullHub(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.up2k = NullUp2k()
|
||||||
|
|
||||||
|
|
||||||
class VSock(object):
|
class VSock(object):
|
||||||
@ -206,7 +208,7 @@ class VHttpSrv(object):
|
|||||||
self.asrv = asrv
|
self.asrv = asrv
|
||||||
self.log = log
|
self.log = log
|
||||||
|
|
||||||
self.broker = NullBroker()
|
self.hub = NullHub()
|
||||||
self.prism = None
|
self.prism = None
|
||||||
self.bans = {}
|
self.bans = {}
|
||||||
self.nreq = 0
|
self.nreq = 0
|
||||||
|
Loading…
Reference in New Issue
Block a user