Extract Adapters, add Redis adapter (#18)
This extracts the storage backends into adapters, and adds a Redis adapter and a Memory adapter. The Redis adapter is useful since it can act as a continuation of Prorate - but with conditional fillup added in.

The memory store is useful in a different way: it can serve as a reference implementation to test other adapters against. This change also makes the adapter configurable on the `Pecorino` module.
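With this change the adapter can be assigned in an initializer. A minimal sketch (the initializer path is illustrative; `MemoryAdapter.new` takes no arguments per this diff, while other adapters will need connection details):

```ruby
# config/initializers/pecorino.rb - illustrative location, not mandated by the gem.
# Overrides the default adapter Pecorino would otherwise derive from the main database.
Pecorino.adapter = Pecorino::Adapters::MemoryAdapter.new
```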
julik authored Apr 6, 2024
1 parent 35fd837 commit 65a65af
Showing 27 changed files with 989 additions and 216 deletions.
31 changes: 5 additions & 26 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -2,37 +2,11 @@ name: CI

on:
- push
- pull_request

env:
BUNDLE_PATH: vendor/bundle

jobs:
# lint:
# name: Code Style
# runs-on: ubuntu-22.04
# if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
# strategy:
# matrix:
# ruby:
# - '2.7'
# steps:
# - name: Checkout
# uses: actions/checkout@v4
# - name: Setup Ruby
# uses: ruby/setup-ruby@v1
# with:
# ruby-version: ${{ matrix.ruby }}
# bundler-cache: true
# - name: Rubocop Cache
# uses: actions/cache@v3
# with:
# path: ~/.cache/rubocop_cache
# key: ${{ runner.os }}-rubocop-${{ hashFiles('.rubocop.yml') }}
# restore-keys: |
# ${{ runner.os }}-rubocop-
# - name: Rubocop
# run: bundle exec rubocop
test:
name: Tests
runs-on: ubuntu-22.04
@@ -50,6 +24,11 @@ jobs:
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 5432:5432
redis:
image: redis
options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v4
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
## 0.7.0

- Allow `Pecorino.adapter` to be assigned, and add `adapter:` to all classes. This allows the adapter for Pecorino to be configured manually and overridden in an initializer.
- Add Redis-based adapter derived from Prorate
- Formalize and test the adapter API
- Add a memory-based adapter for single-process applications (and as a reference)

## 0.6.0

- Add `Pecorino::Block` for setting blocks directly. These are available both to `Throttle` with the same key and on their own. This can be used to set arbitrary blocks without having to configure a `Throttle` first.
6 changes: 5 additions & 1 deletion README.md
@@ -2,7 +2,11 @@

Pecorino is a rate limiter based on the concept of leaky buckets, or more specifically - based on the [generic cell rate](https://brandur.org/rate-limiting) algorithm. It uses your DB as the storage backend for the throttles. It is compact, easy to install, and does not require additional infrastructure. The approach used by Pecorino has been previously used by [prorate](https://github.com/WeTransfer/prorate) with Redis, and that approach has proven itself.

Pecorino is designed to integrate seamlessly into any Rails application using a PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it).
Pecorino is designed to integrate seamlessly into any Rails application, and will use either:

* A memory store (good enough if you have just 1 process)
* A PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it)
* A Redis instance

If you would like to know more about the leaky bucket algorithm: [this article](http://live.julik.nl/2022/08/the-unreasonable-effectiveness-of-leaky-buckets) or the [Wikipedia article](https://en.wikipedia.org/wiki/Leaky_bucket) are both good starting points. [This Wikipedia article](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm) describes the generic cell rate algorithm in more detail as well.
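The "lazy leak" at the heart of the algorithm can be sketched in a few lines of plain Ruby: rather than draining the bucket on a timer, the level is recomputed from the elapsed time whenever it is read. All names below are illustrative, not the gem's API:

```ruby
# Recompute a leaky bucket's level from the time elapsed since it was stored.
# stored_level: level at the last write; stored_at/now: timestamps in seconds;
# leak_rate: tokens leaking out per second. The level never drops below zero.
def leaked_level(stored_level, stored_at, leak_rate, now)
  dt = now - stored_at
  [0.0, stored_level - (leak_rate * dt)].max
end

# A bucket stored at level 10.0, leaking 2 tokens/second, read 3 seconds later:
leaked_level(10.0, 100.0, 2.0, 103.0) # => 4.0
```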

57 changes: 31 additions & 26 deletions lib/pecorino.rb
@@ -7,24 +7,24 @@
require_relative "pecorino/railtie" if defined?(Rails::Railtie)

module Pecorino
autoload :Postgres, "pecorino/postgres"
autoload :Sqlite, "pecorino/sqlite"
autoload :LeakyBucket, "pecorino/leaky_bucket"
autoload :Block, "pecorino/block"
autoload :Throttle, "pecorino/throttle"
autoload :CachedThrottle, "pecorino/cached_throttle"

module Adapters
autoload :MemoryAdapter, "pecorino/adapters/memory_adapter"
autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter"
autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter"
autoload :RedisAdapter, "pecorino/adapters/redis_adapter"
end

# Deletes stale leaky buckets and blocks which have expired. Run this method regularly to
# avoid accumulating too many unused rows in your tables.
#
# @return void
def self.prune!
# Delete all the old blocks here (if we are under a heavy swarm of requests which are all
# blocked it is probably better to avoid the big delete)
ActiveRecord::Base.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()")

# Prune buckets which are no longer used. No "uncached" needed here since we are using "execute"
ActiveRecord::Base.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()")
adapter.prune
end

# Creates the tables and indexes needed for Pecorino. Call this from your migrations like so:
@@ -38,36 +38,41 @@ def self.prune!
# @param active_record_schema[ActiveRecord::SchemaMigration] the migration through which we will create the tables
# @return void
def self.create_tables(active_record_schema)
active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t|
t.string :key, null: false
t.float :level, null: false
t.datetime :last_touched_at, null: false
t.datetime :may_be_deleted_after, null: false
end
active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true
active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after]
adapter.create_tables(active_record_schema)
end

active_record_schema.create_table :pecorino_blocks, id: :uuid do |t|
t.string :key, null: false
t.datetime :blocked_until, null: false
end
active_record_schema.add_index :pecorino_blocks, [:key], unique: true
active_record_schema.add_index :pecorino_blocks, [:blocked_until]
# Allows assignment of an adapter for storing throttles. Normally this would be a subclass of `Pecorino::Adapters::BaseAdapter`, but
# you can assign anything you like. Set this in an initializer. By default Pecorino will use the adapter configured from your main
# database, but you can also create a separate database for it - or use Redis or memory storage.
#
# @param adapter[Pecorino::Adapters::BaseAdapter]
# @return [Pecorino::Adapters::BaseAdapter]
def self.adapter=(adapter)
@adapter = adapter
end

# Returns the currently configured adapter, or the default adapter from the main database
#
# @return [Pecorino::Adapters::BaseAdapter]
def self.adapter
@adapter || default_adapter_from_main_database
end

# Returns the database implementation for setting the values atomically. Since the implementation
# differs per database, this method will return a different adapter depending on which database is
# being used
def self.adapter
#
# @return [Pecorino::Adapters::BaseAdapter]
def self.default_adapter_from_main_database
model_class = ActiveRecord::Base
adapter_name = model_class.connection.adapter_name
case adapter_name
when /postgres/i
Pecorino::Postgres.new(model_class)
Pecorino::Adapters::PostgresAdapter.new(model_class)
when /sqlite/i
Pecorino::Sqlite.new(model_class)
Pecorino::Adapters::SqliteAdapter.new(model_class)
else
raise "Pecorino does not support #{adapter_name} just yet"
raise "Pecorino does not support the #{adapter_name} database just yet"
end
end
end
66 changes: 66 additions & 0 deletions lib/pecorino/adapters/base_adapter.rb
@@ -0,0 +1,66 @@
# frozen_string_literal: true

# An adapter allows Pecorino throttles, leaky buckets and other
# resources to interface with a data storage backend - a database, usually.
class Pecorino::Adapters::BaseAdapter
# Returns the state of a leaky bucket. The state should be a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @return [Array]
def state(key:, capacity:, leak_rate:)
[0, false]
end

# Adds tokens to the leaky bucket. The return value is a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @param n_tokens[Float] how many tokens to add
# @return [Array]
def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
[0, false]
end

# Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will
# be added. If there isn't - the fillup will be rejected. The return value is a triplet of
# the current level (Float), whether the bucket is now at capacity (Boolean)
# and whether the fillup was accepted (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @param n_tokens[Float] how many tokens to add
# @return [Array]
def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
[0, false, false]
end

# Sets a timed block for the given key - this is used when a throttle fires. The return value
# is not defined - the call should always succeed.
# @param key[String] the key of the block
# @param block_for[#to_f, ActiveSupport::Duration] the duration of the block, in seconds
def set_block(key:, block_for:)
end

# Returns the time until which a block for a given key is in effect. If there is no block in
# effect, the method should return `nil`. The return value is either a `Time` or `nil`
# @param key[String] the key of the block
def blocked_until(key:)
end

# Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have
# now lapsed
# @return [void]
def prune
end

# Creates the database tables for Pecorino to operate, or initializes other
# schema-like resources the adapter needs to operate
def create_tables(active_record_schema)
end
end
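The conditional fillup contract documented above can be sketched standalone - reject the fillup outright if it would overflow capacity, otherwise apply it. This mirrors the triplet return value `[level, at_capacity, accepted]` but is an illustration, not the gem's implementation:

```ruby
# Sketch of the add_tokens_conditionally contract on an already-leaked level.
# Returns [resulting_level, bucket_at_capacity, fillup_accepted].
def add_tokens_conditionally(level, capacity, n_tokens)
  if level + n_tokens > capacity
    # Rejected: the level stays as it was, no partial fillup takes place.
    [level, level >= capacity, false]
  else
    new_level = level + n_tokens
    [new_level, new_level == capacity, true]
  end
end
```

Note that a fillup which lands exactly on `capacity` is accepted - only an overflow is rejected.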
147 changes: 147 additions & 0 deletions lib/pecorino/adapters/memory_adapter.rb
@@ -0,0 +1,147 @@
# frozen_string_literal: true

# A memory store for leaky buckets and blocks
class Pecorino::Adapters::MemoryAdapter
class KeyedLock
def initialize
@locked_keys = Set.new
@lock_mutex = Mutex.new
end

def lock(key)
loop do
@lock_mutex.synchronize do
next if @locked_keys.include?(key)
@locked_keys << key
return
end
end
end

def unlock(key)
@lock_mutex.synchronize do
@locked_keys.delete(key)
end
end

def with(key)
lock(key)
yield
ensure
unlock(key)
end
end

def initialize
@buckets = {}
@blocks = {}
@lock = KeyedLock.new
end

# Returns the state of a leaky bucket. The state should be a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
def state(key:, capacity:, leak_rate:)
@lock.lock(key)
level, ts = @buckets[key]
@lock.unlock(key)

return [0, false] unless level

dt = get_mono_time - ts
level_after_leak = [0, level - (leak_rate * dt)].max
[level_after_leak.to_f, (level_after_leak - capacity) >= 0]
end

# Adds tokens to the leaky bucket. The return value is a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = false)
end

# Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will
# be added. If there isn't - the fillup will be rejected. The return value is a triplet of
# the current level (Float), whether the bucket is now at capacity (Boolean)
# and whether the fillup was accepted (Boolean)
def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = true)
end

# Sets a timed block for the given key - this is used when a throttle fires. The return value
# is not defined - the call should always succeed.
def set_block(key:, block_for:)
raise ArgumentError, "block_for must be positive" unless block_for > 0
@lock.lock(key)
@blocks[key] = get_mono_time + block_for.to_f
Time.now + block_for.to_f
ensure
@lock.unlock(key)
end

# Returns the time until which a block for a given key is in effect. If there is no block in
# effect, the method should return `nil`. The return value is either a `Time` or `nil`
def blocked_until(key:)
blocked_until_monotonic = @blocks[key]
return unless blocked_until_monotonic

now_monotonic = get_mono_time
return unless blocked_until_monotonic > now_monotonic

Time.now + (blocked_until_monotonic - now_monotonic)
end

# Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have
# now lapsed
def prune
now_monotonic = get_mono_time

@blocks.keys.each do |key|
@lock.with(key) do
@blocks.delete(key) if @blocks[key] && @blocks[key] < now_monotonic
end
end

@buckets.keys.each do |key|
@lock.with(key) do
_level, expire_at_monotonic = @buckets[key]
@buckets.delete(key) if expire_at_monotonic && expire_at_monotonic < now_monotonic
end
end
end

# No-op
def create_tables(active_record_schema)
end

private

def add_tokens_with_lock(key, capacity, leak_rate, n_tokens, conditionally)
@lock.lock(key)
now = get_mono_time
level, ts, _ = @buckets[key] || [0.0, now]

dt = now - ts
level_after_leak = clamp(0, level - (leak_rate * dt), capacity)
level_after_fillup = level_after_leak + n_tokens
if level_after_fillup > capacity && conditionally
return [level_after_leak, level_after_leak >= capacity, _did_accept = false]
end

clamped_level_after_fillup = clamp(0, level_after_fillup, capacity)
expire_after = now + (level_after_fillup / leak_rate)
@buckets[key] = [clamped_level_after_fillup, now, expire_after]

[clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true]
ensure
@lock.unlock(key)
end

def get_mono_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def clamp(min, value, max)
return min if value < min
return max if value > max
value
end
end
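The per-key locking pattern used by `KeyedLock` above - a `Set` of held keys guarded by a single `Mutex`, with contending threads spinning until their key is free - can be demonstrated standalone. The class below is an illustrative rework, not the gem's code:

```ruby
require "set"

# Per-key spinlock: threads contending for the same key serialize, while
# threads using different keys do not block each other.
class PerKeyLock
  def initialize
    @held = Set.new
    @mutex = Mutex.new
  end

  def with(key)
    loop do
      # Set#add? returns nil if the key was already held by another thread.
      acquired = @mutex.synchronize { !@held.add?(key).nil? }
      break if acquired
      Thread.pass # spin politely until the key is released
    end
    yield
  ensure
    @mutex.synchronize { @held.delete(key) }
  end
end

lock = PerKeyLock.new
counter = 0
threads = 8.times.map do
  Thread.new { 1000.times { lock.with("k") { counter += 1 } } }
end
threads.each(&:join)
counter # => 8000
```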