Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Implementation for signals=False from queue to pipe #53

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 26 additions & 33 deletions timeout_decorator/timeout_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,24 @@ def new_function(*args, **kwargs):
return decorate


def _target(queue, function, *args, **kwargs):
def _target(child_conn, function, *args, **kwargs):
"""Run a function with arguments and return output via a queue.

This is a helper function for the Process created in _Timeout. It runs
the function with positional arguments and keyword arguments and then
returns the function's output by way of a queue. If an exception gets
raised, it is returned to _Timeout to be raised by the value property.
"""
try:
queue.put((True, function(*args, **kwargs)))
child_conn.send((True, function(*args, **kwargs)))
except:
queue.put((False, sys.exc_info()[1]))
child_conn.send((False, sys.exc_info()[1]))
finally:
child_conn.close()


class _Timeout(object):

"""Wrap a function and add a timeout (limit) attribute to it.

Instances of this class are automatically generated by the add_timeout
function defined above. Wrapping a function allows asynchronous calls
to be made and termination of execution after a timeout has passed.
Expand All @@ -125,49 +125,42 @@ def __init__(self, function, timeout_exception, exception_message, limit):
self.__exception_message = exception_message
self.__name__ = function.__name__
self.__doc__ = function.__doc__
self.__timeout = time.time()
self.__process = multiprocessing.Process()
self.__queue = multiprocessing.Queue()
self.__process = None
self.__parent_conn = None
self.__child_conn = None

def __call__(self, *args, **kwargs):
"""Execute the embedded function object asynchronously.

The function given to the constructor is transparently called and
requires that "ready" be intermittently polled. If and when it is
True, the "value" property may then be checked for returned data.
"""
self.__limit = kwargs.pop('timeout', self.__limit)
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
self.__process = multiprocessing.Process(target=_target,
args=args,
kwargs=kwargs)
self.__parent_conn, self.__child_conn = multiprocessing.Pipe(duplex=False)

args = (self.__child_conn, self.__function) + args
self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
self.__timeout = self.__limit + time.time()
while not self.ready:
time.sleep(0.01)
return self.value
if self.__parent_conn.poll(self.__limit):
return self.value
else:
self.cancel()

def cancel(self):
"""Terminate any possible execution of the embedded function."""
self.__parent_conn.close()
if self.__process.is_alive():
self.__process.terminate()

_raise_exception(self.__timeout_exception, self.__exception_message)

@property
def ready(self):
"""Read-only property indicating status of "value" property."""
if self.__timeout < time.time():
self.cancel()
return self.__queue.full() and not self.__queue.empty()

@property
def value(self):
"""Read-only property containing data returned from function."""
if self.ready is True:
flag, load = self.__queue.get()
if flag:
return load
raise load
flag, load = self.__parent_conn.recv()
self.__parent_conn.close()
# when self.__parent_conn.recv() exits, maybe __process is still alive,
# then it might zombie the process. so join it explicitly
self.__process.join(1)

if flag:
return load
raise load