diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 5b73f3d5..fe74eaf2 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -7,6 +7,7 @@ import threading from .__init__ import PY2, WINDOWS from .broker_mpw import MpWorker from .util import mp +from .broker_util import * if PY2 and not WINDOWS: @@ -109,7 +110,7 @@ class BrokerMp(object): obj = getattr(obj, node) # TODO will deadlock if dest performs another ipc - rv = obj(*args) + rv = try_exec(obj, *args, want_retval=retq_id) if retq_id: proc.q_pend.put([retq_id, "retq", rv]) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 821b65e4..7aeee01c 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -8,7 +8,7 @@ import threading from .__init__ import PY2, WINDOWS from .httpsrv import HttpSrv -from .util import Queue +from .broker_util import * if PY2 and not WINDOWS: import pickle # nosec @@ -93,7 +93,7 @@ class MpWorker(object): def put(self, want_retval, dest, *args): if want_retval: - retq = Queue(1) + retq = ExceptionalQueue(1) retq_id = id(retq) with self.retpend_mutex: self.retpend[retq_id] = retq diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index 665ed5cb..5d441799 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -4,8 +4,8 @@ from __future__ import print_function, unicode_literals import threading -from .util import Queue from .httpsrv import HttpSrv +from .broker_util import * class BrokerThr(object): @@ -39,13 +39,12 @@ class BrokerThr(object): obj = getattr(obj, node) # TODO will deadlock if dest performs another ipc - rv = obj(*args) - + rv = try_exec(obj, *args, want_retval=want_retval) if not want_retval: return # pretend we're broker_mp - retq = Queue(1) + retq = ExceptionalQueue(1) retq.put(rv) return retq diff --git a/copyparty/broker_util.py b/copyparty/broker_util.py new file mode 100644 index 00000000..4a36bffc --- /dev/null +++ b/copyparty/broker_util.py @@ -0,0 +1,45 @@ +# coding: utf-8 +from __future__ import print_function, unicode_literals + + +import traceback + +from .__init__ import PY2 +from .util import Pebkac + +if not PY2: + from queue import Queue +else: + from Queue import Queue # pylint: disable=import-error,no-name-in-module + + +class ExceptionalQueue(Queue): + def get(self, block=True, timeout=None): + rv = super(ExceptionalQueue, self).get(block, timeout) + + # TODO: how expensive is this? + if isinstance(rv, list): + if rv[0] == "exception": + if rv[1] == "pebkac": + raise Pebkac(*rv[2:]) + else: + raise Exception(rv[2]) + + return rv + + +def try_exec(func, *args, want_retval=False): + try: + return func(*args) + + except Pebkac as ex: + if not want_retval: + raise + + return ["exception", "pebkac", ex.code, str(ex)] + + except: + if not want_retval: + raise + + return ["exception", "stack", traceback.format_exc()]