Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reconciliation of sandbox kv store key counts. #6

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 191 additions & 8 deletions vxsandbox/resources/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import json

from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from twisted.python import log

from vumi.persist.txredis_manager import TxRedisManager

Expand All @@ -33,13 +35,6 @@ class RedisResource(SandboxResource):
Synonym for `keys_per_user_hard`. Deprecated.
"""

# FIXME:
# - Currently we allow key expiry to be set. Keys that expire are
# not decremented from the sandbox's key limit. This means that
# some sandboxes might hit their key limit too soon. This is
# better than not allowing expiry of keys and filling up Redis
# though.

@inlineCallbacks
def setup(self):
self.r_config = self.config.get('redis_manager', {})
Expand All @@ -48,9 +43,13 @@ def setup(self):
self.keys_per_user_soft = self.config.get(
'keys_per_user_soft', int(0.8 * self.keys_per_user_hard))
self.redis = yield TxRedisManager.from_config(self.r_config)
self.reconciler = Reconciler(self.redis)
self.reconciler.start()

@inlineCallbacks
def teardown(self):
return self.redis.close_manager()
yield self.reconciler.stop()
yield self.redis.close_manager()

def _count_key(self, sandbox_id):
return "#".join(["count", sandbox_id])
Expand Down Expand Up @@ -235,3 +234,187 @@ def handle_incr(self, api, command):
except Exception, e:
returnValue(self.reply(command, success=False, reason=unicode(e)))
returnValue(self.reply(command, value=int(value), success=True))


class ReconciliationError(Exception):
"""
Raised when an error occurs during reconciliation.
"""


class ReconciliationStatus(object):

COMPLETE = 'complete'
SCANNING = 'scanning'
SAVING = 'saving'

def __init__(self, redis):
self._redis = redis

@property
def status(self):
pass

def load(self):
"""
Load reconciliation status from Redis.
"""
pass

def save(self):
"""
Save reconciliation status to Redis.
"""
pass

def complete(self):
"""
Returns `True` if the reconciliation is complete, `False` otherwise.
"""
return self.status == self.COMPLETE

def scanning(self):
"""
Returns `True` if the reconciliation is scanning Redis keys,
`False` otherwise.
"""
return self.status == self.SCANNING

def saving(self):
"""
Return `True` if the reconciliation is write sandbox key counts
back to Redis, `False` otherwise.
"""
return self.status == self.SAVING


class ReconciliationLock(object):
def __init__(self, redis, expiry):
self._redis = redis
self._expiry = expiry

def acquire(self):
pass

def release(self):
pass


class Reconciler(object):
"""
A task for reconciling key counts.

Key counts are tracked accurately as keys are added or deleted,
but keys that expire in Redis are never removed from a sandboxes
key limit.

:param TxRedisManager redis:
Redis connection manager.

:param int period:
Seconds between reconciliation checks. Default is 600s.

:param int recon_expiry:
Seconds before a reconciliation is considered outdated.
Default is one day.
"""

DEFAULT_PERIOD = 10 * 60 # ten minutes
DEFAULT_RECON_EXPIRY = 24 * 60 * 60 # one day

work_fraction = 0.95 # fraction of the period to do work for

def __init__(self, redis, period=None, recon_expiry=None):
self._redis = redis
self._period = period or self.DEFAULT_PERIOD
self._recon_expiry = recon_expiry or self.DEFAULT_RECON_EXPIRY
self._task = LoopingCall(self.attempt_reconciliation)
self._done = None

def _recon_key(self, *parts):
return "#".join(["recon"] + parts)

def _lock_key(self):
return self._recon_key("lock")

def start(self):
"""
Start attempting reconciliation.
"""
if self._done is None:
self._done = self._task.start(self._period, now=False)
self._done.addErrback(
lambda failure: log.err(
failure, "Reconciliation task failed."))

@inlineCallbacks
def stop(self):
"""
Stop attempting reconciliation.
"""
if self._done is not None:
self._task.stop()
yield self._done
self._done = None

@inlineCallbacks
def attempt_reconciliation(self):
"""
Attempt to perform some reconciliation work.

Only one worker performs reconciliation work at a time. This
reduces load on Redis and simplifies the implementation.

Work consists off:

* Load the reconciliation status from Redis.
* If no reconciliation is in progress and the last one was recent
enough, no work is needed.
* If there is a reconciliation in progress, continue scanning
keys from where the work stopped, updating counts as needed.
* After doing some work, write the results back to the reconciliation
status in Redis.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this isn't yet ready for review, but its a bit unclear to me what part the locks play in these steps. Could you explain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is to prevent multiple workers from doing reconciliation work at the same time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Missed this bit, sorry:

Only one worker performs reconciliation work at a time. This reduces load on Redis and simplifies the implementation.

This step confused me:

If there is a reconciliation in progress, continue scanning keys from where the work stopped, updating counts as needed.

I thought that meant that another recon can start while another is in progress, and it'd continue work from where the other recon is going to be ending.

Looking at the implementation, it seems like each looping call does a bit of work at a time, and then leaves the rest for the next call. Is that what happens?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


When a reconciliation is done:

* Write the results to the sandbox key counts.
* Make the reconciliation as complete and save the status in Redis.

Work should continue for only one period and should wrap up when
the task is stopped.
"""
lock = ReconciliationLock(self._redis, expiry=self._period)
start_time = self._task.clock.seconds()
end_time = start_time + self._period * self.work_fraction

def timeout():
return self._task.clock.seconds() > end_time

if not (yield lock.acquire()):
return
try:
recon = ReconciliationStatus(self._redis, self._task.clock)
yield recon.load()

if recon.complete() and recon.expired():
recon.reset()

if recon.complete():
return

if recon.scanning():
while (recon.scanning() and self._task.running
and not timeout()):
cursor = recon.scan_cursor()
cursor, keys = yield self._redis.scan(cursor, "match")
recon.update_scan(cursor, keys)
yield recon.save()

if recon.saving():
while (recon.saving_counts() and self._task.running
and not timeout()):
sandbox_id, key_count = recon.pop_count()
yield self._redis.set("#%s" % sandbox_id, key_count)
yield recon.save()
finally:
yield lock.release()
41 changes: 39 additions & 2 deletions vxsandbox/resources/tests/test_kv.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import logging

from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import inlineCallbacks, Deferred

from vumi.tests.helpers import PersistenceHelper
from vumi.tests.utils import VumiTestCase

from vxsandbox.resources.kv import RedisResource
from vxsandbox.resources.kv import RedisResource, Reconciler
from vxsandbox.resources.tests.utils import ResourceTestCaseBase


Expand Down Expand Up @@ -202,3 +203,39 @@ def test_handle_incr_hard_limit_reached(self):
message,
'Redis hard limit of 100 keys reached for sandbox test_id. '
'No more keys can be written.')


class TestReconciler(VumiTestCase):
@inlineCallbacks
def setUp(self):
super(TestReconciler, self).setUp()
self.persistence_helper = self.add_helper(PersistenceHelper())
self.r_server = yield self.persistence_helper.get_redis_manager()

def mk_reconciler(self, **kw):
reconciler = Reconciler(self.r_server, **kw)
self.addCleanup(reconciler.stop)
return reconciler

def test_start(self):
reconciler = self.mk_reconciler()
self.assertEqual(reconciler._task.running, False)
self.assertEqual(reconciler._done, None)
reconciler.start()
self.assertEqual(reconciler._task.running, True)
self.assertTrue(isinstance(reconciler._done, Deferred))

@inlineCallbacks
def test_stop(self):
reconciler = self.mk_reconciler()
reconciler.start()
self.assertEqual(reconciler._task.running, True)
self.assertTrue(isinstance(reconciler._done, Deferred))
yield reconciler.stop()
self.assertEqual(reconciler._task.running, False)
self.assertEqual(reconciler._done, None)

@inlineCallbacks
def test_attempt_reconciliation_no_previous_recon(self):
reconciler = self.mk_reconciler()
yield reconciler.attempt_reconciliation()