diff --git a/bin/up2k.py b/bin/up2k.py
index 3cb43412..b034b3ab 100755
--- a/bin/up2k.py
+++ b/bin/up2k.py
@@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals
 
 """
 up2k.py: upload to copyparty
-2022-08-08, v0.16, ed <irc.rizon.net>, MIT-Licensed
+2022-08-10, v0.17, ed <irc.rizon.net>, MIT-Licensed
 https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
 
 - dependencies: requests
@@ -22,12 +22,29 @@ import atexit
 import signal
 import base64
 import hashlib
-import argparse
 import platform
 import threading
 import datetime
 
-import requests
+try:
+    import argparse
+except:
+    m = "\n  ERROR: need 'argparse'; download it here:\n   https://github.com/ThomasWaldmann/argparse/raw/master/argparse.py\n"
+    print(m)
+    raise
+
+try:
+    import requests
+except:
+    if sys.version_info > (2, 7):
+        m = "\n  ERROR: need 'requests'; run this:\n   python -m pip install --user requests\n"
+    else:
+        m = "requests/2.18.4 urllib3/1.23 chardet/3.0.4 certifi/2020.4.5.1 idna/2.7"
+        m = [" https://pypi.org/project/" + x + "/#files" for x in m.split()]
+        m = "\n  ERROR: need these:\n" + "\n".join(m) + "\n"
+
+    print(m)
+    raise
 
 
 # from copyparty/__init__.py
@@ -126,6 +143,89 @@ class FileSlice(object):
         return ret
 
 
+class MTHash(object):
+    def __init__(self, cores):
+        self.f = None
+        self.sz = 0
+        self.csz = 0
+        self.omutex = threading.Lock()
+        self.imutex = threading.Lock()
+        self.work_q = Queue()
+        self.done_q = Queue()
+        self.thrs = []
+        for _ in range(cores):
+            t = threading.Thread(target=self.worker)
+            t.daemon = True
+            t.start()
+            self.thrs.append(t)
+
+    def hash(self, f, fsz, chunksz, pcb=None, pcb_opaque=None):
+        with self.omutex:
+            self.f = f
+            self.sz = fsz
+            self.csz = chunksz
+
+            chunks = {}
+            nchunks = int(math.ceil(fsz / chunksz))
+            for nch in range(nchunks):
+                self.work_q.put(nch)
+
+            ex = ""
+            for nch in range(nchunks):
+                qe = self.done_q.get()
+                try:
+                    nch, dig, ofs, csz = qe
+                    chunks[nch] = [dig, ofs, csz]
+                except:
+                    ex = ex or qe
+
+                if pcb:
+                    pcb(pcb_opaque, chunksz * nch)
+
+            if ex:
+                raise Exception(ex)
+
+            ret = []
+            for n in range(nchunks):
+                ret.append(chunks[n])
+
+            self.f = None
+            self.csz = 0
+            self.sz = 0
+            return ret
+
+    def worker(self):
+        while True:
+            ofs = self.work_q.get()
+            try:
+                v = self.hash_at(ofs)
+            except Exception as ex:
+                v = str(ex)
+
+            self.done_q.put(v)
+
+    def hash_at(self, nch):
+        f = self.f
+        ofs = ofs0 = nch * self.csz
+        hashobj = hashlib.sha512()
+        chunk_sz = chunk_rem = min(self.csz, self.sz - ofs)
+        while chunk_rem > 0:
+            with self.imutex:
+                f.seek(ofs)
+                buf = f.read(min(chunk_rem, 1024 * 1024 * 12))
+
+            if not buf:
+                raise Exception("EOF at " + str(ofs))
+
+            hashobj.update(buf)
+            chunk_rem -= len(buf)
+            ofs += len(buf)
+
+        digest = hashobj.digest()[:33]
+        digest = base64.urlsafe_b64encode(digest).decode("utf-8")
+        return nch, digest, ofs0, chunk_sz
+
+
 _print = print
 
 
@@ -322,8 +422,8 @@ def up2k_chunksize(filesize):
 
 
 # mostly from copyparty/up2k.py
-def get_hashlist(file, pcb):
-    # type: (File, any) -> None
+def get_hashlist(file, pcb, mth):
+    # type: (File, any, any) -> None
     """generates the up2k hashlist from file contents, inserts it into `file`"""
 
     chunk_sz = up2k_chunksize(file.size)
@@ -331,7 +431,12 @@ def get_hashlist(file, pcb):
     file_ofs = 0
     ret = []
     with open(file.abs, "rb", 512 * 1024) as f:
+        if mth and file.size >= 1024 * 512:
+            ret = mth.hash(f, file.size, chunk_sz, pcb, file)
+            file_rem = 0
+
         while file_rem > 0:
+            # same as `hash_at` except for `imutex` / bufsz
             hashobj = hashlib.sha512()
             chunk_sz = chunk_rem = min(chunk_sz, file_rem)
             while chunk_rem > 0:
@@ -399,7 +504,7 @@ def handshake(req_ses, url, file, pw, search):
         raise Exception(r.text)
 
     if search:
-        return r["hits"]
+        return r["hits"], False
 
     try:
         pre, url = url.split("://")
@@ -517,6 +622,8 @@ class Ctl(object):
         self.st_hash = [None, "(idle, starting...)"]  # type: tuple[File, int]
         self.st_up = [None, "(idle, starting...)"]  # type: tuple[File, int]
 
+        self.mth = MTHash(ar.J) if ar.J > 1 else None
+
         self._fancy()
 
     def _safe(self):
@@ -527,7 +634,7 @@ class Ctl(object):
             upath = file.abs.decode("utf-8", "replace")
 
             print("{0} {1}\n  hash...".format(self.nfiles - nf, upath))
-            get_hashlist(file, None)
+            get_hashlist(file, None, None)
 
             burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/"
             while True:
@@ -680,7 +787,7 @@ class Ctl(object):
 
                 time.sleep(0.05)
 
-            get_hashlist(file, self.cb_hasher)
+            get_hashlist(file, self.cb_hasher, self.mth)
             with self.mutex:
                 self.hash_f += 1
                 self.hash_c += len(file.cids)
@@ -809,6 +916,9 @@ def main():
     if not VT100:
         os.system("rem")  # enables colors
 
+    cores = os.cpu_count() if hasattr(os, "cpu_count") else 4
+    hcores = min(cores, 3)  # 4% faster than 4+ on py3.9 @ r5-4500U
+
     # fmt: off
     ap = app = argparse.ArgumentParser(formatter_class=APF, epilog="""
 NOTE:
@@ -824,6 +934,7 @@ source file/folder selection uses rsync syntax, meaning that:
     ap.add_argument("--ok", action="store_true", help="continue even if some local files are inaccessible")
     ap = app.add_argument_group("performance tweaks")
     ap.add_argument("-j", type=int, metavar="THREADS", default=4, help="parallel connections")
+    ap.add_argument("-J", type=int, metavar="THREADS", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
    ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
     ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
     ap.add_argument("-z", action="store_true", help="ZOOMIN' (skip uploading files if they exist at the destination with the ~same last-modified timestamp, so same as yolo / turbo with date-chk but even faster)")
diff --git a/copyparty/__main__.py b/copyparty/__main__.py
index 13f07d99..249a8936 100644
--- a/copyparty/__main__.py
+++ b/copyparty/__main__.py
@@ -336,6 +336,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Names
         fk_salt = "hunter2"
 
     cores = os.cpu_count() if hasattr(os, "cpu_count") else 4
+    hcores = min(cores, 3)  # 4% faster than 4+ on py3.9 @ r5-4500U
 
     sects = [
         [
@@ -611,6 +612,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Names
     ap2.add_argument("--no-idx", metavar="PTN", type=u, help="regex: disable indexing of matching paths during e2ds folder scans")
     ap2.add_argument("--xdev", action="store_true", help="do not descend into other filesystems (symlink or bind-mount to another HDD, ...)")
     ap2.add_argument("--xvol", action="store_true", help="skip symlinks leaving the volume root")
+    ap2.add_argument("--hash-mt", metavar="CORES", type=int, default=hcores, help="num cpu cores to use for file hashing; set 0 or 1 for single-core hashing")
     ap2.add_argument("--re-maxage", metavar="SEC", type=int, default=0, help="disk rescan volume interval, 0=off, can be set per-volume with the 'scan' volflag")
     ap2.add_argument("--db-act", metavar="SEC", type=float, default=10, help="defer any scheduled volume reindexing until SEC seconds after last db write (uploads, renames, ...)")
     ap2.add_argument("--srch-time", metavar="SEC", type=int, default=30, help="search deadline -- terminate searches running for more than SEC seconds")
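note: the client-side change above fans each file's chunks out to -J hasher threads (default min(cores, 3)); workers pull chunk indices off a queue, seek+read the shared file handle under a single lock, and hash outside of it. Below is a self-contained sketch of that fan-out/fan-in pattern; demo_mt_hash and its defaults are illustrative names for this note, not part of the patch:

    # sketch of the MTHash fan-out/fan-in pattern: workers pull chunk
    # indices off a queue, seek+read the shared handle under one lock,
    # and push (index, digest) results back out of order.
    import base64
    import hashlib
    import threading
    from queue import Queue

    def demo_mt_hash(path, chunksz=1024 * 1024, cores=3):
        work_q, done_q = Queue(), Queue()
        imutex = threading.Lock()  # serializes seek+read on the shared handle

        with open(path, "rb") as f:
            f.seek(0, 2)
            fsz = f.tell()
            nchunks = -(-fsz // chunksz)  # ceiling division

            def worker():
                while True:
                    nch = work_q.get()
                    ofs = nch * chunksz
                    rem = min(chunksz, fsz - ofs)
                    hashobj = hashlib.sha512()
                    try:
                        while rem > 0:
                            with imutex:
                                f.seek(ofs)
                                buf = f.read(min(rem, 1024 * 1024))
                            if not buf:
                                raise Exception("EOF at " + str(ofs))
                            hashobj.update(buf)  # hashing runs outside the lock
                            rem -= len(buf)
                            ofs += len(buf)
                        # same truncated urlsafe-b64 digest format as up2k
                        dig = base64.urlsafe_b64encode(hashobj.digest()[:33])
                        done_q.put((nch, dig.decode("utf-8")))
                    except Exception as ex:
                        done_q.put((nch, "ERROR: " + str(ex)))  # never hang the collector

            for _ in range(cores):
                t = threading.Thread(target=worker)
                t.daemon = True
                t.start()

            for nch in range(nchunks):
                work_q.put(nch)

            # results arrive out of order; reassemble by chunk index
            chunks = dict(done_q.get() for _ in range(nchunks))
            return [chunks[n] for n in range(nchunks)]

unlike the sketch, the real MTHash keeps its worker pool alive across files, so thread startup cost is paid once per process rather than once per file.
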
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 467f4314..7625b2a6 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -28,6 +28,7 @@ from .mtag import MParser, MTag
 from .util import (
     HAVE_SQLITE3,
     SYMTIME,
+    MTHash,
     Pebkac,
     ProgressPrinter,
     absreal,
@@ -155,6 +156,11 @@ class Up2k(object):
 
         self.fstab = Fstab(self.log_func)
 
+        if self.args.hash_mt < 2:
+            self.mth: Optional[MTHash] = None
+        else:
+            self.mth = MTHash(self.args.hash_mt)
+
         if self.args.no_fastboot:
             self.deferred_init()
 
@@ -841,7 +847,7 @@ class Up2k(object):
 
                 self.pp.msg = "a{} {}".format(self.pp.n, abspath)
 
-                if nohash:
+                if nohash or not sz:
                     wark = up2k_wark_from_metadata(self.salt, sz, lmod, rd, fn)
                 else:
                     if sz > 1024 * 1024:
@@ -1059,7 +1065,7 @@ class Up2k(object):
                 sz2 = st.st_size
                 mt2 = int(st.st_mtime)
 
-                if nohash:
+                if nohash or not sz2:
                     w2 = up2k_wark_from_metadata(self.salt, sz2, mt2, rd, fn)
                 else:
                     if sz2 > 1024 * 1024 * 32:
@@ -2625,14 +2631,21 @@ class Up2k(object):
         fsz = bos.path.getsize(path)
         csz = up2k_chunksize(fsz)
         ret = []
+        suffix = " MB, {}".format(path)
         with open(fsenc(path), "rb", 512 * 1024) as f:
+            if self.mth and fsz >= 1024 * 512:
+                tlt = self.mth.hash(f, fsz, csz, self.pp, prefix, suffix)
+                ret = [x[0] for x in tlt]
+                fsz = 0
+
             while fsz > 0:
+                # same as `hash_at` except for `imutex` / bufsz
                 if self.stop:
                     return []
 
                 if self.pp:
                     mb = int(fsz / 1024 / 1024)
-                    self.pp.msg = "{}{} MB, {}".format(prefix, mb, path)
+                    self.pp.msg = prefix + str(mb) + suffix
 
                 hashobj = hashlib.sha512()
                 rem = min(csz, fsz)
@@ -2873,11 +2886,17 @@ class Up2k(object):
         abspath = os.path.join(ptop, rd, fn)
         self.log("hashing " + abspath)
         inf = bos.stat(abspath)
-        hashes = self._hashlist_from_file(abspath)
-        if not hashes:
-            return
+        if not inf.st_size:
+            wark = up2k_wark_from_metadata(
+                self.salt, inf.st_size, int(inf.st_mtime), rd, fn
+            )
+        else:
+            hashes = self._hashlist_from_file(abspath)
+            if not hashes:
+                return
+
+            wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
 
-        wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
         with self.mutex:
             self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at)
 
@@ -2893,6 +2912,9 @@ class Up2k(object):
     def shutdown(self) -> None:
         self.stop = True
 
+        if self.mth:
+            self.mth.stop = True
+
         for x in list(self.spools):
             self._unspool(x)
 
diff --git a/copyparty/util.py b/copyparty/util.py
index 61352359..0138b868 100644
--- a/copyparty/util.py
+++ b/copyparty/util.py
@@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals
 import base64
 import contextlib
 import hashlib
+import math
 import mimetypes
 import os
 import platform
@@ -21,6 +22,8 @@ import traceback
 from collections import Counter
 from datetime import datetime
 
+from queue import Queue
+
 from .__init__ import ANYWIN, PY2, TYPE_CHECKING, VT100, WINDOWS
 from .__version__ import S_BUILD_DT, S_VERSION
 from .stolen import surrogateescape
@@ -494,6 +497,104 @@ class ProgressPrinter(threading.Thread):
             sys.stdout.flush()  # necessary on win10 even w/ stderr btw
 
 
+class MTHash(object):
+    def __init__(self, cores: int):
+        self.pp: Optional[ProgressPrinter] = None
+        self.f: Optional[typing.BinaryIO] = None
+        self.sz = 0
+        self.csz = 0
+        self.stop = False
+        self.omutex = threading.Lock()
+        self.imutex = threading.Lock()
+        self.work_q: Queue[int] = Queue()
+        self.done_q: Queue[tuple[int, str, int, int]] = Queue()
+        self.thrs = []
+        for _ in range(cores):
+            t = threading.Thread(target=self.worker)
+            t.daemon = True
+            t.start()
+            self.thrs.append(t)
+
+    def hash(
+        self,
+        f: typing.BinaryIO,
+        fsz: int,
+        chunksz: int,
+        pp: Optional[ProgressPrinter] = None,
+        prefix: str = "",
+        suffix: str = "",
+    ) -> list[tuple[str, int, int]]:
+        with self.omutex:
+            self.f = f
+            self.sz = fsz
+            self.csz = chunksz
+
+            chunks: dict[int, tuple[str, int, int]] = {}
+            nchunks = int(math.ceil(fsz / chunksz))
+            for nch in range(nchunks):
+                self.work_q.put(nch)
+
+            ex = ""
+            for nch in range(nchunks):
+                qe = self.done_q.get()
+                try:
+                    nch, dig, ofs, csz = qe
+                    chunks[nch] = (dig, ofs, csz)
+                except:
+                    ex = ex or str(qe)
+
+                if pp:
+                    mb = int((fsz - nch * chunksz) / 1024 / 1024)
+                    pp.msg = prefix + str(mb) + suffix
+
+            if ex:
+                raise Exception(ex)
+
+            ret = []
+            for n in range(nchunks):
+                ret.append(chunks[n])
+
+            self.f = None
+            self.csz = 0
+            self.sz = 0
+            return ret
+
+    def worker(self) -> None:
+        while True:
+            ofs = self.work_q.get()
+            try:
+                v = self.hash_at(ofs)
+            except Exception as ex:
+                v = str(ex)  # type: ignore
+
+            self.done_q.put(v)
+
+    def hash_at(self, nch: int) -> tuple[int, str, int, int]:
+        f = self.f
+        ofs = ofs0 = nch * self.csz
+        chunk_sz = chunk_rem = min(self.csz, self.sz - ofs)
+        if self.stop:
+            return nch, "", ofs0, chunk_sz
+
+        assert f
+        hashobj = hashlib.sha512()
+        while chunk_rem > 0:
+            with self.imutex:
+                f.seek(ofs)
+                buf = f.read(min(chunk_rem, 1024 * 1024 * 12))
+
+            if not buf:
+                raise Exception("EOF at " + str(ofs))
+
+            hashobj.update(buf)
+            chunk_rem -= len(buf)
+            ofs += len(buf)
+
+        bdig = hashobj.digest()[:33]
+        udig = base64.urlsafe_b64encode(bdig).decode("utf-8")
+        return nch, udig, ofs0, chunk_sz
+
+
 def uprint(msg: str) -> None:
     try:
         print(msg, end="")
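
note: the server-side MTHash mirrors the client-side class, adding type hints, a stop flag checked in hash_at, and ProgressPrinter updates. Plain threads are sufficient here because CPython's hashlib releases the GIL while digesting buffers larger than roughly 2 KiB, so sha512 over separate chunks genuinely runs on multiple cores. A rough stdlib-only micro-benchmark sketch follows; timings are machine-dependent, and the min(cores, 3) default reflects the r5-4500U measurement mentioned in the diff:

    # compare sequential vs threaded sha512 over three equal slices of an
    # in-memory buffer; the threaded pass overlaps because hashlib drops
    # the GIL for large updates. numbers vary by machine.
    import hashlib
    import threading
    import time

    data = b"\x00" * (96 * 1024 * 1024)  # 96 MiB hashed from RAM; no disk involved
    parts = [data[i * len(data) // 3 : (i + 1) * len(data) // 3] for i in range(3)]

    def one_pass(buf):
        hashlib.sha512(buf).digest()

    t0 = time.time()
    for p in parts:
        one_pass(p)
    t_seq = time.time() - t0

    thrs = [threading.Thread(target=one_pass, args=(p,)) for p in parts]
    t0 = time.time()
    for t in thrs:
        t.start()
    for t in thrs:
        t.join()
    t_par = time.time() - t0

    print("sequential {:.2f}s, threaded {:.2f}s".format(t_seq, t_par))

on a multicore box the threaded pass should land well below the sequential time; if it does not, the interpreter is not releasing the GIL during hashing and --hash-mt 1 (or -J 1 client-side) is the better setting.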