From ac80735ac8654382887b540aabbcc45da3e066f8 Mon Sep 17 00:00:00 2001 From: Matias Guijarro Date: Wed, 13 Jul 2016 15:12:41 +0200 Subject: [PATCH 1/2] Contribute a new gevent-compliant event loop (see issue #112) The gevent-compliant event loop is activated by passing `gevent=True` to the `embed` method of `repl` module. The event loop code is heavily inspired from the prompt-toolkit POSIX event loop, adapted to gevent. More details in file `contrib/gevent_eventloop.py` --- ptpython/contrib/gevent_eventloop.py | 88 ++++++++++++++++++++++++++++ ptpython/repl.py | 6 +- 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 ptpython/contrib/gevent_eventloop.py diff --git a/ptpython/contrib/gevent_eventloop.py b/ptpython/contrib/gevent_eventloop.py new file mode 100644 index 00000000..793d2f62 --- /dev/null +++ b/ptpython/contrib/gevent_eventloop.py @@ -0,0 +1,88 @@ +from prompt_toolkit.eventloop.base import EventLoop, INPUT_TIMEOUT +from prompt_toolkit.terminal.vt100_input import InputStream +from prompt_toolkit.eventloop.posix_utils import PosixStdinReader +from prompt_toolkit.eventloop.posix import call_on_sigwinch, DummyContext +from gevent import select + +class GeventEventLoop(EventLoop): + def __init__(self, *args, **kwargs): + super(EventLoop, self).__init__() + self.readers = dict() + self._running = True + + def run(self, stdin, callbacks): + inputstream = InputStream(callbacks.feed_key) + stdin_reader = PosixStdinReader(stdin.fileno()) + #ctx = call_on_sigwinch(self.received_winch) + ctx = DummyContext() + + with ctx: + while self._running: + r, _, _ = select.select([stdin.fileno()], [], [], INPUT_TIMEOUT) + if r: + data = stdin_reader.read() + inputstream.feed(data) + if stdin_reader.closed: + break + else: + # timeout + inputstream.flush() + callbacks.input_timeout() + continue + + def received_winch(self): + pass + + def stop(self): + """ + Stop the `run` call. (Normally called by + :class:`~prompt_toolkit.interface.CommandLineInterface`, when a result + is available, or Abort/Quit has been called.) + """ + self._running = False + + def close(self): + """ + Clean up of resources. Eventloop cannot be reused a second time after + this call. + """ + self.stop() + self.readers = dict() + + def add_reader(self, fd, callback): + """ + Start watching the file descriptor for read availability and then call + the callback. + """ + self.readers[fd] = gevent.get_hub().loop.io(fd, 1) + self.readers[fd].start(callback) + + def remove_reader(self, fd): + """ + Stop watching the file descriptor for read availability. + """ + try: + del self.readers[fd] + except KeyError: + pass + + def run_in_executor(self, callback): + """ + Run a long running function in a background thread. (This is + recommended for code that could block the event loop.) + Similar to Twisted's ``deferToThread``. + """ + gevent.spawn(callback) + + def call_from_executor(self, callback, _max_postpone_until=None): + """ + Call this function in the main event loop. Similar to Twisted's + ``callFromThread``. + + :param _max_postpone_until: `None` or `datetime` instance. For interal + use. If the eventloop is saturated, consider this task to be low + priority and postpone maximum until this timestamp. (For instance, + repaint is done using low priority.) + """ + gevent.spawn(callback) + diff --git a/ptpython/repl.py b/ptpython/repl.py index f2cd983d..336658be 100644 --- a/ptpython/repl.py +++ b/ptpython/repl.py @@ -250,7 +250,8 @@ def enter_to_continue(): def embed(globals=None, locals=None, configure=None, vi_mode=False, history_filename=None, title=None, - startup_paths=None, patch_stdout=False, return_asyncio_coroutine=False): + startup_paths=None, patch_stdout=False, return_asyncio_coroutine=False, + gevent=False): """ Call this to embed Python shell at the current point in your program. It's similar to `IPython.embed` and `bpython.embed`. :: @@ -285,6 +286,9 @@ def get_locals(): # Create eventloop. if return_asyncio_coroutine: eventloop = create_asyncio_eventloop() + elif gevent: + from .contrib.gevent_eventloop import GeventEventLoop + eventloop = GeventEventLoop() else: eventloop = create_eventloop() From d6da30425d630d82d7433e04694fe3ccfd65019b Mon Sep 17 00:00:00 2001 From: Matias Guijarro Date: Sun, 16 Apr 2017 17:15:04 +0200 Subject: [PATCH 2/2] fixes, including sigwinch support and `call_from_executor` --- ptpython/contrib/gevent_eventloop.py | 85 ++++++++++++++++++++++------ 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/ptpython/contrib/gevent_eventloop.py b/ptpython/contrib/gevent_eventloop.py index 793d2f62..e56de2a7 100644 --- a/ptpython/contrib/gevent_eventloop.py +++ b/ptpython/contrib/gevent_eventloop.py @@ -1,37 +1,69 @@ from prompt_toolkit.eventloop.base import EventLoop, INPUT_TIMEOUT from prompt_toolkit.terminal.vt100_input import InputStream from prompt_toolkit.eventloop.posix_utils import PosixStdinReader -from prompt_toolkit.eventloop.posix import call_on_sigwinch, DummyContext +from prompt_toolkit.eventloop.posix import call_on_sigwinch, DummyContext, in_main_thread +from prompt_toolkit.eventloop.select import fd_to_int from gevent import select +import time +import os class GeventEventLoop(EventLoop): def __init__(self, *args, **kwargs): super(EventLoop, self).__init__() self.readers = dict() self._running = True - + self._schedule_pipe_read,self._schedule_pipe_write = os.pipe() + self._calls_from_executor = list() + self._callbacks = None + self._winch_callback_done = True + def run(self, stdin, callbacks): inputstream = InputStream(callbacks.feed_key) stdin_reader = PosixStdinReader(stdin.fileno()) - #ctx = call_on_sigwinch(self.received_winch) - ctx = DummyContext() + self._callbacks = callbacks + + if in_main_thread(): + ctx = call_on_sigwinch(self.received_winch) + else: + ctx = DummyContext() + select_timeout = INPUT_TIMEOUT with ctx: while self._running: - r, _, _ = select.select([stdin.fileno()], [], [], INPUT_TIMEOUT) - if r: + r, _, _ = select.select([stdin.fileno(),self._schedule_pipe_read], + [], [],select_timeout) + if stdin.fileno() in r: + select_timeout = INPUT_TIMEOUT data = stdin_reader.read() inputstream.feed(data) if stdin_reader.closed: break + elif self._schedule_pipe_read in r: + os.read(self._schedule_pipe_read,8192) + while True: + try: + task = self._calls_from_executor.pop(0) + except IndexError: + break + else: + task() else: # timeout inputstream.flush() callbacks.input_timeout() - continue + select_timeout = None + + self._callbacks = None def received_winch(self): - pass + def process_winch(): + if self._callbacks: + self._callbacks.terminal_size_changed() + self._winch_callback_done = True + + if self._winch_callback_done: + self._winch_callback_done = False + self.call_from_executor(process_winch) def stop(self): """ @@ -40,6 +72,10 @@ def stop(self): is available, or Abort/Quit has been called.) """ self._running = False + try: + os.write(self._schedule_pipe_write,'x') + except (AttributeError, IndexError, OSError): + pass def close(self): """ @@ -47,13 +83,17 @@ def close(self): this call. """ self.stop() + for reader in self.readers.values(): + reader.kill() self.readers = dict() + self._callbacks = None def add_reader(self, fd, callback): """ Start watching the file descriptor for read availability and then call the callback. """ + fd = fd_to_int(fd) self.readers[fd] = gevent.get_hub().loop.io(fd, 1) self.readers[fd].start(callback) @@ -61,18 +101,18 @@ def remove_reader(self, fd): """ Stop watching the file descriptor for read availability. """ - try: - del self.readers[fd] - except KeyError: - pass - + fd = fd_to_int(fd) + task = self.readers.pop(fd,None) + if task is not None: + task.kill() + def run_in_executor(self, callback): """ Run a long running function in a background thread. (This is recommended for code that could block the event loop.) Similar to Twisted's ``deferToThread``. """ - gevent.spawn(callback) + self.call_from_executor(callback) def call_from_executor(self, callback, _max_postpone_until=None): """ @@ -84,5 +124,18 @@ def call_from_executor(self, callback, _max_postpone_until=None): priority and postpone maximum until this timestamp. (For instance, repaint is done using low priority.) """ - gevent.spawn(callback) - + if _max_postpone_until is None: + def start_executor(): + gevent.spawn(callback) + self._calls_from_executor.append(start_executor) + else: + def postpone(): + sleep_time = _max_postpone_until - time.time() + if sleep_time > 0: + gevent.sleep(sleep_time) + callback() + self._calls_from_executor.append(postpone) + try: + os.write(self._schedule_pipe_write,'x') + except (AttributeError, IndexError, OSError): + pass