From 5c93e040d21d9be130e9c971dc430f4d8f0697b0 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 26 Feb 2016 18:14:43 +0100 Subject: [PATCH] Mimic pluggability of other backends --- cliquet/__init__.py | 2 + cliquet/initialization.py | 47 ++++++++++++++++++----- cliquet/listeners/__init__.py | 27 ++++++------- cliquet/listeners/redis.py | 2 +- cliquet/tests/test_initialization.py | 22 +++++++++++ cliquet/tests/test_listeners.py | 16 ++++++-- cliquet/tests/test_workers.py | 14 ++++++- cliquet/workers/__init__.py | 20 ++++++++++ cliquet/{workers.py => workers/memory.py} | 12 ++++-- cliquet_docs/reference/configuration.rst | 24 ++++++++++++ 10 files changed, 149 insertions(+), 37 deletions(-) create mode 100644 cliquet/workers/__init__.py rename cliquet/{workers.py => workers/memory.py} (92%) diff --git a/cliquet/__init__.py b/cliquet/__init__.py index b7d63c1f..64a41b03 100644 --- a/cliquet/__init__.py +++ b/cliquet/__init__.py @@ -28,6 +28,8 @@ DEFAULT_SETTINGS = { 'backoff': None, + 'background.workers': 'cliquet.workers.memory', + 'background.processes': 1, 'batch_max_requests': 25, 'cache_backend': '', 'cache_url': '', diff --git a/cliquet/initialization.py b/cliquet/initialization.py index 9d9e13d4..33fbef70 100644 --- a/cliquet/initialization.py +++ b/cliquet/initialization.py @@ -18,14 +18,14 @@ import cliquet from cliquet import errors +from cliquet import events from cliquet import utils from cliquet import statsd from cliquet import cache from cliquet import storage from cliquet import permission +from cliquet import workers from cliquet.logs import logger -from cliquet.events import ResourceRead, ResourceChanged, ACTIONS -from cliquet.workers import get_memory_workers from pyramid.events import NewRequest, NewResponse from pyramid.exceptions import ConfigurationError @@ -362,7 +362,7 @@ def on_new_response(event): class EventActionFilter(object): def __init__(self, actions, config): - actions = ACTIONS.from_string_list(actions) + actions = events.ACTIONS.from_string_list(actions) self.actions = [action.value for action in actions] def phash(self): @@ -390,7 +390,9 @@ def setup_listeners(config): config.add_subscriber_predicate('for_actions', EventActionFilter) config.add_subscriber_predicate('for_resources', EventResourceFilter) - write_actions = (ACTIONS.CREATE, ACTIONS.UPDATE, ACTIONS.DELETE) + write_actions = (events.ACTIONS.CREATE, + events.ACTIONS.UPDATE, + events.ACTIONS.DELETE) settings = config.get_settings() listeners = aslist(settings['event_listeners']) @@ -412,27 +414,52 @@ def setup_listeners(config): key = 'listeners.%s' % name listener = statsd_client.timer(key)(listener.__call__) + # is_async = asbool(settings.get(prefix + 'async', 'false')) + # if is_async and hasattr(config.registry, 'workers'): + # # Wrap the listener callback to use background workers. + # def async_listener(event): + # config.registry.workers.apply_async('event', + # listener, + # (event,), + # listener.done) + + # listener = async_listener + + # Default actions are write actions only. actions = aslist(settings.get(prefix + 'actions', '')) if len(actions) > 0: - actions = ACTIONS.from_string_list(actions) + actions = events.ACTIONS.from_string_list(actions) else: actions = write_actions + # By default, it listens to every resources. resource_names = aslist(settings.get(prefix + 'resources', '')) options = dict(for_actions=actions, for_resources=resource_names) - if ACTIONS.READ in actions: - config.add_subscriber(listener, ResourceRead, **options) + # If read action is specified, subscribe to read event. + if events.ACTIONS.READ in actions: + event_cls = events.ResourceRead + config.add_subscriber(listener, event_cls, **options) if len(actions) == 1: return - config.add_subscriber(listener, ResourceChanged, **options) + # If write action is specified, subscribe to changed event. + event_cls = events.ResourceChanged + config.add_subscriber(listener, event_cls, **options) def setup_workers(config): settings = config.get_settings() - num_workers = int(settings.get('background.processes', 1)) - config.registry.workers = get_memory_workers(num_workers) + + workers_mod = settings['background.workers'] + workers_mod = config.maybe_dotted(workers_mod) + backend = workers_mod.load_from_config(config) + if not isinstance(backend, workers.WorkersBase): + raise ConfigurationError("Invalid workers backend: %s" % backend) + config.registry.workers = backend + + heartbeat = workers.heartbeat(backend) + config.registry.heartbeats['workers'] = heartbeat def load_default_settings(config, default_settings): diff --git a/cliquet/listeners/__init__.py b/cliquet/listeners/__init__.py index 779df4e3..28cda676 100644 --- a/cliquet/listeners/__init__.py +++ b/cliquet/listeners/__init__.py @@ -1,24 +1,19 @@ -from pyramid.threadlocal import get_current_registry +from cliquet import logger class ListenerBase(object): - def _done(self, name, res_id, success, result): + def __init__(self, *args, **kwargs): pass - def _async_run(self, event): - workers = get_current_registry().workers - workers.apply_async('event', self._run, (event,), self._done) - - def _run(self, event): - raise NotImplementedError() - - def __call__(self, event, async=True): + def __call__(self, event): """ :param event: Incoming event - :param async: Run asynchronously, default: True """ - if async: - return self._async_run(event) - else: - # not used yet - return self._run(event) # pragma: no cover + raise NotImplementedError() + + def done(self, name, res_id, success, result): + logger.info("Async listener done.", + name=name, + result_id=res_id, + success=success, + result=result) diff --git a/cliquet/listeners/redis.py b/cliquet/listeners/redis.py index 0f8eced1..402f2cd3 100644 --- a/cliquet/listeners/redis.py +++ b/cliquet/listeners/redis.py @@ -19,7 +19,7 @@ def __init__(self, client, listname, *args, **kwargs): self._client = client self.listname = listname - def _run(self, event): # pragma: no cover + def __call__(self, event): try: payload = json.dumps(event.payload) except TypeError: diff --git a/cliquet/tests/test_initialization.py b/cliquet/tests/test_initialization.py index 6fa4c6eb..6216dfec 100644 --- a/cliquet/tests/test_initialization.py +++ b/cliquet/tests/test_initialization.py @@ -465,3 +465,25 @@ def test_plugin_benefits_from_cors_setup(self): } resp = app.options('/v0/attachment', headers=headers, status=200) self.assertIn('Access-Control-Allow-Origin', resp.headers) + + +class WorkersSetupTest(unittest.TestCase): + + def test_wrong_class(self): + settings = { + 'background.workers': 'cliquet.storage.redis', + 'storage_url': '', + 'storage_pool_size': 1 + } + config = Configurator(settings=settings) + with self.assertRaises(ConfigurationError): + initialization.setup_workers(config) + + def test_registers_heartbeat(self): + settings = { + 'background.workers': 'cliquet.workers.memory', + } + config = Configurator(settings=settings) + config.registry.heartbeats = {} + initialization.setup_workers(config) + self.assertIsNotNone(config.registry.heartbeats.get('workers')) diff --git a/cliquet/tests/test_listeners.py b/cliquet/tests/test_listeners.py index 99acf7bc..d3fc3257 100644 --- a/cliquet/tests/test_listeners.py +++ b/cliquet/tests/test_listeners.py @@ -28,7 +28,6 @@ def make_app(self, extra_settings={}): settings.update(**extra_settings) config = testing.setUp(settings=settings) config.commit() - initialization.setup_workers(config) initialization.setup_listeners(config) return config @@ -151,8 +150,11 @@ class ListenerCalledTest(unittest.TestCase): def setUp(self): self.config = testing.setUp() - self.config.add_settings({'events_pool_size': 1, - 'events_url': 'redis://localhost:6379/0'}) + self.config.registry.heartbeats = {} + self.config.add_settings({ + 'events_pool_size': 1, + 'events_url': 'redis://localhost:6379/0', + 'background.workers': 'cliquet.workers.memory'}) initialization.setup_workers(self.config) self.config.commit() self._redis = create_from_config(self.config, prefix='events_') @@ -224,4 +226,10 @@ class ListenerBaseTest(unittest.TestCase): def test_not_implemented(self): # make sure we can't use the base listener listener = ListenerBase() - self.assertRaises(NotImplementedError, listener._run, object()) + self.assertRaises(NotImplementedError, listener, object()) + + def test_done_logs_by_default(self): + listener = ListenerBase() + with mock.patch('cliquet.logger.info') as mocked: + listener.done(name='event', res_id=123, success=True, result=None) + self.assertTrue(mocked.called) diff --git a/cliquet/tests/test_workers.py b/cliquet/tests/test_workers.py index d7033445..8b3b68b0 100644 --- a/cliquet/tests/test_workers.py +++ b/cliquet/tests/test_workers.py @@ -1,15 +1,25 @@ import unittest import time -from cliquet.workers import MemoryWorkers + +from cliquet.workers import WorkersBase +from cliquet.workers.memory import Workers def boom(): raise Exception('ok') +class WorkersBaseTest(unittest.TestCase): + + def test_not_implemented(self): + # make sure we can't use the base listener + workers = WorkersBase() + self.assertRaises(NotImplementedError, workers.apply_async, '', None) + + class TestMemoryWorkers(unittest.TestCase): def setUp(self): - self.workers = MemoryWorkers(size=1) + self.workers = Workers(size=1) def tearDown(self): self.workers.close() diff --git a/cliquet/workers/__init__.py b/cliquet/workers/__init__.py new file mode 100644 index 00000000..20dfaee7 --- /dev/null +++ b/cliquet/workers/__init__.py @@ -0,0 +1,20 @@ +class WorkersBase(object): + """Background workers abstraction used by event listeners. + """ + def apply_async(self, name, func, args=None, callback=None): + """Run the specified `func` in background with the specified `args` + and calls `callback` with the result. + + :param str name: an arbitrary identifier + :param func func: a function to run in background + :param tuple args: a list of parameters to provide to `func` + :param func callback: the function to be called when task is done. + """ + raise NotImplementedError() + + +def heartbeat(backend): + """No-op heartbeat check for workers. + XXX: find out a way to provide heartbeat feature. + """ + return lambda r: True diff --git a/cliquet/workers.py b/cliquet/workers/memory.py similarity index 92% rename from cliquet/workers.py rename to cliquet/workers/memory.py index 0f191620..0c8c00f4 100644 --- a/cliquet/workers.py +++ b/cliquet/workers/memory.py @@ -6,6 +6,8 @@ from functools import partial from collections import defaultdict, OrderedDict +from . import WorkersBase + try: # pragma: no cover # until dill works with pypy, let's use plain Pickle. # https://github.com/uqfoundation/dill/issues/73 @@ -43,7 +45,7 @@ def _run(dumped): # pragma: no cover return True, result -class MemoryWorkers(object): +class Workers(WorkersBase): def __init__(self, size=1, result_size_limit=100): self.closed = True self.initialize(size, result_size_limit) @@ -113,13 +115,15 @@ def apply_async(self, name, func, args=None, callback=None): _WORKERS_PER_PROCESS = {} -def get_memory_workers(size=1): +def load_from_config(config): + settings = config.get_settings() + num_workers = int(settings.get('background.processes', 1)) pid = os.getpid() if pid in _WORKERS_PER_PROCESS: workers = _WORKERS_PER_PROCESS[pid] if workers.closed: - workers.initialize(size) + workers.initialize(num_workers) else: - _WORKERS_PER_PROCESS[pid] = workers = MemoryWorkers(size) + _WORKERS_PER_PROCESS[pid] = workers = Workers(num_workers) return workers diff --git a/cliquet_docs/reference/configuration.rst b/cliquet_docs/reference/configuration.rst index 4c8130e3..de319c63 100644 --- a/cliquet_docs/reference/configuration.rst +++ b/cliquet_docs/reference/configuration.rst @@ -312,6 +312,30 @@ for every resources. cliquet.event_listeners.redis.resources = article comment +Asynchronous listeners +:::::::::::::::::::::: + +By default, listeners are executed synchronously. This means that their +execution is blocking the request/response cycle. + +But listeners can be also be executed asynchronously by setting ``async = true``. + +.. code-block:: ini + + cliquet.event_listeners.redis.async = true + +.. note:: + + By *Cliquet* uses a very simple model for background tasks: it relies on + child processes and shared memory. This is convenient for small purposes but + has some limitations. For example, background tasks are lost when the Web process + restarts. + + Extending Cliquet to support solid and distributed asynchronous job queues + like `Celery `_ or `Python rq `_ + is trivial. Get in touch with us! + + Cache =====