Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Commit

Permalink
Mimic pluggability of other backends
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed Feb 29, 2016
1 parent 8d1fb13 commit 5c93e04
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 37 deletions.
2 changes: 2 additions & 0 deletions cliquet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

DEFAULT_SETTINGS = {
'backoff': None,
'background.workers': 'cliquet.workers.memory',
'background.processes': 1,
'batch_max_requests': 25,
'cache_backend': '',
'cache_url': '',
Expand Down
47 changes: 37 additions & 10 deletions cliquet/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import cliquet
from cliquet import errors
from cliquet import events
from cliquet import utils
from cliquet import statsd
from cliquet import cache
from cliquet import storage
from cliquet import permission
from cliquet import workers
from cliquet.logs import logger
from cliquet.events import ResourceRead, ResourceChanged, ACTIONS
from cliquet.workers import get_memory_workers

from pyramid.events import NewRequest, NewResponse
from pyramid.exceptions import ConfigurationError
Expand Down Expand Up @@ -362,7 +362,7 @@ def on_new_response(event):

class EventActionFilter(object):
def __init__(self, actions, config):
actions = ACTIONS.from_string_list(actions)
actions = events.ACTIONS.from_string_list(actions)
self.actions = [action.value for action in actions]

def phash(self):
Expand Down Expand Up @@ -390,7 +390,9 @@ def setup_listeners(config):
config.add_subscriber_predicate('for_actions', EventActionFilter)
config.add_subscriber_predicate('for_resources', EventResourceFilter)

write_actions = (ACTIONS.CREATE, ACTIONS.UPDATE, ACTIONS.DELETE)
write_actions = (events.ACTIONS.CREATE,
events.ACTIONS.UPDATE,
events.ACTIONS.DELETE)
settings = config.get_settings()
listeners = aslist(settings['event_listeners'])

Expand All @@ -412,27 +414,52 @@ def setup_listeners(config):
key = 'listeners.%s' % name
listener = statsd_client.timer(key)(listener.__call__)

# is_async = asbool(settings.get(prefix + 'async', 'false'))
# if is_async and hasattr(config.registry, 'workers'):
# # Wrap the listener callback to use background workers.
# def async_listener(event):
# config.registry.workers.apply_async('event',
# listener,
# (event,),
# listener.done)

# listener = async_listener

# Default actions are write actions only.
actions = aslist(settings.get(prefix + 'actions', ''))
if len(actions) > 0:
actions = ACTIONS.from_string_list(actions)
actions = events.ACTIONS.from_string_list(actions)
else:
actions = write_actions

# By default, it listens to every resources.
resource_names = aslist(settings.get(prefix + 'resources', ''))
options = dict(for_actions=actions, for_resources=resource_names)

if ACTIONS.READ in actions:
config.add_subscriber(listener, ResourceRead, **options)
# If read action is specified, subscribe to read event.
if events.ACTIONS.READ in actions:
event_cls = events.ResourceRead
config.add_subscriber(listener, event_cls, **options)
if len(actions) == 1:
return

config.add_subscriber(listener, ResourceChanged, **options)
# If write action is specified, subscribe to changed event.
event_cls = events.ResourceChanged
config.add_subscriber(listener, event_cls, **options)


def setup_workers(config):
settings = config.get_settings()
num_workers = int(settings.get('background.processes', 1))
config.registry.workers = get_memory_workers(num_workers)

workers_mod = settings['background.workers']
workers_mod = config.maybe_dotted(workers_mod)
backend = workers_mod.load_from_config(config)
if not isinstance(backend, workers.WorkersBase):
raise ConfigurationError("Invalid workers backend: %s" % backend)
config.registry.workers = backend

heartbeat = workers.heartbeat(backend)
config.registry.heartbeats['workers'] = heartbeat


def load_default_settings(config, default_settings):
Expand Down
27 changes: 11 additions & 16 deletions cliquet/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
from pyramid.threadlocal import get_current_registry
from cliquet import logger


class ListenerBase(object):
def _done(self, name, res_id, success, result):
def __init__(self, *args, **kwargs):
pass

def _async_run(self, event):
workers = get_current_registry().workers
workers.apply_async('event', self._run, (event,), self._done)

def _run(self, event):
raise NotImplementedError()

def __call__(self, event, async=True):
def __call__(self, event):
"""
:param event: Incoming event
:param async: Run asynchronously, default: True
"""
if async:
return self._async_run(event)
else:
# not used yet
return self._run(event) # pragma: no cover
raise NotImplementedError()

def done(self, name, res_id, success, result):
logger.info("Async listener done.",
name=name,
result_id=res_id,
success=success,
result=result)
2 changes: 1 addition & 1 deletion cliquet/listeners/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, client, listname, *args, **kwargs):
self._client = client
self.listname = listname

def _run(self, event): # pragma: no cover
def __call__(self, event):
try:
payload = json.dumps(event.payload)
except TypeError:
Expand Down
22 changes: 22 additions & 0 deletions cliquet/tests/test_initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,25 @@ def test_plugin_benefits_from_cors_setup(self):
}
resp = app.options('/v0/attachment', headers=headers, status=200)
self.assertIn('Access-Control-Allow-Origin', resp.headers)


class WorkersSetupTest(unittest.TestCase):

def test_wrong_class(self):
settings = {
'background.workers': 'cliquet.storage.redis',
'storage_url': '',
'storage_pool_size': 1
}
config = Configurator(settings=settings)
with self.assertRaises(ConfigurationError):
initialization.setup_workers(config)

def test_registers_heartbeat(self):
settings = {
'background.workers': 'cliquet.workers.memory',
}
config = Configurator(settings=settings)
config.registry.heartbeats = {}
initialization.setup_workers(config)
self.assertIsNotNone(config.registry.heartbeats.get('workers'))
16 changes: 12 additions & 4 deletions cliquet/tests/test_listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def make_app(self, extra_settings={}):
settings.update(**extra_settings)
config = testing.setUp(settings=settings)
config.commit()
initialization.setup_workers(config)
initialization.setup_listeners(config)
return config

Expand Down Expand Up @@ -151,8 +150,11 @@ class ListenerCalledTest(unittest.TestCase):

def setUp(self):
self.config = testing.setUp()
self.config.add_settings({'events_pool_size': 1,
'events_url': 'redis://localhost:6379/0'})
self.config.registry.heartbeats = {}
self.config.add_settings({
'events_pool_size': 1,
'events_url': 'redis://localhost:6379/0',
'background.workers': 'cliquet.workers.memory'})
initialization.setup_workers(self.config)
self.config.commit()
self._redis = create_from_config(self.config, prefix='events_')
Expand Down Expand Up @@ -224,4 +226,10 @@ class ListenerBaseTest(unittest.TestCase):
def test_not_implemented(self):
# make sure we can't use the base listener
listener = ListenerBase()
self.assertRaises(NotImplementedError, listener._run, object())
self.assertRaises(NotImplementedError, listener, object())

def test_done_logs_by_default(self):
listener = ListenerBase()
with mock.patch('cliquet.logger.info') as mocked:
listener.done(name='event', res_id=123, success=True, result=None)
self.assertTrue(mocked.called)
14 changes: 12 additions & 2 deletions cliquet/tests/test_workers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import unittest
import time
from cliquet.workers import MemoryWorkers

from cliquet.workers import WorkersBase
from cliquet.workers.memory import Workers


def boom():
raise Exception('ok')


class WorkersBaseTest(unittest.TestCase):

def test_not_implemented(self):
# make sure we can't use the base listener
workers = WorkersBase()
self.assertRaises(NotImplementedError, workers.apply_async, '', None)


class TestMemoryWorkers(unittest.TestCase):
def setUp(self):
self.workers = MemoryWorkers(size=1)
self.workers = Workers(size=1)

def tearDown(self):
self.workers.close()
Expand Down
20 changes: 20 additions & 0 deletions cliquet/workers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class WorkersBase(object):
"""Background workers abstraction used by event listeners.
"""
def apply_async(self, name, func, args=None, callback=None):
"""Run the specified `func` in background with the specified `args`
and calls `callback` with the result.
:param str name: an arbitrary identifier
:param func func: a function to run in background
:param tuple args: a list of parameters to provide to `func`
:param func callback: the function to be called when task is done.
"""
raise NotImplementedError()


def heartbeat(backend):
"""No-op heartbeat check for workers.
XXX: find out a way to provide heartbeat feature.
"""
return lambda r: True
12 changes: 8 additions & 4 deletions cliquet/workers.py → cliquet/workers/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from functools import partial
from collections import defaultdict, OrderedDict

from . import WorkersBase

try: # pragma: no cover
# until dill works with pypy, let's use plain Pickle.
# https://github.com/uqfoundation/dill/issues/73
Expand Down Expand Up @@ -43,7 +45,7 @@ def _run(dumped): # pragma: no cover
return True, result


class MemoryWorkers(object):
class Workers(WorkersBase):
def __init__(self, size=1, result_size_limit=100):
self.closed = True
self.initialize(size, result_size_limit)
Expand Down Expand Up @@ -113,13 +115,15 @@ def apply_async(self, name, func, args=None, callback=None):
_WORKERS_PER_PROCESS = {}


def get_memory_workers(size=1):
def load_from_config(config):
settings = config.get_settings()
num_workers = int(settings.get('background.processes', 1))
pid = os.getpid()
if pid in _WORKERS_PER_PROCESS:
workers = _WORKERS_PER_PROCESS[pid]
if workers.closed:
workers.initialize(size)
workers.initialize(num_workers)
else:
_WORKERS_PER_PROCESS[pid] = workers = MemoryWorkers(size)
_WORKERS_PER_PROCESS[pid] = workers = Workers(num_workers)

return workers
24 changes: 24 additions & 0 deletions cliquet_docs/reference/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,30 @@ for every resources.
cliquet.event_listeners.redis.resources = article comment
Asynchronous listeners
::::::::::::::::::::::

By default, listeners are executed synchronously. This means that their
execution is blocking the request/response cycle.

But listeners can be also be executed asynchronously by setting ``async = true``.

.. code-block:: ini
cliquet.event_listeners.redis.async = true
.. note::

By *Cliquet* uses a very simple model for background tasks: it relies on
child processes and shared memory. This is convenient for small purposes but
has some limitations. For example, background tasks are lost when the Web process
restarts.

Extending Cliquet to support solid and distributed asynchronous job queues
like `Celery <http://www.celeryproject.org/>`_ or `Python rq <python-rq.org>`_
is trivial. Get in touch with us!


Cache
=====

Expand Down

0 comments on commit 5c93e04

Please sign in to comment.