From d40d223cc7de9a126e311dad4242b37b666e60bc Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 11 Mar 2015 17:15:57 +0200 Subject: [PATCH] Start of key count reconciliation. --- vxsandbox/resources/kv.py | 199 +++++++++++++++++++++++++-- vxsandbox/resources/tests/test_kv.py | 41 +++++- 2 files changed, 230 insertions(+), 10 deletions(-) diff --git a/vxsandbox/resources/kv.py b/vxsandbox/resources/kv.py index f33c825..5844ac7 100644 --- a/vxsandbox/resources/kv.py +++ b/vxsandbox/resources/kv.py @@ -8,6 +8,8 @@ import json from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.task import LoopingCall +from twisted.python import log from vumi.persist.txredis_manager import TxRedisManager @@ -33,13 +35,6 @@ class RedisResource(SandboxResource): Synonym for `keys_per_user_hard`. Deprecated. """ - # FIXME: - # - Currently we allow key expiry to be set. Keys that expire are - # not decremented from the sandbox's key limit. This means that - # some sandboxes might hit their key limit too soon. This is - # better than not allowing expiry of keys and filling up Redis - # though. 
- @inlineCallbacks def setup(self): self.r_config = self.config.get('redis_manager', {}) @@ -48,9 +43,13 @@ def setup(self): self.keys_per_user_soft = self.config.get( 'keys_per_user_soft', int(0.8 * self.keys_per_user_hard)) self.redis = yield TxRedisManager.from_config(self.r_config) + self.reconciler = Reconciler(self.redis) + self.reconciler.start() + @inlineCallbacks def teardown(self): - return self.redis.close_manager() + yield self.reconciler.stop() + yield self.redis.close_manager() def _count_key(self, sandbox_id): return "#".join(["count", sandbox_id]) @@ -235,3 +234,187 @@ def handle_incr(self, api, command): except Exception, e: returnValue(self.reply(command, success=False, reason=unicode(e))) returnValue(self.reply(command, value=int(value), success=True)) + + +class ReconciliationError(Exception): + """ + Raised when an error occurs during reconciliation. + """ + + +class ReconciliationStatus(object): + + COMPLETE = 'complete' + SCANNING = 'scanning' + SAVING = 'saving' + + def __init__(self, redis, clock=None): + self._redis, self._clock = redis, clock + + @property + def status(self): + pass + + def load(self): + """ + Load reconciliation status from Redis. + """ + pass + + def save(self): + """ + Save reconciliation status to Redis. + """ + pass + + def complete(self): + """ + Returns `True` if the reconciliation is complete, `False` otherwise. + """ + return self.status == self.COMPLETE + + def scanning(self): + """ + Returns `True` if the reconciliation is scanning Redis keys, + `False` otherwise. + """ + return self.status == self.SCANNING + + def saving(self): + """ + Returns `True` if the reconciliation is writing sandbox key counts + back to Redis, `False` otherwise. + """ + return self.status == self.SAVING + + +class ReconciliationLock(object): + def __init__(self, redis, expiry): + self._redis = redis + self._expiry = expiry + + def acquire(self): + pass + + def release(self): + pass + + +class Reconciler(object): + """ + A task for reconciling key counts.
+ + Key counts are tracked accurately as keys are added or deleted, + but keys that expire in Redis are never removed from a sandbox's + key limit. + + :param TxRedisManager redis: + Redis connection manager. + + :param int period: + Seconds between reconciliation checks. Default is 600s. + + :param int recon_expiry: + Seconds before a reconciliation is considered outdated. + Default is one day. + """ + + DEFAULT_PERIOD = 10 * 60 # ten minutes + DEFAULT_RECON_EXPIRY = 24 * 60 * 60 # one day + + work_fraction = 0.95 # fraction of the period to do work for + + def __init__(self, redis, period=None, recon_expiry=None): + self._redis = redis + self._period = period or self.DEFAULT_PERIOD + self._recon_expiry = recon_expiry or self.DEFAULT_RECON_EXPIRY + self._task = LoopingCall(self.attempt_reconciliation) + self._done = None + + def _recon_key(self, *parts): + return "#".join(["recon"] + list(parts)) + + def _lock_key(self): + return self._recon_key("lock") + + def start(self): + """ + Start attempting reconciliation. + """ + if self._done is None: + self._done = self._task.start(self._period, now=False) + self._done.addErrback( + lambda failure: log.err( + failure, "Reconciliation task failed.")) + + @inlineCallbacks + def stop(self): + """ + Stop attempting reconciliation. + """ + if self._done is not None: + self._task.stop() + yield self._done + self._done = None + + @inlineCallbacks + def attempt_reconciliation(self): + """ + Attempt to perform some reconciliation work. + + Only one worker performs reconciliation work at a time. This + reduces load on Redis and simplifies the implementation. + + Work consists of: + + * Load the reconciliation status from Redis. + * If no reconciliation is in progress and the last one was recent + enough, no work is needed. + * If there is a reconciliation in progress, continue scanning + keys from where the work stopped, updating counts as needed.
+ * After doing some work, write the results back to the reconciliation + status in Redis. + + When a reconciliation is done: + + * Write the results to the sandbox key counts. + * Mark the reconciliation as complete and save the status in Redis. + + Work should continue for only one period and should wrap up when + the task is stopped. + """ + lock = ReconciliationLock(self._redis, expiry=self._period) + start_time = self._task.clock.seconds() + end_time = start_time + self._period * self.work_fraction + + def timeout(): + return self._task.clock.seconds() > end_time + + if not (yield lock.acquire()): + return + try: + recon = ReconciliationStatus(self._redis, self._task.clock) + yield recon.load() + + if recon.complete() and recon.expired(): + recon.reset() + + if recon.complete(): + return + + if recon.scanning(): + while (recon.scanning() and self._task.running + and not timeout()): + cursor = recon.scan_cursor() + cursor, keys = yield self._redis.scan(cursor, "match") + recon.update_scan(cursor, keys) + yield recon.save() + + if recon.saving(): + while (recon.saving_counts() and self._task.running + and not timeout()): + sandbox_id, key_count = recon.pop_count() + yield self._redis.set("count#%s" % sandbox_id, key_count) + yield recon.save() + finally: + yield lock.release() diff --git a/vxsandbox/resources/tests/test_kv.py b/vxsandbox/resources/tests/test_kv.py index 4f8a867..42f11ba 100644 --- a/vxsandbox/resources/tests/test_kv.py +++ b/vxsandbox/resources/tests/test_kv.py @@ -1,11 +1,12 @@ import json import logging -from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import inlineCallbacks, Deferred from vumi.tests.helpers import PersistenceHelper +from vumi.tests.utils import VumiTestCase -from vxsandbox.resources.kv import RedisResource +from vxsandbox.resources.kv import RedisResource, Reconciler from vxsandbox.resources.tests.utils import ResourceTestCaseBase @@ -202,3 +203,39 @@ def test_handle_incr_hard_limit_reached(self):
message, 'Redis hard limit of 100 keys reached for sandbox test_id. ' 'No more keys can be written.') + + +class TestReconciler(VumiTestCase): + @inlineCallbacks + def setUp(self): + super(TestReconciler, self).setUp() + self.persistence_helper = self.add_helper(PersistenceHelper()) + self.r_server = yield self.persistence_helper.get_redis_manager() + + def mk_reconciler(self, **kw): + recon = Reconciler(self.r_server, **kw) + self.addCleanup(recon.stop) + return recon + + def test_start(self): + recon = self.mk_reconciler() + self.assertFalse(recon._task.running) + self.assertIsNone(recon._done) + recon.start() + self.assertTrue(recon._task.running) + self.assertIsInstance(recon._done, Deferred) + + @inlineCallbacks + def test_stop(self): + recon = self.mk_reconciler() + recon.start() + self.assertTrue(recon._task.running) + self.assertIsInstance(recon._done, Deferred) + yield recon.stop() + self.assertFalse(recon._task.running) + self.assertIsNone(recon._done) + + @inlineCallbacks + def test_attempt_reconciliation_no_previous_recon(self): + recon = self.mk_reconciler() + yield recon.attempt_reconciliation()