parallelize python hashing too

ed 2022-08-10 23:12:01 +02:00
parent 7de9775dd9
commit 92ed4ba3f8
4 changed files with 251 additions and 15 deletions
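
Summary: both the standalone up2k.py uploader and the copyparty server gain an MTHash class that hashes a file's chunks on a small pool of threads instead of one. The sketch below is a minimal standalone rendition of that pattern, simplified from (not identical to) the committed code: chunk indexes go into a work queue, each worker seeks into the shared file handle under a lock, hashes its chunk independently, and the digests are collected back in order.

    import hashlib
    import math
    import threading
    from queue import Queue

    def mt_hash(path, chunksz=1024 * 1024, cores=3):
        work_q, done_q = Queue(), Queue()
        imutex = threading.Lock()  # serializes seek+read on the shared handle
        with open(path, "rb") as f:
            fsz = f.seek(0, 2)  # file size

            def hash_chunk(nch):
                ofs = nch * chunksz
                rem = min(chunksz, fsz - ofs)
                h = hashlib.sha512()
                while rem > 0:
                    with imutex:  # only the read is serialized;
                        f.seek(ofs)  # the hashing runs in parallel
                        buf = f.read(min(rem, 1024 * 1024))
                    if not buf:
                        raise Exception("EOF at " + str(ofs))
                    h.update(buf)
                    rem -= len(buf)
                    ofs += len(buf)
                return h.hexdigest()

            def worker():
                while True:
                    nch = work_q.get()
                    if nch is None:
                        return  # sentinel; shut down
                    try:
                        done_q.put((nch, hash_chunk(nch)))
                    except Exception as ex:
                        done_q.put((nch, "ERROR: " + str(ex)))

            thrs = [threading.Thread(target=worker) for _ in range(cores)]
            for t in thrs:
                t.start()
            nchunks = int(math.ceil(fsz / chunksz))
            for nch in range(nchunks):
                work_q.put(nch)
            digs = dict(done_q.get() for _ in range(nchunks))  # reassemble
            for t in thrs:
                work_q.put(None)
            for t in thrs:
                t.join()
        return [digs[n] for n in range(nchunks)]

The committed version additionally truncates each SHA-512 digest to 33 bytes of urlsafe base64, reports progress through a callback, and keeps the thread pool alive across files.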

bin/up2k.py

@@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals
 """
 up2k.py: upload to copyparty
-2022-08-08, v0.16, ed <irc.rizon.net>, MIT-Licensed
+2022-08-10, v0.17, ed <irc.rizon.net>, MIT-Licensed
 https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
 - dependencies: requests
@@ -22,12 +22,29 @@ import atexit
 import signal
 import base64
 import hashlib
-import argparse
 import platform
 import threading
 import datetime
-import requests
+
+try:
+    import argparse
+except:
+    m = "\n ERROR: need 'argparse'; download it here:\n https://github.com/ThomasWaldmann/argparse/raw/master/argparse.py\n"
+    print(m)
+    raise
+
+try:
+    import requests
+except:
+    if sys.version_info > (2, 7):
+        m = "\n ERROR: need 'requests'; run this:\n python -m pip install --user requests\n"
+    else:
+        m = "requests/2.18.4 urllib3/1.23 chardet/3.0.4 certifi/2020.4.5.1 idna/2.7"
+        m = [" https://pypi.org/project/" + x + "/#files" for x in m.split()]
+        m = "\n ERROR: need these:\n" + "\n".join(m) + "\n"
+
+    print(m)
+    raise
 
 
 # from copyparty/__init__.py
@@ -126,6 +143,89 @@ class FileSlice(object):
         return ret
 
 
+class MTHash(object):
+    def __init__(self, cores):
+        self.f = None
+        self.sz = 0
+        self.csz = 0
+        self.omutex = threading.Lock()
+        self.imutex = threading.Lock()
+        self.work_q = Queue()
+        self.done_q = Queue()
+        self.thrs = []
+        for _ in range(cores):
+            t = threading.Thread(target=self.worker)
+            t.daemon = True
+            t.start()
+            self.thrs.append(t)
+
+    def hash(self, f, fsz, chunksz, pcb=None, pcb_opaque=None):
+        with self.omutex:
+            self.f = f
+            self.sz = fsz
+            self.csz = chunksz
+
+            chunks = {}
+            nchunks = int(math.ceil(fsz / chunksz))
+            for nch in range(nchunks):
+                self.work_q.put(nch)
+
+            ex = ""
+            for nch in range(nchunks):
+                qe = self.done_q.get()
+                try:
+                    nch, dig, ofs, csz = qe
+                    chunks[nch] = [dig, ofs, csz]
+                except:
+                    ex = ex or qe
+
+                if pcb:
+                    pcb(pcb_opaque, chunksz * nch)
+
+            if ex:
+                raise Exception(ex)
+
+            ret = []
+            for n in range(nchunks):
+                ret.append(chunks[n])
+
+            self.f = None
+            self.csz = 0
+            self.sz = 0
+            return ret
+
+    def worker(self):
+        while True:
+            ofs = self.work_q.get()
+            try:
+                v = self.hash_at(ofs)
+            except Exception as ex:
+                v = str(ex)
+
+            self.done_q.put(v)
+
+    def hash_at(self, nch):
+        f = self.f
+        ofs = ofs0 = nch * self.csz
+        hashobj = hashlib.sha512()
+        chunk_sz = chunk_rem = min(self.csz, self.sz - ofs)
+        while chunk_rem > 0:
+            with self.imutex:
+                f.seek(ofs)
+                buf = f.read(min(chunk_rem, 1024 * 1024 * 12))
+
+            if not buf:
+                raise Exception("EOF at " + str(ofs))
+
+            hashobj.update(buf)
+            chunk_rem -= len(buf)
+            ofs += len(buf)
+
+        digest = hashobj.digest()[:33]
+        digest = base64.urlsafe_b64encode(digest).decode("utf-8")
+        return nch, digest, ofs0, chunk_sz
+
+
 _print = print
@@ -322,8 +422,8 @@ def up2k_chunksize(filesize):
 
 
 # mostly from copyparty/up2k.py
-def get_hashlist(file, pcb):
-    # type: (File, any) -> None
+def get_hashlist(file, pcb, mth):
+    # type: (File, any, any) -> None
     """generates the up2k hashlist from file contents, inserts it into `file`"""
     chunk_sz = up2k_chunksize(file.size)
@@ -331,7 +431,12 @@ def get_hashlist(file, pcb):
     file_ofs = 0
     ret = []
     with open(file.abs, "rb", 512 * 1024) as f:
+        if mth and file.size >= 1024 * 512:
+            ret = mth.hash(f, file.size, chunk_sz, pcb, file)
+            file_rem = 0
+
         while file_rem > 0:
+            # same as `hash_at` except for `imutex` / bufsz
             hashobj = hashlib.sha512()
             chunk_sz = chunk_rem = min(chunk_sz, file_rem)
             while chunk_rem > 0:
@@ -399,7 +504,7 @@ def handshake(req_ses, url, file, pw, search):
         raise Exception(r.text)
 
     if search:
-        return r["hits"]
+        return r["hits"], False
 
     try:
         pre, url = url.split("://")
@@ -517,6 +622,8 @@ class Ctl(object):
         self.st_hash = [None, "(idle, starting...)"]  # type: tuple[File, int]
         self.st_up = [None, "(idle, starting...)"]  # type: tuple[File, int]
 
+        self.mth = MTHash(ar.J) if ar.J > 1 else None
+
         self._fancy()
 
     def _safe(self):
@@ -527,7 +634,7 @@ class Ctl(object):
             upath = file.abs.decode("utf-8", "replace")
             print("{0} {1}\n hash...".format(self.nfiles - nf, upath))
-            get_hashlist(file, None)
+            get_hashlist(file, None, None)
 
             burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/"
             while True:
@@ -680,7 +787,7 @@ class Ctl(object):
                 time.sleep(0.05)
 
-            get_hashlist(file, self.cb_hasher)
+            get_hashlist(file, self.cb_hasher, self.mth)
             with self.mutex:
                 self.hash_f += 1
                 self.hash_c += len(file.cids)
@@ -809,6 +916,9 @@ def main():
     if not VT100:
         os.system("rem")  # enables colors
 
+    cores = os.cpu_count() if hasattr(os, "cpu_count") else 4
+    hcores = min(cores, 3)  # 4% faster than 4+ on py3.9 @ r5-4500U
+
     # fmt: off
     ap = app = argparse.ArgumentParser(formatter_class=APF, epilog="""
 NOTE:
@@ -824,6 +934,7 @@ source file/folder selection uses rsync syntax, meaning that:
     ap.add_argument("--ok", action="store_true", help="continue even if some local files are inaccessible")
     ap = app.add_argument_group("performance tweaks")
     ap.add_argument("-j", type=int, metavar="THREADS", default=4, help="parallel connections")
+    ap.add_argument("-J", type=int, metavar="THREADS", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
     ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
     ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
     ap.add_argument("-z", action="store_true", help="ZOOMIN' (skip uploading files if they exist at the destination with the ~same last-modified timestamp, so same as yolo / turbo with date-chk but even faster)")

copyparty/__init__.py

@@ -336,6 +336,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Namespace:
     fk_salt = "hunter2"
 
     cores = os.cpu_count() if hasattr(os, "cpu_count") else 4
+    hcores = min(cores, 3)  # 4% faster than 4+ on py3.9 @ r5-4500U
 
     sects = [
         [
@@ -611,6 +612,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Namespace:
     ap2.add_argument("--no-idx", metavar="PTN", type=u, help="regex: disable indexing of matching paths during e2ds folder scans")
     ap2.add_argument("--xdev", action="store_true", help="do not descend into other filesystems (symlink or bind-mount to another HDD, ...)")
     ap2.add_argument("--xvol", action="store_true", help="skip symlinks leaving the volume root")
+    ap2.add_argument("--hash-mt", metavar="CORES", type=int, default=hcores, help="num cpu cores to use for file hashing; set 0 or 1 for single-core hashing")
    ap2.add_argument("--re-maxage", metavar="SEC", type=int, default=0, help="disk rescan volume interval, 0=off, can be set per-volume with the 'scan' volflag")
     ap2.add_argument("--db-act", metavar="SEC", type=float, default=10, help="defer any scheduled volume reindexing until SEC seconds after last db write (uploads, renames, ...)")
     ap2.add_argument("--srch-time", metavar="SEC", type=int, default=30, help="search deadline -- terminate searches running for more than SEC seconds")

copyparty/up2k.py

@@ -28,6 +28,7 @@ from .mtag import MParser, MTag
 from .util import (
     HAVE_SQLITE3,
     SYMTIME,
+    MTHash,
     Pebkac,
     ProgressPrinter,
     absreal,
@@ -155,6 +156,11 @@ class Up2k(object):
         self.fstab = Fstab(self.log_func)
 
+        if self.args.hash_mt < 2:
+            self.mth: Optional[MTHash] = None
+        else:
+            self.mth = MTHash(self.args.hash_mt)
+
         if self.args.no_fastboot:
             self.deferred_init()
@@ -841,7 +847,7 @@ class Up2k(object):
                 self.pp.msg = "a{} {}".format(self.pp.n, abspath)
 
-                if nohash:
+                if nohash or not sz:
                     wark = up2k_wark_from_metadata(self.salt, sz, lmod, rd, fn)
                 else:
                     if sz > 1024 * 1024:
@@ -1059,7 +1065,7 @@ class Up2k(object):
             sz2 = st.st_size
             mt2 = int(st.st_mtime)
 
-            if nohash:
+            if nohash or not sz2:
                 w2 = up2k_wark_from_metadata(self.salt, sz2, mt2, rd, fn)
             else:
                 if sz2 > 1024 * 1024 * 32:
@@ -2625,14 +2631,21 @@ class Up2k(object):
         fsz = bos.path.getsize(path)
         csz = up2k_chunksize(fsz)
         ret = []
+        suffix = " MB, {}".format(path)
         with open(fsenc(path), "rb", 512 * 1024) as f:
+            if self.mth and fsz >= 1024 * 512:
+                tlt = self.mth.hash(f, fsz, csz, self.pp, prefix, suffix)
+                ret = [x[0] for x in tlt]
+                fsz = 0
+
             while fsz > 0:
+                # same as `hash_at` except for `imutex` / bufsz
                 if self.stop:
                     return []
 
                 if self.pp:
                     mb = int(fsz / 1024 / 1024)
-                    self.pp.msg = "{}{} MB, {}".format(prefix, mb, path)
+                    self.pp.msg = prefix + str(mb) + suffix
 
                 hashobj = hashlib.sha512()
                 rem = min(csz, fsz)
@@ -2873,11 +2886,17 @@ class Up2k(object):
         abspath = os.path.join(ptop, rd, fn)
         self.log("hashing " + abspath)
         inf = bos.stat(abspath)
-        hashes = self._hashlist_from_file(abspath)
-        if not hashes:
-            return
+        if not inf.st_size:
+            wark = up2k_wark_from_metadata(
+                self.salt, inf.st_size, int(inf.st_mtime), rd, fn
+            )
+        else:
+            hashes = self._hashlist_from_file(abspath)
+            if not hashes:
+                return
 
-        wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
+            wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
 
         with self.mutex:
             self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at)
@@ -2893,6 +2912,9 @@ class Up2k(object):
     def shutdown(self) -> None:
         self.stop = True
 
+        if self.mth:
+            self.mth.stop = True
+
         for x in list(self.spools):
             self._unspool(x)
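
Worth spelling out the zero-byte special case added above: an empty file produces an empty hashlist (ceil(0 / chunksz) is 0 chunks), so it used to hit the `if not hashes: return` bailout and never got indexed; it now falls back to the metadata-based wark, same as nohash mode. A quick sanity check of that arithmetic:

    >>> import math
    >>> math.ceil(0 / (1024 * 1024))
    0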

copyparty/util.py

@@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals
 import base64
 import contextlib
 import hashlib
+import math
 import mimetypes
 import os
 import platform
@@ -21,6 +22,8 @@ import traceback
 from collections import Counter
 from datetime import datetime
 
+from queue import Queue
+
 from .__init__ import ANYWIN, PY2, TYPE_CHECKING, VT100, WINDOWS
 from .__version__ import S_BUILD_DT, S_VERSION
 from .stolen import surrogateescape
@@ -494,6 +497,104 @@ class ProgressPrinter(threading.Thread):
             sys.stdout.flush()  # necessary on win10 even w/ stderr btw
 
 
+class MTHash(object):
+    def __init__(self, cores: int):
+        self.pp: Optional[ProgressPrinter] = None
+        self.f: Optional[typing.BinaryIO] = None
+        self.sz = 0
+        self.csz = 0
+        self.stop = False
+        self.omutex = threading.Lock()
+        self.imutex = threading.Lock()
+        self.work_q: Queue[int] = Queue()
+        self.done_q: Queue[tuple[int, str, int, int]] = Queue()
+        self.thrs = []
+        for _ in range(cores):
+            t = threading.Thread(target=self.worker)
+            t.daemon = True
+            t.start()
+            self.thrs.append(t)
+
+    def hash(
+        self,
+        f: typing.BinaryIO,
+        fsz: int,
+        chunksz: int,
+        pp: Optional[ProgressPrinter] = None,
+        prefix: str = "",
+        suffix: str = "",
+    ) -> list[tuple[str, int, int]]:
+        with self.omutex:
+            self.f = f
+            self.sz = fsz
+            self.csz = chunksz
+
+            chunks: dict[int, tuple[str, int, int]] = {}
+            nchunks = int(math.ceil(fsz / chunksz))
+            for nch in range(nchunks):
+                self.work_q.put(nch)
+
+            ex = ""
+            for nch in range(nchunks):
+                qe = self.done_q.get()
+                try:
+                    nch, dig, ofs, csz = qe
+                    chunks[nch] = (dig, ofs, csz)
+                except:
+                    ex = ex or str(qe)
+
+                if pp:
+                    mb = int((fsz - nch * chunksz) / 1024 / 1024)
+                    pp.msg = prefix + str(mb) + suffix
+
+            if ex:
+                raise Exception(ex)
+
+            ret = []
+            for n in range(nchunks):
+                ret.append(chunks[n])
+
+            self.f = None
+            self.csz = 0
+            self.sz = 0
+            return ret
+
+    def worker(self) -> None:
+        while True:
+            ofs = self.work_q.get()
+            try:
+                v = self.hash_at(ofs)
+            except Exception as ex:
+                v = str(ex)  # type: ignore
+
+            self.done_q.put(v)
+
+    def hash_at(self, nch: int) -> tuple[int, str, int, int]:
+        f = self.f
+        ofs = ofs0 = nch * self.csz
+        chunk_sz = chunk_rem = min(self.csz, self.sz - ofs)
+        if self.stop:
+            return nch, "", ofs0, chunk_sz
+
+        assert f
+        hashobj = hashlib.sha512()
+        while chunk_rem > 0:
+            with self.imutex:
+                f.seek(ofs)
+                buf = f.read(min(chunk_rem, 1024 * 1024 * 12))
+
+            if not buf:
+                raise Exception("EOF at " + str(ofs))
+
+            hashobj.update(buf)
+            chunk_rem -= len(buf)
+            ofs += len(buf)
+
+        bdig = hashobj.digest()[:33]
+        udig = base64.urlsafe_b64encode(bdig).decode("utf-8")
+        return nch, udig, ofs0, chunk_sz
+
+
 def uprint(msg: str) -> None:
     try:
         print(msg, end="")
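
One detail of hash_at worth noting: the SHA-512 digest is truncated to 33 bytes before encoding, and since 33 is a multiple of 3, the urlsafe-base64 form always comes out as exactly 44 characters with no '=' padding:

    >>> import base64, hashlib
    >>> dig = hashlib.sha512(b"example").digest()[:33]
    >>> len(base64.urlsafe_b64encode(dig))
    44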