serialize exceptions across mp
This commit is contained in:
parent
68943ca454
commit
19d6487eaf
@ -7,6 +7,7 @@ import threading
|
||||
from .__init__ import PY2, WINDOWS
|
||||
from .broker_mpw import MpWorker
|
||||
from .util import mp
|
||||
from .broker_util import *
|
||||
|
||||
|
||||
if PY2 and not WINDOWS:
|
||||
@ -109,7 +110,7 @@ class BrokerMp(object):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
# TODO will deadlock if dest performs another ipc
|
||||
rv = obj(*args)
|
||||
rv = try_exec(obj, *args, want_retval=retq_id)
|
||||
|
||||
if retq_id:
|
||||
proc.q_pend.put([retq_id, "retq", rv])
|
||||
|
@ -8,7 +8,7 @@ import threading
|
||||
|
||||
from .__init__ import PY2, WINDOWS
|
||||
from .httpsrv import HttpSrv
|
||||
from .util import Queue
|
||||
from .broker_util import *
|
||||
|
||||
if PY2 and not WINDOWS:
|
||||
import pickle # nosec
|
||||
@ -93,7 +93,7 @@ class MpWorker(object):
|
||||
|
||||
def put(self, want_retval, dest, *args):
|
||||
if want_retval:
|
||||
retq = Queue(1)
|
||||
retq = ExceptionalQueue(1)
|
||||
retq_id = id(retq)
|
||||
with self.retpend_mutex:
|
||||
self.retpend[retq_id] = retq
|
||||
|
@ -4,8 +4,8 @@ from __future__ import print_function, unicode_literals
|
||||
|
||||
import threading
|
||||
|
||||
from .util import Queue
|
||||
from .httpsrv import HttpSrv
|
||||
from .broker_util import *
|
||||
|
||||
|
||||
class BrokerThr(object):
|
||||
@ -39,13 +39,12 @@ class BrokerThr(object):
|
||||
obj = getattr(obj, node)
|
||||
|
||||
# TODO will deadlock if dest performs another ipc
|
||||
rv = obj(*args)
|
||||
|
||||
rv = try_exec(obj, *args, want_retval=want_retval)
|
||||
if not want_retval:
|
||||
return
|
||||
|
||||
# pretend we're broker_mp
|
||||
retq = Queue(1)
|
||||
retq = ExceptionalQueue(1)
|
||||
retq.put(rv)
|
||||
return retq
|
||||
|
||||
|
45
copyparty/broker_util.py
Normal file
45
copyparty/broker_util.py
Normal file
@ -0,0 +1,45 @@
|
||||
# coding: utf-8
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
|
||||
import traceback
|
||||
|
||||
from .__init__ import PY2
|
||||
from .util import Pebkac
|
||||
|
||||
if not PY2:
|
||||
from queue import Queue
|
||||
else:
|
||||
from Queue import Queue # pylint: disable=import-error,no-name-in-module
|
||||
|
||||
|
||||
class ExceptionalQueue(Queue):
|
||||
def get(self, block=True, timeout=None):
|
||||
rv = super(ExceptionalQueue, self).get(block, timeout)
|
||||
|
||||
# TODO: how expensive is this?
|
||||
if isinstance(rv, list):
|
||||
if rv[0] == "exception":
|
||||
if rv[1] == "pebkac":
|
||||
raise Pebkac(*rv[2:])
|
||||
else:
|
||||
raise Exception(rv[2])
|
||||
|
||||
return rv
|
||||
|
||||
|
||||
def try_exec(func, *args, want_retval=False):
|
||||
try:
|
||||
return func(*args)
|
||||
|
||||
except Pebkac as ex:
|
||||
if not want_retval:
|
||||
raise
|
||||
|
||||
return ["exception", "pebkac", ex.code, str(ex)]
|
||||
|
||||
except:
|
||||
if not want_retval:
|
||||
raise
|
||||
|
||||
return ["exception", "stack", traceback.format_exc()]
|
Loading…
Reference in New Issue
Block a user