From 6ba0cc7867f536c405dc0896b8f5590fa228710c Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 13:20:27 +0100 Subject: [PATCH 01/36] Rework how adapters are stored Move them into a module and a directory. This will allow us to expand the adapter corpus shortly. --- lib/pecorino.rb | 35 +++++---------- lib/pecorino/adapters/base_adapter.rb | 45 +++++++++++++++++++ lib/pecorino/adapters/database_adapter.rb | 36 +++++++++++++++ .../postgres_adapter.rb} | 2 +- .../{sqlite.rb => adapters/sqlite_adapter.rb} | 2 +- 5 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 lib/pecorino/adapters/base_adapter.rb create mode 100644 lib/pecorino/adapters/database_adapter.rb rename lib/pecorino/{postgres.rb => adapters/postgres_adapter.rb} (98%) rename lib/pecorino/{sqlite.rb => adapters/sqlite_adapter.rb} (99%) diff --git a/lib/pecorino.rb b/lib/pecorino.rb index 00eea4f..5358d1c 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -7,24 +7,23 @@ require_relative "pecorino/railtie" if defined?(Rails::Railtie) module Pecorino - autoload :Postgres, "pecorino/postgres" - autoload :Sqlite, "pecorino/sqlite" autoload :LeakyBucket, "pecorino/leaky_bucket" autoload :Block, "pecorino/block" autoload :Throttle, "pecorino/throttle" autoload :CachedThrottle, "pecorino/cached_throttle" + module Adapters + autoload :DatabaseAdapter, "pecorino/adapters/database_adapter" + autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter" + autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter" + end + # Deletes stale leaky buckets and blocks which have expired. Run this method regularly to # avoid accumulating too many unused rows in your tables. # # @return void def self.prune! - # Delete all the old blocks here (if we are under a heavy swarm of requests which are all - # blocked it is probably better to avoid the big delete) - ActiveRecord::Base.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") - - # Prune buckets which are no longer used. No "uncached" needed here since we are using "execute" - ActiveRecord::Base.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") + adapter.prune end # Creates the tables and indexes needed for Pecorino. Call this from your migrations like so: @@ -38,21 +37,7 @@ def self.prune! # @param active_record_schema[ActiveRecord::SchemaMigration] the migration through which we will create the tables # @return void def self.create_tables(active_record_schema) - active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| - t.string :key, null: false - t.float :level, null: false - t.datetime :last_touched_at, null: false - t.datetime :may_be_deleted_after, null: false - end - active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true - active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] - - active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| - t.string :key, null: false - t.datetime :blocked_until, null: false - end - active_record_schema.add_index :pecorino_blocks, [:key], unique: true - active_record_schema.add_index :pecorino_blocks, [:blocked_until] + adapter.create_tables(active_record_schema) end # Returns the database implementation for setting the values atomically. Since the implementation @@ -63,9 +48,9 @@ def self.adapter adapter_name = model_class.connection.adapter_name case adapter_name when /postgres/i - Pecorino::Postgres.new(model_class) + Pecorino::Adapters::PostgresAdapter.new(model_class) when /sqlite/i - Pecorino::Sqlite.new(model_class) + Pecorino::Adapters::SqliteAdapter.new(model_class) else raise "Pecorino does not support #{adapter_name} just yet" end diff --git a/lib/pecorino/adapters/base_adapter.rb b/lib/pecorino/adapters/base_adapter.rb new file mode 100644 index 0000000..e18abb5 --- /dev/null +++ b/lib/pecorino/adapters/base_adapter.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +# An adapter allows Pecorino throttles, leaky buckets and other +# resources to interfact to a data storage backend - a database, usually. +class Pecorino::Adapters::BaseAdapter + # Returns the state of a leaky bucket. The state should be a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def state(key:, capacity:, leak_rate:) + [0, false] + end + + # Adds tokens to the leaky bucket. The return value is a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def add_tokens(key:, capacity:, leak_rate:, n_tokens:) + [0, false] + end + + # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will + # be added. If there isn't - the fillup will be rejected. The return value is a triplet of + # the current level (Float), whether the bucket is now at capacity (Boolean) + # and whether the fillup was accepted (Boolean) + def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) + [0, false, false] + end + + # Sets a timed block for the given key - this is used when a throttle fires. The return value + # is not defined - the call should always succeed. + def set_block(key:, block_for:) + end + + # Returns the time until which a block for a given key is in effect. If there is no block in + # effect, the method should return `nil`. The return value is either a `Time` or `nil` + def blocked_until(key:) + end + + # Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have + # now lapsed + def prune + end + + # Creates the database tables for Pecorino to operate, or initializes other + # schema-like resources the adapter needs to operate + def create_tables(active_record_schema) + end +end diff --git a/lib/pecorino/adapters/database_adapter.rb b/lib/pecorino/adapters/database_adapter.rb new file mode 100644 index 0000000..19efeba --- /dev/null +++ b/lib/pecorino/adapters/database_adapter.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +class Pecorino::Adapters::DatabaseAdapter + attr_reader :model_class + + def initialize(model_class) + @model_class = model_class + end + + def prune + # Delete all the old blocks here (if we are under a heavy swarm of requests which are all + # blocked it is probably better to avoid the big delete) + model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") + + # Prune buckets which are no longer used. No "uncached" needed here since we are using "execute" + model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") + end + + def create_tables(active_record_schema) + active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| + t.string :key, null: false + t.float :level, null: false + t.datetime :last_touched_at, null: false + t.datetime :may_be_deleted_after, null: false + end + active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true + active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] + + active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| + t.string :key, null: false + t.datetime :blocked_until, null: false + end + active_record_schema.add_index :pecorino_blocks, [:key], unique: true + active_record_schema.add_index :pecorino_blocks, [:blocked_until] + end +end diff --git a/lib/pecorino/postgres.rb b/lib/pecorino/adapters/postgres_adapter.rb similarity index 98% rename from lib/pecorino/postgres.rb rename to lib/pecorino/adapters/postgres_adapter.rb index 24368bc..721c0ee 100644 --- a/lib/pecorino/postgres.rb +++ b/lib/pecorino/adapters/postgres_adapter.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -Pecorino::Postgres = Struct.new(:model_class) do +class Pecorino::Adapters::PostgresAdapter < Pecorino::Adapters::DatabaseAdapter def state(key:, capacity:, leak_rate:) query_params = { key: key.to_s, diff --git a/lib/pecorino/sqlite.rb b/lib/pecorino/adapters/sqlite_adapter.rb similarity index 99% rename from lib/pecorino/sqlite.rb rename to lib/pecorino/adapters/sqlite_adapter.rb index db2c1ff..a56528e 100644 --- a/lib/pecorino/sqlite.rb +++ b/lib/pecorino/adapters/sqlite_adapter.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -Pecorino::Sqlite = Struct.new(:model_class) do +class Pecorino::Adapters::SqliteAdapter < Pecorino::Adapters::DatabaseAdapter def state(key:, capacity:, leak_rate:) # With a server database, it is really important to use the clock of the database itself so # that concurrent requests will see consistent bucket level calculations. Since SQLite is From 4b63934275abb533490c294b7ff1dc1f8bfb283b Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:05:04 +0100 Subject: [PATCH 02/36] Start extracting adapter tests --- test/adapters/adapter_test_methods.rb | 207 +++++++++++++++++++++++++ test/adapters/memory_adapter_test.rb | 16 ++ test/adapters/postgres_adapter_test.rb | 14 ++ test/adapters/sqlite_adapter_test.rb | 16 ++ 4 files changed, 253 insertions(+) create mode 100644 test/adapters/adapter_test_methods.rb create mode 100644 test/adapters/memory_adapter_test.rb create mode 100644 test/adapters/postgres_adapter_test.rb create mode 100644 test/adapters/sqlite_adapter_test.rb diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb new file mode 100644 index 0000000..30d5c50 --- /dev/null +++ b/test/adapters/adapter_test_methods.rb @@ -0,0 +1,207 @@ +module AdapterTestMethods + LEVEL_DELTA = 0.1 + + def adapter + return @adapter if @adapter + raise "Adapter test subclass needs to return an adapter implementation from here. If the object holds state, it should be the same during a test case." + end + + def random_key + Random.new(Minitest.seed).hex(4) + end + + def test_state_returns_zero_for_nonexistent_bucket + k = random_key + leak_rate = 2 + capacity = 3 + + level, is_full = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_equal 0, level + assert_equal is_full, false + end + + def test_bucket_lifecycle_with_unbounded_fillups + k = random_key + leak_rate = 2 + capacity = 1 + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.3) + assert_in_delta level, 0.3, LEVEL_DELTA + assert_equal false, is_full + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.35) + assert_in_delta level, 0.65, LEVEL_DELTA + assert_equal false, is_full + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.4) + assert_in_delta level, 1.0, LEVEL_DELTA + assert_equal true, is_full + + level, _ = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_in_delta level, 1.0, LEVEL_DELTA + + sleep(0.25) + level, _ = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_in_delta level, 0.5, LEVEL_DELTA + + sleep(0.25) + level, _ = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_in_delta level, 0.0, LEVEL_DELTA + + sleep(0.25) + level, _ = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_in_delta level, 0.0, LEVEL_DELTA + end + + def test_clamps_fillup_with_negative_value + k = random_key + leak_rate = 1.1 + capacity = 15 + + level, _, _ = adapter.state(key: k, leak_rate: leak_rate, capacity: capacity) + assert_in_delta level, 0, 0.0001 + + level, _, _ = adapter.add_tokens(key: k, leak_rate: leak_rate, capacity: capacity, n_tokens: -10) + assert_in_delta level, 0, 0.1 + end + + def test_bucket_lifecycle_with_negative_fillups + k = random_key + leak_rate = 2 + capacity = 1 + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + assert_in_delta level, 1.0, LEVEL_DELTA + assert_equal true, is_full + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: -0.35) + assert_in_delta level, 0.65, LEVEL_DELTA + assert_equal false, is_full + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: -0.4) + assert_in_delta level, 0.25, LEVEL_DELTA + assert_equal false, is_full + + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: -0.4) + assert_in_delta level, 0.0, LEVEL_DELTA + assert_equal false, is_full + end + + def test_bucket_add_tokens_conditionally_accepts_single_fillup_to_capacity + k = random_key + leak_rate = 2 + capacity = 1 + + level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + assert_in_delta level, 1.0, LEVEL_DELTA + assert_equal is_full, true + assert_equal did_accept, true + end + + def test_bucket_add_tokens_conditionally_accepts_multiple_fillups_to_capacity + k = random_key + leak_rate = 2 + capacity = 1 + + level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) + assert_in_delta level, 0.5, LEVEL_DELTA + assert_equal did_accept, true + + level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) + assert_in_delta level, 1.0, LEVEL_DELTA + assert_equal did_accept, true + end + + def test_bucket_lifecycle_rejects_single_fillup_above_capacity + k = random_key + leak_rate = 2 + capacity = 1 + + level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 1.2) + assert_in_delta level, 0.0, LEVEL_DELTA + assert_equal is_full, false + assert_equal did_accept, false + end + + def test_bucket_lifecycle_rejects_conditional_fillup_that_would_overflow + k = random_key + leak_rate = 2 + capacity = 1 + + 3.times do + _level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.3) + assert_equal is_full, false + assert_equal did_accept, true + end + + level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.3) + assert_in_delta level, 0.9, LEVEL_DELTA + assert_equal is_full, false + assert_equal did_accept, false + end + + def test_bucket_lifecycle_handles_conditional_fillup_in_steps + key = random_key + leak_rate = 1.0 + capacity = 1.0 + + counter = 0 + try_fillup = ->(fillup_by, should_have_reached_level, should_have_accepted) { + counter += 1 + level, became_full, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: fillup_by) + assert_equal did_accept, should_have_accepted, "Update #{counter} did_accept should be #{should_have_accepted}" + assert_in_delta should_have_reached_level, level, 0.1 + } + + try_fillup.call(1.1, 0.0, false) # Oversized fillup must be refused outright + try_fillup.call(0.3, 0.3, true) + try_fillup.call(0.3, 0.6, true) + try_fillup.call(0.3, 0.9, true) + try_fillup.call(0.3, 0.9, false) # Would take the bucket to 1.2, so must be rejected + + sleep(0.2) # Leak out 0.2 tokens + + try_fillup.call(0.3, 1.0, true) + try_fillup.call(-2, 0.0, true) # A negative fillup is permitted since it will never take the bucket above capacity + try_fillup.call(1.0, 1.0, true) # Filling up in one step should be permitted + end + + def test_bucket_lifecycle_allows_conditional_fillup_after_leaking_out + key = random_key + capacity = 30 + leak_rate = capacity / 0.5 + + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 29.6) + assert did_accept + + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + refute did_accept + + sleep 0.6 # Spend enough time to allow the bucket to leak out completely + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + assert did_accept, "Once the bucket has leaked out to 0 the fillup should be accepted again" + end + + def test_prune + key = random_key + capacity = 30 + leak_rate = capacity / 0.5 + + adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 29.6) + adapter.set_block(key: key, block_for: 0.5) + + sleep 0.65 + + # Both the leaky bucket and the block should have expired by now, and `prune` should not raise + adapter.prune + end + + def test_create_tables + # Calling this method should not raise + return unless ActiveRecord::Base.connection_established? + + ActiveRecord::Schema.define(version: 1) do |via_definer| + adapter.create_tables(via_definer) + end + end +end diff --git a/test/adapters/memory_adapter_test.rb b/test/adapters/memory_adapter_test.rb new file mode 100644 index 0000000..65065bc --- /dev/null +++ b/test/adapters/memory_adapter_test.rb @@ -0,0 +1,16 @@ +require_relative "../test_helper" +require_relative "adapter_test_methods" + +class MemoryAdapterTest < Minitest::Test + include AdapterTestMethods + + def setup + @adapter = Pecorino::Adapters::MemoryAdapter.new + super + end + + def teardown + @adapter = nil + super + end +end diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb new file mode 100644 index 0000000..526ae26 --- /dev/null +++ b/test/adapters/postgres_adapter_test.rb @@ -0,0 +1,14 @@ +require_relative "../test_helper" +require_relative "adapter_test_methods" + +class PostgresAdapterTest < ActiveSupport::TestCase + include AdapterTestMethods + + setup do + create_postgres_database + @adapter = Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) + end + + teardown { drop_postgres_database } +end + diff --git a/test/adapters/sqlite_adapter_test.rb b/test/adapters/sqlite_adapter_test.rb new file mode 100644 index 0000000..65065bc --- /dev/null +++ b/test/adapters/sqlite_adapter_test.rb @@ -0,0 +1,16 @@ +require_relative "../test_helper" +require_relative "adapter_test_methods" + +class MemoryAdapterTest < Minitest::Test + include AdapterTestMethods + + def setup + @adapter = Pecorino::Adapters::MemoryAdapter.new + super + end + + def teardown + @adapter = nil + super + end +end From 8aa59843839b8ba5ad754a033e49c49535e1a12f Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:08:22 +0100 Subject: [PATCH 03/36] Add a memory adapter --- lib/pecorino/adapters/memory_adapter.rb | 101 ++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 lib/pecorino/adapters/memory_adapter.rb diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb new file mode 100644 index 0000000..56a49a8 --- /dev/null +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +# A memory store for leaky buckets and blocks +class Pecorino::Adapters::MemoryAdapter + def initialize + @buckets = {} + @blocks = {} + end + + # Returns the state of a leaky bucket. The state should be a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def state(key:, capacity:, leak_rate:) + level, ts = @buckets[key] + return [0, false] unless level + + dt = get_mono_time - ts + level_after_leak = [0, level - (leak_rate * dt)].max + [level_after_leak.to_f, (level_after_leak - capacity) >= 0] + end + + # Adds tokens to the leaky bucket. The return value is a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def add_tokens(key:, capacity:, leak_rate:, n_tokens:) + now = get_mono_time + level, ts, _ = @buckets[key] || [0.0, now] + + dt = now - ts + level_after_fillup = clamp(0, level - (leak_rate * dt) + n_tokens, capacity) + + expire_after = now + (level_after_fillup / leak_rate) + @buckets[key] = [level_after_fillup, now, expire_after] + + [level_after_fillup, (level_after_fillup - capacity) >= 0] + end + + # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will + # be added. If there isn't - the fillup will be rejected. The return value is a triplet of + # the current level (Float), whether the bucket is now at capacity (Boolean) + # and whether the fillup was accepted (Boolean) + def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) + now = get_mono_time + level, ts, _ = @buckets[key] || [0.0, now] + + dt = now - ts + level_after_leak = clamp(0, level - (leak_rate * dt), capacity) + level_after_fillup = level_after_leak + n_tokens + if level_after_fillup > capacity + return [level_after_leak, level_after_leak >= capacity, _did_accept = false] + end + + clamped_level_after_fillup = clamp(0, level_after_fillup, capacity) + expire_after = now + (level_after_fillup / leak_rate) + @buckets[key] = [clamped_level_after_fillup, now, expire_after] + + [clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true] + end + + # Sets a timed block for the given key - this is used when a throttle fires. The return value + # is not defined - the call should always succeed. + def set_block(key:, block_for:) + now = get_mono_time + expire_at = now + block_for.to_f + @blocks[key] = expire_at + end + + # Returns the time until which a block for a given key is in effect. If there is no block in + # effect, the method should return `nil`. The return value is either a `Time` or `nil` + def blocked_until(key:) + blocked_until_monotonic = @blocks[key] + return unless blocked_until_monotonic + + now_monotonic = get_mono_time + return unless blocked_until_monotonic > now_monotonic + + Time.now + (blocked_until_monotonic - now_monotonic) + end + + # Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have + # now lapsed + def prune + now_monotonic = get_mono_time + @blocks.delete_if {|_, blocked_until_monotonic| blocked_until_monotonic < now_monotonic } + @buckets.delete_if {|_, (_level, expire_at_monotonic)| expire_at_monotonic < now_monotonic } + end + + # No-op + def create_tables(active_record_schema) + end + + private + + def get_mono_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def clamp(min, value, max) + return min if value < min + return max if value > max + value + end +end From a43958e5a15ddf8a03b32cb05d7346ca70777711 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:10:19 +0100 Subject: [PATCH 04/36] Allow multiple adapters to be instantiated --- test/adapters/adapter_test_methods.rb | 7 +++++-- test/adapters/memory_adapter_test.rb | 12 +++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 30d5c50..4388733 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -2,8 +2,11 @@ module AdapterTestMethods LEVEL_DELTA = 0.1 def adapter - return @adapter if @adapter - raise "Adapter test subclass needs to return an adapter implementation from here. If the object holds state, it should be the same during a test case." + @adapter ||= create_adapter + end + + def create_adapter + raise "Adapter test subclass needs to return an adapter implementation from here." end def random_key diff --git a/test/adapters/memory_adapter_test.rb b/test/adapters/memory_adapter_test.rb index 65065bc..dd6e69f 100644 --- a/test/adapters/memory_adapter_test.rb +++ b/test/adapters/memory_adapter_test.rb @@ -1,16 +1,10 @@ require_relative "../test_helper" require_relative "adapter_test_methods" -class MemoryAdapterTest < Minitest::Test +class MemoryAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - def setup - @adapter = Pecorino::Adapters::MemoryAdapter.new - super - end - - def teardown - @adapter = nil - super + def create_adapter + Pecorino::Adapters::MemoryAdapter.new end end From 038f2be84e274ded52b84d04631bf37ca6407aae Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:11:50 +0100 Subject: [PATCH 05/36] Slowly moving on --- test/adapters/postgres_adapter_test.rb | 10 +++++----- test/adapters/sqlite_adapter_test.rb | 13 +++++-------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 526ae26..8a6212b 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -4,11 +4,11 @@ class PostgresAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - setup do - create_postgres_database - @adapter = Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) - end - + setup { create_postgres_database } teardown { drop_postgres_database } + + def create_adapter + Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) + end end diff --git a/test/adapters/sqlite_adapter_test.rb b/test/adapters/sqlite_adapter_test.rb index 65065bc..2dc302a 100644 --- a/test/adapters/sqlite_adapter_test.rb +++ b/test/adapters/sqlite_adapter_test.rb @@ -1,16 +1,13 @@ require_relative "../test_helper" require_relative "adapter_test_methods" -class MemoryAdapterTest < Minitest::Test +class SqliteAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - def setup - @adapter = Pecorino::Adapters::MemoryAdapter.new - super - end + setup { create_sqlite_db } + teardown { drop_sqlite_db } - def teardown - @adapter = nil - super + def create_adapter + Pecorino::Adapters::SqliteAdapter.new(ActiveRecord::Base) end end From 48a22d7e6afe7ae254081dfa231012eeb4d27ba4 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:23:13 +0100 Subject: [PATCH 06/36] We need some threading tests but this is not "it" --- test/adapters/adapter_test_methods.rb | 18 ++++++++++++++++++ test/adapters/postgres_adapter_test.rb | 3 +-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 4388733..cdde594 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -207,4 +207,22 @@ def test_create_tables adapter.create_tables(via_definer) end end + + def xtest_should_accept_threadsafe_conditional_fillups + k = random_key + capacity = 30 + leak_rate = capacity / 0.5 + + threads = 3.times.map do + Thread.new do + 9.times do + adapter.add_tokens_conditionally(key: k, leak_rate: leak_rate, capacity: capacity, n_tokens: 1) + end + end + end + threads.map(&:join) + + level, is_full = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + assert_in_delta level, (3 * 9), LEVEL_DELTA + end end diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 8a6212b..1bd33a6 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -4,11 +4,10 @@ class PostgresAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - setup { create_postgres_database } + setup { warn self.object_id; create_postgres_database } teardown { drop_postgres_database } def create_adapter Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) end end - From cf75a16a7e33564a967588cc0900f07073ec5a5c Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:32:39 +0100 Subject: [PATCH 07/36] Add stubs for locking in memory adapter --- lib/pecorino/adapters/memory_adapter.rb | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index 56a49a8..da3d9a2 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -5,6 +5,8 @@ class Pecorino::Adapters::MemoryAdapter def initialize @buckets = {} @blocks = {} + @locked_keys = Set.new + @lock_mutex = Mutex.new end # Returns the state of a leaky bucket. The state should be a tuple of two @@ -21,6 +23,7 @@ def state(key:, capacity:, leak_rate:) # Adds tokens to the leaky bucket. The return value is a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) def add_tokens(key:, capacity:, leak_rate:, n_tokens:) + lock(key) now = get_mono_time level, ts, _ = @buckets[key] || [0.0, now] @@ -31,6 +34,8 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) @buckets[key] = [level_after_fillup, now, expire_after] [level_after_fillup, (level_after_fillup - capacity) >= 0] + ensure + unlock(key) end # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will @@ -38,6 +43,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # the current level (Float), whether the bucket is now at capacity (Boolean) # and whether the fillup was accepted (Boolean) def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) + lock(key) now = get_mono_time level, ts, _ = @buckets[key] || [0.0, now] @@ -53,14 +59,19 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) @buckets[key] = [clamped_level_after_fillup, now, expire_after] [clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true] + ensure + unlock(key) end # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. def set_block(key:, block_for:) + lock(key) now = get_mono_time expire_at = now + block_for.to_f @blocks[key] = expire_at + ensure + unlock(key) end # Returns the time until which a block for a given key is in effect. If there is no block in @@ -89,6 +100,12 @@ def create_tables(active_record_schema) private + def lock(key) + end + + def unlock(key) + end + def get_mono_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end From 4ae93ddeb0d996cd6089d85ce7f8cfd44f1cc4c7 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 18:53:52 +0100 Subject: [PATCH 08/36] Extract lock --- lib/pecorino/adapters/memory_adapter.rb | 47 +++++++++++++++++-------- test/adapters/adapter_test_methods.rb | 5 ++- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index da3d9a2..24f6dd4 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -2,17 +2,42 @@ # A memory store for leaky buckets and blocks class Pecorino::Adapters::MemoryAdapter + class KeyedLock + def initialize + @locked_keys = Set.new + @lock_mutex = Mutex.new + end + + def lock(key) + loop do + @lock_mutex.synchronize do + next if @locked_keys.include?(key) + @locked_keys << key + return + end + end + end + + def unlock(key) + @lock_mutex.synchronize do + @locked_keys.delete(key) + end + end + end + def initialize @buckets = {} @blocks = {} - @locked_keys = Set.new - @lock_mutex = Mutex.new + @lock = KeyedLock.new end # Returns the state of a leaky bucket. The state should be a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) def state(key:, capacity:, leak_rate:) + @lock.lock(key) level, ts = @buckets[key] + @lock.unlock(key) + return [0, false] unless level dt = get_mono_time - ts @@ -23,7 +48,7 @@ def state(key:, capacity:, leak_rate:) # Adds tokens to the leaky bucket. The return value is a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) def add_tokens(key:, capacity:, leak_rate:, n_tokens:) - lock(key) + @lock.lock(key) now = get_mono_time level, ts, _ = @buckets[key] || [0.0, now] @@ -35,7 +60,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) [level_after_fillup, (level_after_fillup - capacity) >= 0] ensure - unlock(key) + @lock.unlock(key) end # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will @@ -43,7 +68,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # the current level (Float), whether the bucket is now at capacity (Boolean) # and whether the fillup was accepted (Boolean) def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) - lock(key) + @lock.lock(key) now = get_mono_time level, ts, _ = @buckets[key] || [0.0, now] @@ -60,18 +85,18 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) [clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true] ensure - unlock(key) + @lock.unlock(key) end # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. def set_block(key:, block_for:) - lock(key) + @lock.lock(key) now = get_mono_time expire_at = now + block_for.to_f @blocks[key] = expire_at ensure - unlock(key) + @lock.unlock(key) end # Returns the time until which a block for a given key is in effect. If there is no block in @@ -100,12 +125,6 @@ def create_tables(active_record_schema) private - def lock(key) - end - - def unlock(key) - end - def get_mono_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index cdde594..6136d0c 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -200,12 +200,11 @@ def test_prune end def test_create_tables - # Calling this method should not raise - return unless ActiveRecord::Base.connection_established? - ActiveRecord::Schema.define(version: 1) do |via_definer| adapter.create_tables(via_definer) end + rescue ActiveRecord::ConnectionNotEstablished + # This adapter does not require a connection end def xtest_should_accept_threadsafe_conditional_fillups From 349d38ab343daef5bd2c8ffdd1d4d90c33fac38c Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 19:03:27 +0100 Subject: [PATCH 09/36] Rename test --- test/{throttle_postgres_test.rb => throttle_test.rb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/{throttle_postgres_test.rb => throttle_test.rb} (100%) diff --git a/test/throttle_postgres_test.rb b/test/throttle_test.rb similarity index 100% rename from test/throttle_postgres_test.rb rename to test/throttle_test.rb From 83568dc71812f7882e2bda2d9440af5b53899c18 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 19:03:53 +0100 Subject: [PATCH 10/36] Rename leaky bucket test --- test/leaky_bucket_sqlite_test.rb | 14 ------ ..._postgres_test.rb => leaky_bucket_test.rb} | 46 +++++++++---------- 2 files changed, 21 insertions(+), 39 deletions(-) delete mode 100644 test/leaky_bucket_sqlite_test.rb rename test/{leaky_bucket_postgres_test.rb => leaky_bucket_test.rb} (84%) diff --git a/test/leaky_bucket_sqlite_test.rb b/test/leaky_bucket_sqlite_test.rb deleted file mode 100644 index 17b80ba..0000000 --- a/test/leaky_bucket_sqlite_test.rb +++ /dev/null @@ -1,14 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" -require_relative "leaky_bucket_postgres_test" - -class LeakyBucketSqliteTest < LeakyBucketPostgresTest - def setup - setup_sqlite_db - end - - def teardown - drop_sqlite_db - end -end diff --git a/test/leaky_bucket_postgres_test.rb b/test/leaky_bucket_test.rb similarity index 84% rename from test/leaky_bucket_postgres_test.rb rename to test/leaky_bucket_test.rb index 99bcd19..ede6b64 100644 --- a/test/leaky_bucket_postgres_test.rb +++ b/test/leaky_bucket_test.rb @@ -2,20 +2,16 @@ require "test_helper" -class LeakyBucketPostgresTest < ActiveSupport::TestCase - def setup - create_postgres_database - end - - def teardown - drop_postgres_database +class LeakyBucketTest < ActiveSupport::TestCase + def memory_adapter + @adapter ||= Pecorino::Adapters::MemoryAdapter.new end # This test is performed multiple times since time is involved, and there can be fluctuations # between the iterations 8.times do |n| test "on iteration #{n} accepts a certain number of tokens and returns the new bucket level" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15, adapter: memory_adapter) assert_in_delta bucket.state.level, 0, 0.0001 state = bucket.fillup(20) @@ -34,44 +30,44 @@ def teardown end test "exposes the parameters via reader methods" do - bucket = Pecorino::LeakyBucket.new(key: "some-bk", leak_rate: 1.1, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: "some-bk", leak_rate: 1.1, capacity: 15, adapter: memory_adapter) assert_equal bucket.key, "some-bk" assert_equal bucket.leak_rate, 1.1 assert_equal bucket.capacity, 15.0 end test "translates over_time into an appropriate leak_rate at instantiation" do - throttle = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 10, capacity: 20) + throttle = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 10, capacity: 20, adapter: memory_adapter) assert_in_delta 2.0, throttle.leak_rate, 0.01 end test "tells whether it is able to accept a value which will bring it to capacity" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1, capacity: 3) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1, capacity: 3, adapter: memory_adapter) assert bucket.able_to_accept?(3) end test "allows either of leak_rate or over_time to be used" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15, adapter: memory_adapter) bucket.fillup(20) sleep 0.2 assert_in_delta bucket.state.level, 14.77, 0.1 - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 13.6, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 13.6, capacity: 15, adapter: memory_adapter) bucket.fillup(20) sleep 0.2 assert_in_delta bucket.state.level, 14.77, 0.1 assert_raises(ArgumentError) do - Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 13.6, leak_rate: 1.1, capacity: 15) + Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 13.6, leak_rate: 1.1, capacity: 15, adapter: memory_adapter) end assert_raises(ArgumentError) do - Pecorino::LeakyBucket.new(key: Random.uuid, capacity: 15) + Pecorino::LeakyBucket.new(key: Random.uuid, capacity: 15, adapter: memory_adapter) end end test "does not allow a bucket to be created with a negative value" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15, adapter: memory_adapter) assert_in_delta bucket.state.level, 0, 0.0001 state = bucket.fillup(-10) @@ -79,7 +75,7 @@ def teardown end test "allows check for the bucket leaking out" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 1.1, capacity: 15, adapter: memory_adapter) assert_in_delta bucket.state.level, 0, 0.0001 state = bucket.fillup(10) @@ -91,7 +87,7 @@ def teardown end test "allows the bucket to leak out completely" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 2, capacity: 1) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, leak_rate: 2, capacity: 1, adapter: memory_adapter) assert_predicate bucket.fillup(1), :full? sleep(0.25) @@ -102,37 +98,37 @@ def teardown end test "with conditional fillup, allows a freshly created bucket to be filled to capacity with one call" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) assert bucket.fillup_conditionally(1.0).accepted? end test "with conditional fillup, refuses a fillup that would overflow" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) refute bucket.fillup_conditionally(1.1).accepted? end test "with conditional fillup, allows an existing bucket to be filled to capacity on the second call (INSERT vs. UPDATE)" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) bucket.fillup(0.0) # Ensure the bucket row gets created assert bucket.fillup_conditionally(1.0).accepted? end test "with conditional fillup, allows an existing bucket to be filled to capacity in a sequence of calls" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) assert bucket.fillup_conditionally(0.5).accepted? assert bucket.fillup_conditionally(0.5).accepted? refute bucket.fillup_conditionally(0.1).accepted? end test "with conditional fillup, allows an existing bucket to be filled close to capacity in a sequence of calls" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) assert bucket.fillup_conditionally(0.5).accepted? assert bucket.fillup_conditionally(0.4).accepted? refute bucket.fillup_conditionally(0.2).accepted? end test "allows conditional fillup" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 1.0, capacity: 1.0, adapter: memory_adapter) counter = 0 try_fillup = ->(fillup_by, should_have_reached_level, should_have_accepted) { @@ -156,7 +152,7 @@ def teardown end test "allows conditional fillup even if the bucket leaks out to 0 between calls" do - bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 0.5, capacity: 30) + bucket = Pecorino::LeakyBucket.new(key: Random.uuid, over_time: 0.5, capacity: 30, adapter: memory_adapter) assert bucket.fillup_conditionally(29.6).accepted? refute bucket.fillup_conditionally(1).accepted? sleep 0.6 # Spend enough time to allow the bucket to leak out completely From fe231c748e3abfc8f17efec0798c09055e18a4e3 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 20:37:39 +0100 Subject: [PATCH 11/36] Getting along --- lib/pecorino.rb | 3 +- lib/pecorino/adapters/memory_adapter.rb | 9 +++--- lib/pecorino/block.rb | 11 ++++--- lib/pecorino/leaky_bucket.rb | 10 +++--- lib/pecorino/throttle.rb | 13 +++++--- test/adapters/adapter_test_methods.rb | 41 +++++++++++++++++++++---- test/adapters/postgres_adapter_test.rb | 2 +- test/throttle_test.rb | 22 ++++++------- 8 files changed, 72 insertions(+), 39 deletions(-) diff --git a/lib/pecorino.rb b/lib/pecorino.rb index 5358d1c..6df191f 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -13,6 +13,7 @@ module Pecorino autoload :CachedThrottle, "pecorino/cached_throttle" module Adapters + autoload :MemoryAdapter, "pecorino/adapters/memory_adapter" autoload :DatabaseAdapter, "pecorino/adapters/database_adapter" autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter" autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter" @@ -43,7 +44,7 @@ def self.create_tables(active_record_schema) # Returns the database implementation for setting the values atomically. Since the implementation # differs per database, this method will return a different adapter depending on which database is # being used - def self.adapter + def self.adapter # default_adapter_from_main_database model_class = ActiveRecord::Base adapter_name = model_class.connection.adapter_name case adapter_name diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index 24f6dd4..c214c07 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -92,9 +92,8 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # is not defined - the call should always succeed. def set_block(key:, block_for:) @lock.lock(key) - now = get_mono_time - expire_at = now + block_for.to_f - @blocks[key] = expire_at + @blocks[key] = get_mono_time + block_for.to_f + Time.now + block_for.to_f ensure @lock.unlock(key) end @@ -115,8 +114,8 @@ def blocked_until(key:) # now lapsed def prune now_monotonic = get_mono_time - @blocks.delete_if {|_, blocked_until_monotonic| blocked_until_monotonic < now_monotonic } - @buckets.delete_if {|_, (_level, expire_at_monotonic)| expire_at_monotonic < now_monotonic } + @blocks.delete_if { |_, blocked_until_monotonic| blocked_until_monotonic < now_monotonic } + @buckets.delete_if { |_, (_level, expire_at_monotonic)| expire_at_monotonic < now_monotonic } end # No-op diff --git a/lib/pecorino/block.rb b/lib/pecorino/block.rb index 66a1c92..03dc3fc 100644 --- a/lib/pecorino/block.rb +++ b/lib/pecorino/block.rb @@ -8,17 +8,20 @@ class Pecorino::Block # # @param key[String] the key to set the block for # @param block_for[Float] the number of seconds or a time interval to block for + # @param adapter[Pecorino::Adapters::BaseAdapter] the adapter to set the value in. # @return [Time] the time when the block will be released - def self.set!(key:, block_for:) - Pecorino.adapter.set_block(key: key, block_for: block_for) + def self.set!(key:, block_for:, adapter: Pecorino.adapter) + adapter.set_block(key: key, block_for: block_for) Time.now + block_for end # Returns the time until a certain block is in effect # + # @param key[String] the key to get the expiry time for + # @param adapter[Pecorino::Adapters::BaseAdapter] the adapter to get the value from # @return [Time,nil] the time when the block will be released - def self.blocked_until(key:) - t = Pecorino.adapter.blocked_until(key: key) + def self.blocked_until(key:, adapter: Pecorino.adapter) + t = adapter.blocked_until(key: key) (t && t > Time.now) ? t : nil end end diff --git a/lib/pecorino/leaky_bucket.rb b/lib/pecorino/leaky_bucket.rb index f858aef..90dfcf2 100644 --- a/lib/pecorino/leaky_bucket.rb +++ b/lib/pecorino/leaky_bucket.rb @@ -90,12 +90,14 @@ def accepted? # the bucket contents will then be capped at this value. So with # bucket_capacity set to 12 and a `fillup(14)` the bucket will reach the level # of 12, and will then immediately start leaking again. - def initialize(key:, capacity:, leak_rate: nil, over_time: nil) + # @param adapter[Pecorino::Adapters::BaseAdapter] a compatible adapter + def initialize(key:, capacity:, adapter: Pecorino.adapter, leak_rate: nil, over_time: nil) raise ArgumentError, "Either leak_rate: or over_time: must be specified" if leak_rate.nil? && over_time.nil? raise ArgumentError, "Either leak_rate: or over_time: may be specified, but not both" if leak_rate && over_time @leak_rate = leak_rate || (capacity / over_time.to_f) @key = key @capacity = capacity.to_f + @adapter = adapter end # Places `n` tokens in the bucket. If the bucket has less capacity than `n` tokens, the bucket will be filled to capacity. @@ -109,7 +111,7 @@ def initialize(key:, capacity:, leak_rate: nil, over_time: nil) # @param n_tokens[Float] How many tokens to fillup by # @return [State] the state of the bucket after the operation def fillup(n_tokens) - capped_level_after_fillup, is_full = Pecorino.adapter.add_tokens(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens) + capped_level_after_fillup, is_full = @adapter.add_tokens(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens) State.new(capped_level_after_fillup, is_full) end @@ -131,7 +133,7 @@ def fillup(n_tokens) # @param n_tokens[Float] How many tokens to fillup by # @return [ConditionalFillupResult] the state of the bucket after the operation and whether the operation succeeded def fillup_conditionally(n_tokens) - capped_level_after_fillup, is_full, did_accept = Pecorino.adapter.add_tokens_conditionally(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens) + capped_level_after_fillup, is_full, did_accept = @adapter.add_tokens_conditionally(capacity: @capacity, key: @key, leak_rate: @leak_rate, n_tokens: n_tokens) ConditionalFillupResult.new(capped_level_after_fillup, is_full, did_accept) end @@ -140,7 +142,7 @@ def fillup_conditionally(n_tokens) # # @return [State] the snapshotted state of the bucket at time of query def state - current_level, is_full = Pecorino.adapter.state(key: @key, capacity: @capacity, leak_rate: @leak_rate) + current_level, is_full = @adapter.state(key: @key, capacity: @capacity, leak_rate: @leak_rate) State.new(current_level, is_full) end diff --git a/lib/pecorino/throttle.rb b/lib/pecorino/throttle.rb index a9ce060..35494b8 100644 --- a/lib/pecorino/throttle.rb +++ b/lib/pecorino/throttle.rb @@ -99,10 +99,13 @@ def retry_after # @param key[String] the key for both the block record and the leaky bucket # @param block_for[Numeric] the number of seconds to block any further requests for. Defaults to time it takes # the bucket to leak out to the level of 0 + # @param adapter[Pecorino::Adapters::BaseAdapter] a compatible adapter # @param leaky_bucket_options Options for `Pecorino::LeakyBucket.new` # @see PecorinoLeakyBucket.new - def initialize(key:, block_for: nil, **leaky_bucket_options) - @bucket = Pecorino::LeakyBucket.new(key: key, **leaky_bucket_options) + def initialize(key:, block_for: nil, adapter: Pecorino.adapter, **leaky_bucket_options) + @adapter = adapter + leaky_bucket_options.delete(:adapter) + @bucket = Pecorino::LeakyBucket.new(key: key, adapter: @adapter, **leaky_bucket_options) @key = key.to_s @block_for = block_for ? block_for.to_f : (@bucket.capacity / @bucket.leak_rate) end @@ -116,7 +119,7 @@ def initialize(key:, block_for: nil, **leaky_bucket_options) # @param n_tokens[Float] # @return [boolean] def able_to_accept?(n_tokens = 1) - Pecorino.adapter.blocked_until(key: @key).nil? && @bucket.able_to_accept?(n_tokens) + @adapter.blocked_until(key: @key).nil? && @bucket.able_to_accept?(n_tokens) end # Register that a request is being performed. Will raise Throttled @@ -156,7 +159,7 @@ def request!(n = 1) # # @return [State] the state of the throttle after filling up the leaky bucket / trying to pass the block def request(n = 1) - existing_blocked_until = Pecorino::Block.blocked_until(key: @key) + existing_blocked_until = Pecorino::Block.blocked_until(key: @key, adapter: @adapter) return State.new(existing_blocked_until.utc) if existing_blocked_until # Topup the leaky bucket, and if the topup gets rejected - block the caller @@ -165,7 +168,7 @@ def request(n = 1) State.new(nil) else # and set the block if the fillup was rejected - fresh_blocked_until = Pecorino::Block.set!(key: @key, block_for: @block_for) + fresh_blocked_until = Pecorino::Block.set!(key: @key, block_for: @block_for, adapter: @adapter) State.new(fresh_blocked_until.utc) end end diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 6136d0c..f84d06e 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -106,11 +106,11 @@ def test_bucket_add_tokens_conditionally_accepts_multiple_fillups_to_capacity leak_rate = 2 capacity = 1 - level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) + level, _, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) assert_in_delta level, 0.5, LEVEL_DELTA assert_equal did_accept, true - level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) + level, _, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.5) assert_in_delta level, 1.0, LEVEL_DELTA assert_equal did_accept, true end @@ -139,7 +139,7 @@ def test_bucket_lifecycle_rejects_conditional_fillup_that_would_overflow level, is_full, did_accept = adapter.add_tokens_conditionally(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.3) assert_in_delta level, 0.9, LEVEL_DELTA - assert_equal is_full, false + assert_equal is_full, false assert_equal did_accept, false end @@ -151,7 +151,7 @@ def test_bucket_lifecycle_handles_conditional_fillup_in_steps counter = 0 try_fillup = ->(fillup_by, should_have_reached_level, should_have_accepted) { counter += 1 - level, became_full, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: fillup_by) + level, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: fillup_by) assert_equal did_accept, should_have_accepted, "Update #{counter} did_accept should be #{should_have_accepted}" assert_in_delta should_have_reached_level, level, 0.1 } @@ -182,7 +182,36 @@ def test_bucket_lifecycle_allows_conditional_fillup_after_leaking_out sleep 0.6 # Spend enough time to allow the bucket to leak out completely _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) - assert did_accept, "Once the bucket has leaked out to 0 the fillup should be accepted again" + assert did_accept, "Once the bucket has leaked out to 0 the fillup should be accepted again" + end + + def test_set_block_sets_a_block + key = random_key + now = Time.now.utc + block_duration_s = 2.2 + + assert_nil adapter.blocked_until(key: key) + + set_block_result = adapter.set_block(key: key, block_for: block_duration_s) + assert_kind_of Time, set_block_result + assert_in_delta now + block_duration_s, set_block_result, 0.1 + + blocked_until = adapter.blocked_until(key: key) + assert_in_delta blocked_until, set_block_result, 0.1 + end + + def test_set_block_does_not_set_block_in_the_past + key = random_key + now = Time.now.utc + block_duration_s = -20 + + assert_nil adapter.blocked_until(key: key) + set_block_result = adapter.set_block(key: key, block_for: block_duration_s) + assert_kind_of Time, set_block_result + assert_in_delta now + block_duration_s, set_block_result, 0.1 + + blocked_until = adapter.blocked_until(key: key) + assert_nil blocked_until end def test_prune @@ -221,7 +250,7 @@ def xtest_should_accept_threadsafe_conditional_fillups end threads.map(&:join) - level, is_full = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) + level, _ = adapter.state(key: k, capacity: capacity, leak_rate: leak_rate) assert_in_delta level, (3 * 9), LEVEL_DELTA end end diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 1bd33a6..273480b 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -4,7 +4,7 @@ class PostgresAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - setup { warn self.object_id; create_postgres_database } + setup { create_postgres_database } teardown { drop_postgres_database } def create_adapter diff --git a/test/throttle_test.rb b/test/throttle_test.rb index 6025e18..e2c2b21 100644 --- a/test/throttle_test.rb +++ b/test/throttle_test.rb @@ -2,17 +2,13 @@ require "test_helper" -class ThrottlePostgresTest < ActiveSupport::TestCase - def setup - create_postgres_database - end - - def teardown - drop_postgres_database +class ThrottleTest < ActiveSupport::TestCase + def memory_adapter + @adapter ||= Pecorino::Adapters::MemoryAdapter.new end test "request! installs a block and then removes it and communicates the block using exceptions" do - throttle = Pecorino::Throttle.new(key: Random.uuid, over_time: 1.0, capacity: 30) + throttle = Pecorino::Throttle.new(key: Random.uuid, over_time: 1.0, capacity: 30, adapter: memory_adapter) state_after_first_request = throttle.request! assert_kind_of Pecorino::Throttle::State, state_after_first_request @@ -42,12 +38,12 @@ def teardown test "allows the block_for parameter to be omitted" do assert_nothing_raised do - Pecorino::Throttle.new(key: Random.uuid, over_time: 1, capacity: 30) + Pecorino::Throttle.new(key: Random.uuid, over_time: 1, capacity: 30, adapter: memory_adapter) end end test "still throttles using request() without raising exceptions" do - throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 3) + throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 3, adapter: memory_adapter) 20.times do state = throttle.request @@ -76,7 +72,7 @@ def teardown end test "able_to_accept? returns the prediction whether the throttle will accept" do - throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 2) + throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 2, adapter: memory_adapter) assert throttle.able_to_accept? assert throttle.able_to_accept?(29) @@ -93,7 +89,7 @@ def teardown end test "starts to throttle sooner with a higher fillup rate" do - throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 3) + throttle = Pecorino::Throttle.new(key: Random.uuid, leak_rate: 30, capacity: 30, block_for: 3, adapter: memory_adapter) 15.times do throttle.request!(2) @@ -108,7 +104,7 @@ def teardown end test "throttled() calls the block just once" do - throttle = Pecorino::Throttle.new(key: Random.uuid, over_time: 1.minute, capacity: 1) + throttle = Pecorino::Throttle.new(key: Random.uuid, over_time: 60, capacity: 1, adapter: memory_adapter) counter = 0 From 6c0e529acae37cc2ca4c7f0e072d83bdfd654077 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 20:52:37 +0100 Subject: [PATCH 12/36] Even NOW() is different --- lib/pecorino/adapters/postgres_adapter.rb | 5 +++ lib/pecorino/adapters/sqlite_adapter.rb | 6 ++++ test/adapters/adapter_test_methods.rb | 1 + test/adapters/postgres_adapter_test.rb | 20 ++++++++++++ test/adapters/sqlite_adapter_test.rb | 20 ++++++++++++ test/test_helper.rb | 39 ----------------------- test/throttle_sqlite_test.rb | 13 -------- 7 files changed, 52 insertions(+), 52 deletions(-) delete mode 100644 test/throttle_sqlite_test.rb diff --git a/lib/pecorino/adapters/postgres_adapter.rb b/lib/pecorino/adapters/postgres_adapter.rb index 721c0ee..5e86359 100644 --- a/lib/pecorino/adapters/postgres_adapter.rb +++ b/lib/pecorino/adapters/postgres_adapter.rb @@ -163,4 +163,9 @@ def blocked_until(key:) SQL model_class.connection.uncached { model_class.connection.select_value(block_check_query) } end + + def prune + model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") + model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") + end end diff --git a/lib/pecorino/adapters/sqlite_adapter.rb b/lib/pecorino/adapters/sqlite_adapter.rb index a56528e..0035391 100644 --- a/lib/pecorino/adapters/sqlite_adapter.rb +++ b/lib/pecorino/adapters/sqlite_adapter.rb @@ -190,4 +190,10 @@ def blocked_until(key:) blocked_until_s = model_class.connection.uncached { model_class.connection.select_value(block_check_query) } blocked_until_s && Time.at(blocked_until_s) end + + def prune + now_s = Time.now.to_f + model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s) + model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s) + end end diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index f84d06e..33f1864 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -229,6 +229,7 @@ def test_prune end def test_create_tables + adapter = create_adapter # Has to be in local scope ActiveRecord::Schema.define(version: 1) do |via_definer| adapter.create_tables(via_definer) end diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 273480b..4674adc 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -10,4 +10,24 @@ class PostgresAdapterTest < ActiveSupport::TestCase def create_adapter Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) end + + def create_postgres_database + seed_db_name = Random.new(Minitest.seed).hex(4) + ActiveRecord::Migration.verbose = false + ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") + ActiveRecord::Base.connection.create_database("pecorino_tests_%s" % seed_db_name, charset: :unicode) + ActiveRecord::Base.connection.close + ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: "pecorino_tests_%s" % seed_db_name) + + ActiveRecord::Schema.define(version: 1) do |via_definer| + Pecorino.create_tables(via_definer) + end + end + + def drop_postgres_database + seed_db_name = Random.new(Minitest.seed).hex(4) + ActiveRecord::Base.connection.close + ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") + ActiveRecord::Base.connection.drop_database("pecorino_tests_%s" % seed_db_name) + end end diff --git a/test/adapters/sqlite_adapter_test.rb b/test/adapters/sqlite_adapter_test.rb index 2dc302a..6ee0ebe 100644 --- a/test/adapters/sqlite_adapter_test.rb +++ b/test/adapters/sqlite_adapter_test.rb @@ -7,6 +7,26 @@ class SqliteAdapterTest < ActiveSupport::TestCase setup { create_sqlite_db } teardown { drop_sqlite_db } + def create_sqlite_db + ActiveRecord::Migration.verbose = false + ActiveRecord::Base.establish_connection(adapter: "sqlite3", database: test_db_filename) + + ActiveRecord::Schema.define(version: 1) do |via_definer| + Pecorino.create_tables(via_definer) + end + end + + def drop_sqlite_db + ActiveRecord::Base.connection.close + FileUtils.rm_rf(test_db_filename) + FileUtils.rm_rf(test_db_filename + "-wal") + FileUtils.rm_rf(test_db_filename + "-shm") + end + + def test_db_filename + "pecorino_tests_%s.sqlite3" % Random.new(Minitest.seed).hex(4) + end + def create_adapter Pecorino::Adapters::SqliteAdapter.new(ActiveRecord::Base) end diff --git a/test/test_helper.rb b/test/test_helper.rb index b0b69d3..f2b5757 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -9,43 +9,4 @@ require "active_record" class ActiveSupport::TestCase - def setup_sqlite_db - ActiveRecord::Migration.verbose = false - ActiveRecord::Base.establish_connection(adapter: "sqlite3", database: test_db_filename) - - ActiveRecord::Schema.define(version: 1) do |via_definer| - Pecorino.create_tables(via_definer) - end - end - - def drop_sqlite_db - ActiveRecord::Base.connection.close - FileUtils.rm_rf(test_db_filename) - FileUtils.rm_rf(test_db_filename + "-wal") - FileUtils.rm_rf(test_db_filename + "-shm") - end - - def test_db_filename - "pecorino_tests_%s.sqlite3" % Random.new(Minitest.seed).hex(4) - end - - def create_postgres_database - seed_db_name = Random.new(Minitest.seed).hex(4) - ActiveRecord::Migration.verbose = false - ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") - ActiveRecord::Base.connection.create_database("pecorino_tests_%s" % seed_db_name, charset: :unicode) - ActiveRecord::Base.connection.close - ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: "pecorino_tests_%s" % seed_db_name) - - ActiveRecord::Schema.define(version: 1) do |via_definer| - Pecorino.create_tables(via_definer) - end - end - - def drop_postgres_database - seed_db_name = Random.new(Minitest.seed).hex(4) - ActiveRecord::Base.connection.close - ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") - ActiveRecord::Base.connection.drop_database("pecorino_tests_%s" % seed_db_name) - end end diff --git a/test/throttle_sqlite_test.rb b/test/throttle_sqlite_test.rb deleted file mode 100644 index 4da8430..0000000 --- a/test/throttle_sqlite_test.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" - -class ThrottleSqliteTest < ThrottlePostgresTest - def setup - setup_sqlite_db - end - - def teardown - drop_sqlite_db - end -end From eccd557dbd16452b1ad187cd23c87572934d84b8 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 23:01:18 +0100 Subject: [PATCH 13/36] Seems to work. Mostly. --- lib/pecorino.rb | 1 + lib/pecorino/adapters/redis_adapter.rb | 86 ++++++++++++++++++ .../add_tokens_conditionally.lua | 89 +++++++++++++++++++ pecorino.gemspec | 1 + test/adapters/redis_adapter_test.rb | 21 +++++ 5 files changed, 198 insertions(+) create mode 100644 lib/pecorino/adapters/redis_adapter.rb create mode 100644 lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua create mode 100644 test/adapters/redis_adapter_test.rb diff --git a/lib/pecorino.rb b/lib/pecorino.rb index 6df191f..f4c685d 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -17,6 +17,7 @@ module Adapters autoload :DatabaseAdapter, "pecorino/adapters/database_adapter" autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter" autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter" + autoload :RedisAdapter, "pecorino/adapters/redis_adapter" end # Deletes stale leaky buckets and blocks which have expired. Run this method regularly to diff --git a/lib/pecorino/adapters/redis_adapter.rb b/lib/pecorino/adapters/redis_adapter.rb new file mode 100644 index 0000000..93967e0 --- /dev/null +++ b/lib/pecorino/adapters/redis_adapter.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +require_relative "base_adapter" +require "digest" +require "redis" + +# An adapter allows Pecorino throttles, leaky buckets and other +# resources to interfact to a data storage backend - a database, usually. +class Pecorino::Adapters::RedisAdapter < Pecorino::Adapters::BaseAdapter + class RedisScript + def initialize(script_filename) + @script_body = File.read(File.dirname(__FILE__) + "/redis_adapter/" + script_filename) + @sha = Digest::SHA1.hexdigest(@script_body) + end + + def load_and_eval(redis, keys, argv) + redis.evalsha(@sha, keys: keys, argv: argv) + rescue Redis::CommandError => e + if e.message.include? "NOSCRIPT" + # The Redis server has never seen this script before. Needs to run only once in the entire lifetime + # of the Redis server, until the script changes - in which case it will be loaded under a different SHA + redis.script(:load, @script_body) + retry + else + raise e + end + end + end + + ADD_TOKENS_SCRIPT = RedisScript.new("add_tokens_conditionally.lua") + + def initialize(redis_connection_or_connection_pool, key_prefix: "pecorino") + @redis_pool = redis_connection_or_connection_pool + @key_prefix = key_prefix + end + + # Returns the state of a leaky bucket. The state should be a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def state(key:, capacity:, leak_rate:) + add_tokens(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 0) + end + + # Adds tokens to the leaky bucket. The return value is a tuple of two + # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + def add_tokens(key:, capacity:, leak_rate:, n_tokens:) + keys = ["#{@key_prefix}:leaky_bucket:#{key}:level", "#{@key_prefix}:leaky_bucket:#{key}:last_touched"] + argv = [leak_rate, n_tokens, capacity, _conditional = 0] + decimal_float_level, at_capacity_int, _ = with_redis do |redis| + ADD_TOKENS_SCRIPT.load_and_eval(redis, keys, argv) + end + [decimal_float_level.to_f, at_capacity_int == 1] + end + + # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will + # be added. If there isn't - the fillup will be rejected. The return value is a triplet of + # the current level (Float), whether the bucket is now at capacity (Boolean) + # and whether the fillup was accepted (Boolean) + def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) + keys = ["#{@key_prefix}:leaky_bucket:#{key}:level", "#{@key_prefix}:leaky_bucket:#{key}:last_touched"] + argv = [leak_rate, n_tokens, capacity, _conditional = 1] + decimal_float_level, at_capacity_int, did_accept_int = with_redis do |redis| + ADD_TOKENS_SCRIPT.load_and_eval(redis, keys, argv) + end + [decimal_float_level.to_f, at_capacity_int == 1, did_accept_int == 1] + end + + # Sets a timed block for the given key - this is used when a throttle fires. The return value + # is not defined - the call should always succeed. + def set_block(key:, block_for:) + end + + # Returns the time until which a block for a given key is in effect. If there is no block in + # effect, the method should return `nil`. The return value is either a `Time` or `nil` + def blocked_until(key:) + end + + private + + def with_redis + if @redis_pool.respond_to?(:with) + @redis_pool.with {|conn| yield(conn) } + else + yield @redis_pool + end + end +end diff --git a/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua new file mode 100644 index 0000000..e9cb48e --- /dev/null +++ b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua @@ -0,0 +1,89 @@ +-- Single threaded Leaky Bucket implementation (without blocking). +-- args: key_base, leak_rate, bucket_ttl, fillup. To just verify the state of the bucket leak_rate of 0 may be passed. +-- returns: the leve of the bucket in number of tokens + +-- this is required to be able to use TIME and writes; basically it lifts the script into IO +redis.replicate_commands() + +-- Redis documentation recommends passing the keys separately so that Redis +-- can - in the future - verify that they live on the same shard of a cluster, and +-- raise an error if they are not. As far as can be understood this functionality is not +-- yet present, but if we can make a little effort to make ourselves more future proof +-- we should. +local bucket_level_key = KEYS[1] +local last_updated_key = KEYS[2] + +local leak_rate = tonumber(ARGV[1]) +local fillup = tonumber(ARGV[2]) -- How many tokens this call adds to the bucket. +local bucket_capacity = tonumber(ARGV[3]) -- How many tokens is the bucket allowed to contain +local conditional_fillup = tonumber(ARGV[4]) -- Whether to fillup conditionally + +-- Compute the key TTL for the bucket. We are interested in how long it takes the bucket +-- to leak all the way to 0, as this is the time when the values stay relevant. We pad with 1 second +-- to have a little cushion. +local key_lifetime = math.ceil((bucket_capacity / leak_rate) + 1) + +-- Take a timestamp +local redis_time = redis.call("TIME") -- Array of [seconds, microseconds] +local now = tonumber(redis_time[1]) + (tonumber(redis_time[2]) / 1000000) + +-- get current bucket level. The throttle key might not exist yet in which +-- case we default to 0 +local bucket_level = tonumber(redis.call("GET", bucket_level_key)) or 0 + +-- ...and then perform the leaky bucket fillup/leak. We need to do this also when the bucket has +-- just been created because the initial fillup to add might be so high that it will +-- immediately overflow the bucket and trigger the throttle, on the first call. +local last_updated = tonumber(redis.call("GET", last_updated_key)) or now -- use sensible default of 'now' if the key does not exist + +-- Subtract the number of tokens leaked since last call +local dt = now - last_updated +local bucket_level_after_leaking = math.max(0, math.min(bucket_level - (leak_rate * dt), bucket_capacity)) +local bucket_level_after_fillup = bucket_level_after_leaking + fillup +local did_accept = 0 + +-- Figure out whether the fillup would overflow the bucket +if conditional_fillup and bucket_level_after_fillup > bucket_capacity then + local at_capacity = bucket_level_after_leaking >= bucket_capacity + -- See below about string return + return {string.format("%.9f", bucket_level_after_leaking), at_capacity, did_accept} +end + +-- and _then_ and add the tokens we fillup with. Cap the value to be 0 < capacity +local new_bucket_level = math.max(0, math.min(bucket_capacity, bucket_level_after_fillup)) + +-- Since we return a floating point number string-formatted even if the bucket is full we +-- have some loss of precision in the formatting, even if the bucket was actually full. +-- This bit of information is useful to preserve. +local at_capacity = 0 +if new_bucket_level == bucket_capacity then + at_capacity = 1 +end + +did_accept = 1 + +-- If both the initial level was 0, and the level after putting tokens in is 0 we +-- can avoid setting keys in Redis at all as this was only a level check. +if new_bucket_level == 0 and bucket_level == 0 then + return {"0.0", at_capacity, did_accept} +end + +-- Save the new bucket level +redis.call("SETEX", bucket_level_key, key_lifetime, new_bucket_level) + +-- Record when we updated the bucket so that the amount of tokens leaked +-- can be correctly determined on the next invocation +redis.call("SETEX", last_updated_key, key_lifetime, now) + +-- Most Redis adapters when used with the Lua interface truncate floats +-- to integers (at least in Python that is documented to be the case in +-- the Redis ebook here +-- https://redislabs.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/11-1-adding-functionality-without-writing-c +-- We need access to the bucket level as a float value since our leak rate might as well be floating point, and to achieve that +-- we can go two ways. We can turn the float into a Lua string, and then parse it on the other side, or we can convert it to +-- a tuple of two integer values - one for the integer component and one for fraction. +-- Now, the unpleasant aspect is that when we do this we will lose precision - the number is not going to be +-- exactly equal to capacity, thus we lose the bit of information which tells us whether we filled up the bucket or not. +-- Also since the only moment we can register whether the bucket is above capacity is now - in this script, since +-- by the next call some tokens will have leaked. +return {string.format("%.9f", new_bucket_level), at_capacity, did_accept} diff --git a/pecorino.gemspec b/pecorino.gemspec index 074b260..60d38b7 100644 --- a/pecorino.gemspec +++ b/pecorino.gemspec @@ -39,6 +39,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "standard" spec.add_development_dependency "magic_frozen_string_literal" spec.add_development_dependency "minitest-fail-fast" + spec.add_development_dependency "redis", "~> 5", "< 6" # For more information and examples about making a new gem, checkout our # guide at: https://bundler.io/guides/creating_gem.html diff --git a/test/adapters/redis_adapter_test.rb b/test/adapters/redis_adapter_test.rb new file mode 100644 index 0000000..2944f19 --- /dev/null +++ b/test/adapters/redis_adapter_test.rb @@ -0,0 +1,21 @@ +require_relative "../test_helper" +require_relative "adapter_test_methods" + +class RedisAdapterTest < ActiveSupport::TestCase + include AdapterTestMethods + + teardown { delete_created_keys } + + def create_adapter + Pecorino::Adapters::RedisAdapter.new(Redis.new, key_prefix: key_prefix) + end + + def key_prefix + "pecorino-test" + Random.new(Minitest.seed).bytes(4) + end + + def delete_created_keys + r = Redis.new + r.del(r.keys(key_prefix + "*")) + end +end From 1d67a321fc45deb1c36f72e77addef93da01ded1 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Sun, 25 Feb 2024 23:16:57 +0100 Subject: [PATCH 14/36] And one more done --- lib/pecorino/adapters/redis_adapter.rb | 10 ++++++++++ .../redis_adapter/add_tokens_conditionally.lua | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/pecorino/adapters/redis_adapter.rb b/lib/pecorino/adapters/redis_adapter.rb index 93967e0..7934c1c 100644 --- a/lib/pecorino/adapters/redis_adapter.rb +++ b/lib/pecorino/adapters/redis_adapter.rb @@ -67,11 +67,21 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. def set_block(key:, block_for:) + return unless block_for > 0 + blocked_until = Time.now + block_for + with_redis do |r| + r.setex("#{@key_prefix}:leaky_bucket:#{key}:block", block_for.to_f.ceil, blocked_until.to_f) + end + blocked_until end # Returns the time until which a block for a given key is in effect. If there is no block in # effect, the method should return `nil`. The return value is either a `Time` or `nil` def blocked_until(key:) + seconds_from_epoch = with_redis do |r| + r.get("#{@key_prefix}:leaky_bucket:#{key}:block") + end + Time.at(seconds_from_epoch.to_f).utc end private diff --git a/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua index e9cb48e..b3e7582 100644 --- a/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua +++ b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua @@ -43,7 +43,7 @@ local bucket_level_after_fillup = bucket_level_after_leaking + fillup local did_accept = 0 -- Figure out whether the fillup would overflow the bucket -if conditional_fillup and bucket_level_after_fillup > bucket_capacity then +if conditional_fillup == 1 and bucket_level_after_fillup > bucket_capacity then local at_capacity = bucket_level_after_leaking >= bucket_capacity -- See below about string return return {string.format("%.9f", bucket_level_after_leaking), at_capacity, did_accept} From c84453f67323515651dc4033b17e54c20bbe6e56 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 01:29:39 +0100 Subject: [PATCH 15/36] Disallow block_for <= 0 --- lib/pecorino/adapters/memory_adapter.rb | 1 + lib/pecorino/adapters/postgres_adapter.rb | 1 + lib/pecorino/adapters/redis_adapter.rb | 3 ++- lib/pecorino/adapters/sqlite_adapter.rb | 1 + test/adapters/adapter_test_methods.rb | 16 ++++++++-------- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index c214c07..c3e50d3 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -91,6 +91,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. def set_block(key:, block_for:) + raise ArgumentError, "block_for must be positive" unless block_for > 0 @lock.lock(key) @blocks[key] = get_mono_time + block_for.to_f Time.now + block_for.to_f diff --git a/lib/pecorino/adapters/postgres_adapter.rb b/lib/pecorino/adapters/postgres_adapter.rb index 5e86359..e55521f 100644 --- a/lib/pecorino/adapters/postgres_adapter.rb +++ b/lib/pecorino/adapters/postgres_adapter.rb @@ -144,6 +144,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) end def set_block(key:, block_for:) + raise ArgumentError, "block_for must be positive" unless block_for > 0 query_params = {key: key.to_s, block_for: block_for.to_f} block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_blocks AS t diff --git a/lib/pecorino/adapters/redis_adapter.rb b/lib/pecorino/adapters/redis_adapter.rb index 7934c1c..cf43479 100644 --- a/lib/pecorino/adapters/redis_adapter.rb +++ b/lib/pecorino/adapters/redis_adapter.rb @@ -67,7 +67,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. def set_block(key:, block_for:) - return unless block_for > 0 + raise ArgumentError, "block_for must be positive" unless block_for > 0 blocked_until = Time.now + block_for with_redis do |r| r.setex("#{@key_prefix}:leaky_bucket:#{key}:block", block_for.to_f.ceil, blocked_until.to_f) @@ -81,6 +81,7 @@ def blocked_until(key:) seconds_from_epoch = with_redis do |r| r.get("#{@key_prefix}:leaky_bucket:#{key}:block") end + return unless seconds_from_epoch Time.at(seconds_from_epoch.to_f).utc end diff --git a/lib/pecorino/adapters/sqlite_adapter.rb b/lib/pecorino/adapters/sqlite_adapter.rb index 0035391..d56fe85 100644 --- a/lib/pecorino/adapters/sqlite_adapter.rb +++ b/lib/pecorino/adapters/sqlite_adapter.rb @@ -163,6 +163,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) end def set_block(key:, block_for:) + raise ArgumentError, "block_for must be positive" unless block_for > 0 query_params = {id: SecureRandom.uuid, key: key.to_s, block_for: block_for.to_f, now_s: Time.now.to_f} block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_blocks AS t diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 33f1864..a58a374 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -202,16 +202,16 @@ def test_set_block_sets_a_block def test_set_block_does_not_set_block_in_the_past key = random_key - now = Time.now.utc - block_duration_s = -20 - assert_nil adapter.blocked_until(key: key) - set_block_result = adapter.set_block(key: key, block_for: block_duration_s) - assert_kind_of Time, set_block_result - assert_in_delta now + block_duration_s, set_block_result, 0.1 + assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: -20) } + assert_nil adapter.blocked_until(key: key) + end - blocked_until = adapter.blocked_until(key: key) - assert_nil blocked_until + def test_set_block_does_not_set_block_which_would_expire_immediately + key = random_key + assert_nil adapter.blocked_until(key: key) + assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: 0) } + assert_nil adapter.blocked_until(key: key) end def test_prune From bf36b419a144273a6c15f9afe8be79ec372e0eb2 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 02:01:44 +0100 Subject: [PATCH 16/36] Use memory adapter for cached throttle test --- test/cached_throttle_test.rb | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/test/cached_throttle_test.rb b/test/cached_throttle_test.rb index 434c875..e1f3040 100644 --- a/test/cached_throttle_test.rb +++ b/test/cached_throttle_test.rb @@ -3,17 +3,13 @@ require "test_helper" class CachedThrottleTest < ActiveSupport::TestCase - def setup - create_postgres_database - end - - def teardown - drop_postgres_database + def adapter + @adapter ||= Pecorino::Adapters::MemoryAdapter.new end test "caches results of request! and correctly raises Throttled until the block is lifted" do store = ActiveSupport::Cache::MemoryStore.new - throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1.second, block_for: 10.seconds) + throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1, block_for: 10, adapter: adapter) cached_throttle = Pecorino::CachedThrottle.new(store, throttle) state1 = cached_throttle.request! @@ -38,7 +34,7 @@ class << throttle test "caches results of able_to_accept? until the block is lifted" do store = ActiveSupport::Cache::MemoryStore.new - throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1.second, block_for: 10.seconds) + throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1, block_for: 10, adapter: adapter) cached_throttle = Pecorino::CachedThrottle.new(store, throttle) cached_throttle.request(1) @@ -57,7 +53,7 @@ class << throttle test "caches results of request() and correctly returns cached state until the block is lifted" do store = ActiveSupport::Cache::MemoryStore.new - throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1.second, block_for: 10.seconds) + throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1, block_for: 10, adapter: adapter) cached_throttle = Pecorino::CachedThrottle.new(store, throttle) state1 = cached_throttle.request(1) @@ -80,14 +76,14 @@ class << throttle test "returns the key of the contained throttle" do store = ActiveSupport::Cache::MemoryStore.new - throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1.second, block_for: 10.seconds) + throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1, block_for: 10, adapter: adapter) cached_throttle = Pecorino::CachedThrottle.new(store, throttle) assert_equal cached_throttle.key, throttle.key end test "does not run block in throttled() until the block is lifted" do store = ActiveSupport::Cache::MemoryStore.new - throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1.second, block_for: 10.seconds) + throttle = Pecorino::Throttle.new(key: Random.uuid, capacity: 2, over_time: 1, block_for: 10, adapter: adapter) cached_throttle = Pecorino::CachedThrottle.new(store, throttle) assert_equal 123, cached_throttle.throttled { 123 } From 1e706d13d6855331c8dd614a55185a2bb175e072 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 02:03:01 +0100 Subject: [PATCH 17/36] Script comment --- .../adapters/redis_adapter/add_tokens_conditionally.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua index b3e7582..fa76569 100644 --- a/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua +++ b/lib/pecorino/adapters/redis_adapter/add_tokens_conditionally.lua @@ -1,6 +1,7 @@ -- Single threaded Leaky Bucket implementation (without blocking). -- args: key_base, leak_rate, bucket_ttl, fillup. To just verify the state of the bucket leak_rate of 0 may be passed. --- returns: the leve of the bucket in number of tokens +-- returns: the leve of the bucket in number of tokens. +-- This script is largely adapted from Prorate https://github.com/WeTransfer/prorate -- this is required to be able to use TIME and writes; basically it lifts the script into IO redis.replicate_commands() From c064c815931dd34c4bbb74a0bdb40458565efad3 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 10:39:07 +0100 Subject: [PATCH 18/36] Tweak a little --- lib/pecorino/adapters/memory_adapter.rb | 75 ++++++++++++++----------- lib/pecorino/adapters/redis_adapter.rb | 6 +- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index c3e50d3..50f6c34 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -23,6 +23,13 @@ def unlock(key) @locked_keys.delete(key) end end + + def with(key) + lock(key) + yield + ensure + unlock(key) + end end def initialize @@ -48,19 +55,7 @@ def state(key:, capacity:, leak_rate:) # Adds tokens to the leaky bucket. The return value is a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) def add_tokens(key:, capacity:, leak_rate:, n_tokens:) - @lock.lock(key) - now = get_mono_time - level, ts, _ = @buckets[key] || [0.0, now] - - dt = now - ts - level_after_fillup = clamp(0, level - (leak_rate * dt) + n_tokens, capacity) - - expire_after = now + (level_after_fillup / leak_rate) - @buckets[key] = [level_after_fillup, now, expire_after] - - [level_after_fillup, (level_after_fillup - capacity) >= 0] - ensure - @lock.unlock(key) + add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = false) end # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will @@ -68,24 +63,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # the current level (Float), whether the bucket is now at capacity (Boolean) # and whether the fillup was accepted (Boolean) def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) - @lock.lock(key) - now = get_mono_time - level, ts, _ = @buckets[key] || [0.0, now] - - dt = now - ts - level_after_leak = clamp(0, level - (leak_rate * dt), capacity) - level_after_fillup = level_after_leak + n_tokens - if level_after_fillup > capacity - return [level_after_leak, level_after_leak >= capacity, _did_accept = false] - end - - clamped_level_after_fillup = clamp(0, level_after_fillup, capacity) - expire_after = now + (level_after_fillup / leak_rate) - @buckets[key] = [clamped_level_after_fillup, now, expire_after] - - [clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true] - ensure - @lock.unlock(key) + add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = true) end # Sets a timed block for the given key - this is used when a throttle fires. The return value @@ -115,8 +93,18 @@ def blocked_until(key:) # now lapsed def prune now_monotonic = get_mono_time - @blocks.delete_if { |_, blocked_until_monotonic| blocked_until_monotonic < now_monotonic } - @buckets.delete_if { |_, (_level, expire_at_monotonic)| expire_at_monotonic < now_monotonic } + + @blocks.delete_if do |key, blocked_until_monotonic| + @lock.with(key) do + blocked_until_monotonic < now_monotonic + end + end + + @buckets.delete_if do |key, (_level, expire_at_monotonic)| + @lock.with(key) do + expire_at_monotonic < now_monotonic + end + end end # No-op @@ -125,6 +113,27 @@ def create_tables(active_record_schema) private + def add_tokens_with_lock(key, capacity, leak_rate, n_tokens, conditionally) + @lock.lock(key) + now = get_mono_time + level, ts, _ = @buckets[key] || [0.0, now] + + dt = now - ts + level_after_leak = clamp(0, level - (leak_rate * dt), capacity) + level_after_fillup = level_after_leak + n_tokens + if level_after_fillup > capacity && conditionally + return [level_after_leak, level_after_leak >= capacity, _did_accept = false] + end + + clamped_level_after_fillup = clamp(0, level_after_fillup, capacity) + expire_after = now + (level_after_fillup / leak_rate) + @buckets[key] = [clamped_level_after_fillup, now, expire_after] + + [clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true] + ensure + @lock.unlock(key) + end + def get_mono_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end diff --git a/lib/pecorino/adapters/redis_adapter.rb b/lib/pecorino/adapters/redis_adapter.rb index cf43479..2aae245 100644 --- a/lib/pecorino/adapters/redis_adapter.rb +++ b/lib/pecorino/adapters/redis_adapter.rb @@ -4,8 +4,8 @@ require "digest" require "redis" -# An adapter allows Pecorino throttles, leaky buckets and other -# resources to interfact to a data storage backend - a database, usually. +# An adapter for storing Pecorino leaky buckets and blocks in Redis. It uses Lua +# to enforce atomicity for leaky bucket operations class Pecorino::Adapters::RedisAdapter < Pecorino::Adapters::BaseAdapter class RedisScript def initialize(script_filename) @@ -17,8 +17,6 @@ def load_and_eval(redis, keys, argv) redis.evalsha(@sha, keys: keys, argv: argv) rescue Redis::CommandError => e if e.message.include? "NOSCRIPT" - # The Redis server has never seen this script before. Needs to run only once in the entire lifetime - # of the Redis server, until the script changes - in which case it will be loaded under a different SHA redis.script(:load, @script_body) retry else From 17eeb81dd798c5e3cc8875fbaab710156e0c92b1 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 13:04:01 +0100 Subject: [PATCH 19/36] Still getting there --- test/adapters/adapter_test_methods.rb | 3 +++ test/adapters/postgres_adapter_test.rb | 35 ++++++++++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index a58a374..77a9e9e 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -1,3 +1,6 @@ +# The module contains the conformance tests for a storage adapter for Pecorino. A well-behaved adapter +# should pass all of these tests. When creating a new adapter include this module in your test case +# and overload the `create_adapter` method module AdapterTestMethods LEVEL_DELTA = 0.1 diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 4674adc..7eab7f7 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -4,30 +4,45 @@ class PostgresAdapterTest < ActiveSupport::TestCase include AdapterTestMethods - setup { create_postgres_database } - teardown { drop_postgres_database } + setup { create_postgres_database_if_none } + teardown { truncate_test_tables } def create_adapter Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base) end + SEED_DB_NAME = -> { "pecorino_tests_%s" % Random.new(Minitest.seed).hex(4) } + + def create_postgres_database_if_none + ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: SEED_DB_NAME.call) + ActiveRecord::Base.connection.execute("SELECT 1 FROM pecorino_leaky_buckets") + rescue ActiveRecord::NoDatabaseError, ActiveRecord::ConnectionNotEstablished + create_postgres_database + retry + rescue ActiveRecord::StatementInvalid + retained_adapter = adapter # the schema define block is run via instance_exec so it does not retain scope + ActiveRecord::Schema.define(version: 1) do |via_definer| + retained_adapter.create_tables(via_definer) + end + retry + end + def create_postgres_database - seed_db_name = Random.new(Minitest.seed).hex(4) ActiveRecord::Migration.verbose = false ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") ActiveRecord::Base.connection.create_database("pecorino_tests_%s" % seed_db_name, charset: :unicode) ActiveRecord::Base.connection.close - ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: "pecorino_tests_%s" % seed_db_name) + ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: SEED_DB_NAME.call) + end - ActiveRecord::Schema.define(version: 1) do |via_definer| - Pecorino.create_tables(via_definer) - end + def truncate_test_tables + ActiveRecord::Base.connection.execute("TRUNCATE TABLE pecorino_leaky_buckets") + ActiveRecord::Base.connection.execute("TRUNCATE TABLE pecorino_blocks") end - def drop_postgres_database - seed_db_name = Random.new(Minitest.seed).hex(4) + Minitest.after_run do ActiveRecord::Base.connection.close ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") - ActiveRecord::Base.connection.drop_database("pecorino_tests_%s" % seed_db_name) + ActiveRecord::Base.connection.drop_database(SEED_DB_NAME.call) end end From 6c2d7f32816ae94da90d3a8b0c957971560ddf4a Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 13:04:57 +0100 Subject: [PATCH 20/36] That mostly works --- test/adapters/postgres_adapter_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 7eab7f7..1e43786 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -30,7 +30,7 @@ def create_postgres_database_if_none def create_postgres_database ActiveRecord::Migration.verbose = false ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") - ActiveRecord::Base.connection.create_database("pecorino_tests_%s" % seed_db_name, charset: :unicode) + ActiveRecord::Base.connection.create_database(SEED_DB_NAME.call, charset: :unicode) ActiveRecord::Base.connection.close ActiveRecord::Base.establish_connection(adapter: "postgresql", encoding: "unicode", database: SEED_DB_NAME.call) end From da9daff3d3494a3cbfb62098b541dbd615dca317 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Mon, 26 Feb 2024 21:36:56 +0100 Subject: [PATCH 21/36] A bit more Volkswagening --- test/adapters/adapter_test_methods.rb | 33 +++++++++++++------------- test/adapters/postgres_adapter_test.rb | 12 ++++++++++ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 77a9e9e..be708c0 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -39,7 +39,7 @@ def test_bucket_lifecycle_with_unbounded_fillups assert_in_delta level, 0.65, LEVEL_DELTA assert_equal false, is_full - level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.4) + level, is_full = adapter.add_tokens(key: k, capacity: capacity, leak_rate: leak_rate, n_tokens: 0.7) assert_in_delta level, 1.0, LEVEL_DELTA assert_equal true, is_full @@ -173,19 +173,22 @@ def test_bucket_lifecycle_handles_conditional_fillup_in_steps end def test_bucket_lifecycle_allows_conditional_fillup_after_leaking_out - key = random_key - capacity = 30 - leak_rate = capacity / 0.5 + rng = Random.new(Minitest.seed) + 12.times do |i| + key = rng.hex(4) + capacity = 30 + leak_rate = capacity / 0.5 - _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 29.6) - assert did_accept + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 29.6) + assert did_accept, "Should have accepted the topup on iteration #{i}" - _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) - refute did_accept + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + refute did_accept, "Should have refused the topup on iteration #{i}" - sleep 0.6 # Spend enough time to allow the bucket to leak out completely - _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) - assert did_accept, "Once the bucket has leaked out to 0 the fillup should be accepted again" + sleep 0.6 # Spend enough time to allow the bucket to leak out completely + _, _, did_accept = adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 1) + assert did_accept, "Once the bucket has leaked out to 0 the fillup should be accepted again on iteration #{i}" + end end def test_set_block_sets_a_block @@ -232,12 +235,8 @@ def test_prune end def test_create_tables - adapter = create_adapter # Has to be in local scope - ActiveRecord::Schema.define(version: 1) do |via_definer| - adapter.create_tables(via_definer) - end - rescue ActiveRecord::ConnectionNotEstablished - # This adapter does not require a connection + # All we are testing for is that the adapter responds to that method and accepts an argument + adapter.create_tables(nil) end def xtest_should_accept_threadsafe_conditional_fillups diff --git a/test/adapters/postgres_adapter_test.rb b/test/adapters/postgres_adapter_test.rb index 1e43786..2efab20 100644 --- a/test/adapters/postgres_adapter_test.rb +++ b/test/adapters/postgres_adapter_test.rb @@ -40,6 +40,18 @@ def truncate_test_tables ActiveRecord::Base.connection.execute("TRUNCATE TABLE pecorino_blocks") end + def test_create_tables + ActiveRecord::Base.transaction do + ActiveRecord::Base.connection.execute("DROP TABLE pecorino_leaky_buckets") + ActiveRecord::Base.connection.execute("DROP TABLE pecorino_blocks") + # The adapter has to be in a variable as the schema definition is scoped to the migrator, not self + retained_adapter = create_adapter # the schema define block is run via instance_exec so it does not retain scope + ActiveRecord::Schema.define(version: 1) do |via_definer| + retained_adapter.create_tables(via_definer) + end + end + end + Minitest.after_run do ActiveRecord::Base.connection.close ActiveRecord::Base.establish_connection(adapter: "postgresql", database: "postgres") From c824e290a8ff0f1d5a0e352a7b5c49caab7144ad Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 27 Feb 2024 13:06:52 +0100 Subject: [PATCH 22/36] Make Pecorino.adapter pluggable so that the user is not always at the mercy of auto-detection --- lib/pecorino.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/pecorino.rb b/lib/pecorino.rb index f4c685d..e74ff97 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -42,10 +42,18 @@ def self.create_tables(active_record_schema) adapter.create_tables(active_record_schema) end + def self.adapter=(adapter) + @adapter = adapter + end + + def self.adapter + @adapter || default_adapter_from_main_database + end + # Returns the database implementation for setting the values atomically. Since the implementation # differs per database, this method will return a different adapter depending on which database is # being used - def self.adapter # default_adapter_from_main_database + def self.default_adapter_from_main_database # default_adapter_from_main_database model_class = ActiveRecord::Base adapter_name = model_class.connection.adapter_name case adapter_name From ff8ee920fd0ad9d0ec86a913d2fa6a733a0eb741 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 27 Feb 2024 13:07:20 +0100 Subject: [PATCH 23/36] Structure tests a bit better still --- test/adapters/adapter_test_methods.rb | 3 +-- test/adapters/memory_adapter_test.rb | 2 ++ test/adapters/redis_adapter_test.rb | 2 ++ test/adapters/sqlite_adapter_test.rb | 12 ++++++++++++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index be708c0..67c7990 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -235,8 +235,7 @@ def test_prune end def test_create_tables - # All we are testing for is that the adapter responds to that method and accepts an argument - adapter.create_tables(nil) + raise "This has to either be a no-op in your test (if your adapter doesn't need any tables) or needs to be written" end def xtest_should_accept_threadsafe_conditional_fillups diff --git a/test/adapters/memory_adapter_test.rb b/test/adapters/memory_adapter_test.rb index dd6e69f..0fd964d 100644 --- a/test/adapters/memory_adapter_test.rb +++ b/test/adapters/memory_adapter_test.rb @@ -7,4 +7,6 @@ class MemoryAdapterTest < ActiveSupport::TestCase def create_adapter Pecorino::Adapters::MemoryAdapter.new end + + undef :test_create_tables end diff --git a/test/adapters/redis_adapter_test.rb b/test/adapters/redis_adapter_test.rb index 2944f19..20bc4c0 100644 --- a/test/adapters/redis_adapter_test.rb +++ b/test/adapters/redis_adapter_test.rb @@ -18,4 +18,6 @@ def delete_created_keys r = Redis.new r.del(r.keys(key_prefix + "*")) end + + undef :test_create_tables end diff --git a/test/adapters/sqlite_adapter_test.rb b/test/adapters/sqlite_adapter_test.rb index 6ee0ebe..dbf51fc 100644 --- a/test/adapters/sqlite_adapter_test.rb +++ b/test/adapters/sqlite_adapter_test.rb @@ -30,4 +30,16 @@ def test_db_filename def create_adapter Pecorino::Adapters::SqliteAdapter.new(ActiveRecord::Base) end + + def test_create_tables + ActiveRecord::Base.transaction do + ActiveRecord::Base.connection.execute("DROP TABLE pecorino_leaky_buckets") + ActiveRecord::Base.connection.execute("DROP TABLE pecorino_blocks") + # The adapter has to be in a variable as the schema definition is scoped to the migrator, not self + retained_adapter = create_adapter # the schema define block is run via instance_exec so it does not retain scope + ActiveRecord::Schema.define(version: 1) do |via_definer| + retained_adapter.create_tables(via_definer) + end + end + end end From 43567347972b58ba3adff1ff1745794c93c17e53 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:22:33 +0100 Subject: [PATCH 24/36] Add Redis in CI --- .github/workflows/ci.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 307ff2c..e66c1d9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -50,6 +50,11 @@ jobs: options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 ports: - 5432:5432 + redis: + image: redis + options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5 + ports: + - 6379:6379 steps: - name: Checkout uses: actions/checkout@v4 From cbae5f02691204719438d340355be745af5c64f4 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:25:17 +0100 Subject: [PATCH 25/36] Reformat --- lib/pecorino/adapters/memory_adapter.rb | 4 ++-- lib/pecorino/adapters/redis_adapter.rb | 2 +- test/adapters/adapter_test_methods.rb | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index 50f6c34..b332979 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -55,7 +55,7 @@ def state(key:, capacity:, leak_rate:) # Adds tokens to the leaky bucket. The return value is a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) def add_tokens(key:, capacity:, leak_rate:, n_tokens:) - add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = false) + add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = false) end # Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will @@ -63,7 +63,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # the current level (Float), whether the bucket is now at capacity (Boolean) # and whether the fillup was accepted (Boolean) def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) - add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = true) + add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = true) end # Sets a timed block for the given key - this is used when a throttle fires. The return value diff --git a/lib/pecorino/adapters/redis_adapter.rb b/lib/pecorino/adapters/redis_adapter.rb index 2aae245..ead5a8a 100644 --- a/lib/pecorino/adapters/redis_adapter.rb +++ b/lib/pecorino/adapters/redis_adapter.rb @@ -87,7 +87,7 @@ def blocked_until(key:) def with_redis if @redis_pool.respond_to?(:with) - @redis_pool.with {|conn| yield(conn) } + @redis_pool.with { |conn| yield(conn) } else yield @redis_pool end diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 67c7990..5c6468b 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -209,14 +209,14 @@ def test_set_block_sets_a_block def test_set_block_does_not_set_block_in_the_past key = random_key assert_nil adapter.blocked_until(key: key) - assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: -20) } + assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: -20) } assert_nil adapter.blocked_until(key: key) end def test_set_block_does_not_set_block_which_would_expire_immediately key = random_key assert_nil adapter.blocked_until(key: key) - assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: 0) } + assert_raise(ArgumentError) { adapter.set_block(key: key, block_for: 0) } assert_nil adapter.blocked_until(key: key) end From 145258af8a3b923955301499a51fb6146d142cc9 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:25:51 +0100 Subject: [PATCH 26/36] Remove unneeded lint step --- .github/workflows/ci.yml | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e66c1d9..bc69653 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,31 +8,6 @@ env: BUNDLE_PATH: vendor/bundle jobs: - # lint: - # name: Code Style - # runs-on: ubuntu-22.04 - # if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository - # strategy: - # matrix: - # ruby: - # - '2.7' - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Setup Ruby - # uses: ruby/setup-ruby@v1 - # with: - # ruby-version: ${{ matrix.ruby }} - # bundler-cache: true - # - name: Rubocop Cache - # uses: actions/cache@v3 - # with: - # path: ~/.cache/rubocop_cache - # key: ${{ runner.os }}-rubocop-${{ hashFiles('.rubocop.yml') }} - # restore-keys: | - # ${{ runner.os }}-rubocop- - # - name: Rubocop - # run: bundle exec rubocop test: name: Tests runs-on: ubuntu-22.04 From 6087ba9a15ed3dfac87165ad87ea54e55d74dd08 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:26:33 +0100 Subject: [PATCH 27/36] Just on push is sufficient --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bc69653..953b4ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,6 @@ name: CI on: - push - - pull_request env: BUNDLE_PATH: vendor/bundle From 6e5c6e98c8f4f1b04ec966998fc100c52f1efc50 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:28:00 +0100 Subject: [PATCH 28/36] Document Redis is available now --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0e40c72..e2bc0f8 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,11 @@ Pecorino is a rate limiter based on the concept of leaky buckets, or more specifically - based on the [generic cell rate](https://brandur.org/rate-limiting) algorithm. It uses your DB as the storage backend for the throttles. It is compact, easy to install, and does not require additional infrastructure. The approach used by Pecorino has been previously used by [prorate](https://github.com/WeTransfer/prorate) with Redis, and that approach has proven itself. -Pecorino is designed to integrate seamlessly into any Rails application using a PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it). +Pecorino is designed to integrate seamlessly into any Rails application, and will use either: + +* A memory store (good enough if you have just 1 process) +* A PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it) +* A Redis instance If you would like to know more about the leaky bucket algorithm: [this article](http://live.julik.nl/2022/08/the-unreasonable-effectiveness-of-leaky-buckets) or the [Wikipedia article](https://en.wikipedia.org/wiki/Leaky_bucket) are both good starting points. [This Wikipedia article](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm) describes the generic cell rate algorithm in more detail as well. From e8298a7181f95511824f1cb179c748bf5c811df3 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 28 Feb 2024 20:32:03 +0100 Subject: [PATCH 29/36] Zap DatabaseAdapter Rule of 3 --- lib/pecorino.rb | 1 - lib/pecorino/adapters/database_adapter.rb | 36 ---------------- lib/pecorino/adapters/postgres_adapter.rb | 48 +++++++++++++++------ lib/pecorino/adapters/sqlite_adapter.rb | 52 ++++++++++++++++------- 4 files changed, 72 insertions(+), 65 deletions(-) delete mode 100644 lib/pecorino/adapters/database_adapter.rb diff --git a/lib/pecorino.rb b/lib/pecorino.rb index e74ff97..153368c 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -14,7 +14,6 @@ module Pecorino module Adapters autoload :MemoryAdapter, "pecorino/adapters/memory_adapter" - autoload :DatabaseAdapter, "pecorino/adapters/database_adapter" autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter" autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter" autoload :RedisAdapter, "pecorino/adapters/redis_adapter" diff --git a/lib/pecorino/adapters/database_adapter.rb b/lib/pecorino/adapters/database_adapter.rb deleted file mode 100644 index 19efeba..0000000 --- a/lib/pecorino/adapters/database_adapter.rb +++ /dev/null @@ -1,36 +0,0 @@ -# frozen_string_literal: true - -class Pecorino::Adapters::DatabaseAdapter - attr_reader :model_class - - def initialize(model_class) - @model_class = model_class - end - - def prune - # Delete all the old blocks here (if we are under a heavy swarm of requests which are all - # blocked it is probably better to avoid the big delete) - model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") - - # Prune buckets which are no longer used. No "uncached" needed here since we are using "execute" - model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") - end - - def create_tables(active_record_schema) - active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| - t.string :key, null: false - t.float :level, null: false - t.datetime :last_touched_at, null: false - t.datetime :may_be_deleted_after, null: false - end - active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true - active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] - - active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| - t.string :key, null: false - t.datetime :blocked_until, null: false - end - active_record_schema.add_index :pecorino_blocks, [:key], unique: true - active_record_schema.add_index :pecorino_blocks, [:blocked_until] - end -end diff --git a/lib/pecorino/adapters/postgres_adapter.rb b/lib/pecorino/adapters/postgres_adapter.rb index e55521f..392c7c4 100644 --- a/lib/pecorino/adapters/postgres_adapter.rb +++ b/lib/pecorino/adapters/postgres_adapter.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true -class Pecorino::Adapters::PostgresAdapter < Pecorino::Adapters::DatabaseAdapter +class Pecorino::Adapters::PostgresAdapter + def initialize(model_class) + @model_class = model_class + end + def state(key:, capacity:, leak_rate:) query_params = { key: key.to_s, @@ -10,7 +14,7 @@ def state(key:, capacity:, leak_rate:) # The `level` of the bucket is what got stored at `last_touched_at` time, and we can # extrapolate from it to see how many tokens have leaked out since `last_touched_at` - # we don't need to UPDATE the value in the bucket here - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) SELECT GREATEST( 0.0, LEAST( @@ -26,7 +30,7 @@ def state(key:, capacity:, leak_rate:) # If the return value of the query is a NULL it means no such bucket exists, # so we assume the bucket is empty - current_level = model_class.connection.uncached { model_class.connection.select_value(sql) } || 0.0 + current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0 [current_level, capacity - current_level.abs < 0.01] end @@ -45,7 +49,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) fillup: n_tokens.to_f } - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_leaky_buckets AS t (key, last_touched_at, may_be_deleted_after, level) VALUES @@ -79,7 +83,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here. # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test - upserted = model_class.connection.uncached { model_class.connection.select_one(sql) } + upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } capped_level_after_fillup, at_capacity = upserted.fetch("level"), upserted.fetch("at_capacity") [capped_level_after_fillup, at_capacity] end @@ -99,7 +103,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) fillup: n_tokens.to_f } - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) WITH pre AS MATERIALIZED ( SELECT -- Note the double clamping here. First we clamp the "current level - leak" to not go below zero, @@ -137,7 +141,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) level AS level_after SQL - upserted = model_class.connection.uncached { model_class.connection.select_one(sql) } + upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } level_after = upserted.fetch("level_after") level_before = upserted.fetch("level_before") [level_after, level_after >= capacity, level_after != level_before] @@ -146,7 +150,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) def set_block(key:, block_for:) raise ArgumentError, "block_for must be positive" unless block_for > 0 query_params = {key: key.to_s, block_for: block_for.to_f} - block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params]) + block_set_query = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_blocks AS t (key, blocked_until) VALUES @@ -155,18 +159,36 @@ def set_block(key:, block_for:) blocked_until = GREATEST(EXCLUDED.blocked_until, t.blocked_until) RETURNING blocked_until SQL - model_class.connection.uncached { model_class.connection.select_value(block_set_query) } + @model_class.connection.uncached { @model_class.connection.select_value(block_set_query) } end def blocked_until(key:) - block_check_query = model_class.sanitize_sql_array([<<~SQL, key]) + block_check_query = @model_class.sanitize_sql_array([<<~SQL, key]) SELECT blocked_until FROM pecorino_blocks WHERE key = ? AND blocked_until >= clock_timestamp() LIMIT 1 SQL - model_class.connection.uncached { model_class.connection.select_value(block_check_query) } + @model_class.connection.uncached { @model_class.connection.select_value(block_check_query) } end def prune - model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") - model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") + @model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") + @model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") + end + + def create_tables(active_record_schema) + active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| + t.string :key, null: false + t.float :level, null: false + t.datetime :last_touched_at, null: false + t.datetime :may_be_deleted_after, null: false + end + active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true + active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] + + active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| + t.string :key, null: false + t.datetime :blocked_until, null: false + end + active_record_schema.add_index :pecorino_blocks, [:key], unique: true + active_record_schema.add_index :pecorino_blocks, [:blocked_until] end end diff --git a/lib/pecorino/adapters/sqlite_adapter.rb b/lib/pecorino/adapters/sqlite_adapter.rb index d56fe85..54d4026 100644 --- a/lib/pecorino/adapters/sqlite_adapter.rb +++ b/lib/pecorino/adapters/sqlite_adapter.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true -class Pecorino::Adapters::SqliteAdapter < Pecorino::Adapters::DatabaseAdapter +class Pecorino::Adapters::SqliteAdapter + def initialize(model_class) + @model_class = model_class + end + def state(key:, capacity:, leak_rate:) # With a server database, it is really important to use the clock of the database itself so # that concurrent requests will see consistent bucket level calculations. Since SQLite is @@ -17,7 +21,7 @@ def state(key:, capacity:, leak_rate:) # The `level` of the bucket is what got stored at `last_touched_at` time, and we can # extrapolate from it to see how many tokens have leaked out since `last_touched_at` - # we don't need to UPDATE the value in the bucket here - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) SELECT MAX( 0.0, MIN( @@ -33,7 +37,7 @@ def state(key:, capacity:, leak_rate:) # If the return value of the query is a NULL it means no such bucket exists, # so we assume the bucket is empty - current_level = model_class.connection.uncached { model_class.connection.select_value(sql) } || 0.0 + current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0 [current_level, capacity - current_level.abs < 0.01] end @@ -54,7 +58,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) id: SecureRandom.uuid # SQLite3 does not autogenerate UUIDs } - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_leaky_buckets AS t (id, key, last_touched_at, may_be_deleted_after, level) VALUES @@ -89,7 +93,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here. # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test - upserted = model_class.connection.uncached { model_class.connection.select_one(sql) } + upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } capped_level_after_fillup, one_if_did_overflow = upserted.fetch("level"), upserted.fetch("did_overflow") [capped_level_after_fillup, one_if_did_overflow == 1] end @@ -114,7 +118,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # Sadly with SQLite we need to do an INSERT first, because otherwise the inserted row is visible # to the WITH clause, so we cannot combine the initial fillup and the update into one statement. # This shuld be fine however since we will suppress the INSERT on a key conflict - insert_sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + insert_sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_leaky_buckets AS t (id, key, last_touched_at, may_be_deleted_after, level) VALUES @@ -130,9 +134,9 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) -- so that it can't be deleted between our INSERT and our UPDATE may_be_deleted_after = EXCLUDED.may_be_deleted_after SQL - model_class.connection.execute(insert_sql) + @model_class.connection.execute(insert_sql) - sql = model_class.sanitize_sql_array([<<~SQL, query_params]) + sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) -- With SQLite MATERIALIZED has to be used so that level_post is calculated before the UPDATE takes effect WITH pre(level_post_with_uncapped_fillup, level_post) AS MATERIALIZED ( SELECT @@ -156,7 +160,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) level AS level_after SQL - upserted = model_class.connection.uncached { model_class.connection.select_one(sql) } + upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } level_after = upserted.fetch("level_after") level_before = upserted.fetch("level_before") [level_after, level_after >= capacity, level_after != level_before] @@ -165,7 +169,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) def set_block(key:, block_for:) raise ArgumentError, "block_for must be positive" unless block_for > 0 query_params = {id: SecureRandom.uuid, key: key.to_s, block_for: block_for.to_f, now_s: Time.now.to_f} - block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params]) + block_set_query = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_blocks AS t (id, key, blocked_until) VALUES @@ -174,13 +178,13 @@ def set_block(key:, block_for:) blocked_until = MAX(EXCLUDED.blocked_until, t.blocked_until) RETURNING blocked_until; SQL - blocked_until_s = model_class.connection.uncached { model_class.connection.select_value(block_set_query) } + blocked_until_s = @model_class.connection.uncached { @model_class.connection.select_value(block_set_query) } Time.at(blocked_until_s) end def blocked_until(key:) now_s = Time.now.to_f - block_check_query = model_class.sanitize_sql_array([<<~SQL, {now_s: now_s, key: key}]) + block_check_query = @model_class.sanitize_sql_array([<<~SQL, {now_s: now_s, key: key}]) SELECT blocked_until FROM @@ -188,13 +192,31 @@ def blocked_until(key:) WHERE key = :key AND blocked_until >= :now_s LIMIT 1 SQL - blocked_until_s = model_class.connection.uncached { model_class.connection.select_value(block_check_query) } + blocked_until_s = @model_class.connection.uncached { @model_class.connection.select_value(block_check_query) } blocked_until_s && Time.at(blocked_until_s) end def prune now_s = Time.now.to_f - model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s) - model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s) + @model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s) + @model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s) + end + + def create_tables(active_record_schema) + active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| + t.string :key, null: false + t.float :level, null: false + t.datetime :last_touched_at, null: false + t.datetime :may_be_deleted_after, null: false + end + active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true + active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] + + active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| + t.string :key, null: false + t.datetime :blocked_until, null: false + end + active_record_schema.add_index :pecorino_blocks, [:key], unique: true + active_record_schema.add_index :pecorino_blocks, [:blocked_until] end end From 6d4fc1b8e452858c92d60fe932169f2718197cb9 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 12 Mar 2024 22:07:11 +0100 Subject: [PATCH 30/36] Remove database from block_test.rb --- test/block_test.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/test/block_test.rb b/test/block_test.rb index f8b6c9e..a4420fc 100644 --- a/test/block_test.rb +++ b/test/block_test.rb @@ -3,14 +3,6 @@ require "test_helper" class BlockTest < ActiveSupport::TestCase - def setup - create_postgres_database - end - - def teardown - drop_postgres_database - end - test "sets a block" do k = Base64.strict_encode64(Random.bytes(4)) assert_nil Pecorino::Block.blocked_until(key: k) From 538842a7b9e0ab7b497d491b97a2ff0535fc0e26 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 12 Mar 2024 22:58:25 +0100 Subject: [PATCH 31/36] Allow negative values for Block.set! --- lib/pecorino/block.rb | 2 ++ test/block_test.rb | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/pecorino/block.rb b/lib/pecorino/block.rb index 03dc3fc..4dd8beb 100644 --- a/lib/pecorino/block.rb +++ b/lib/pecorino/block.rb @@ -13,6 +13,8 @@ class Pecorino::Block def self.set!(key:, block_for:, adapter: Pecorino.adapter) adapter.set_block(key: key, block_for: block_for) Time.now + block_for + rescue ArgumentError # negative block + nil end # Returns the time until a certain block is in effect diff --git a/test/block_test.rb b/test/block_test.rb index a4420fc..96b1b6c 100644 --- a/test/block_test.rb +++ b/test/block_test.rb @@ -15,8 +15,7 @@ class BlockTest < ActiveSupport::TestCase test "does not return a block which has lapsed" do k = Base64.strict_encode64(Random.bytes(4)) assert_nil Pecorino::Block.blocked_until(key: k) - assert Pecorino::Block.set!(key: k, block_for: -30.minutes) - + Pecorino::Block.set!(key: k, block_for: -30.minutes) blocked_until = Pecorino::Block.blocked_until(key: k) assert_nil blocked_until end From 5a4bb2b6956aa81f6516f62297ae4d6e7d700a51 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 12 Mar 2024 22:59:59 +0100 Subject: [PATCH 32/36] Bump version and changelog --- CHANGELOG.md | 7 +++++++ lib/pecorino/version.rb | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2df074c..5c1a726 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.7.0 + +- Allow `Pecorino.adapter` to be assigned, and add `adapter:` to all classes. This allows the adapter for Pecorino to be configured manually and overridden in an initializer. +- Add Redis-based adapter derived from Prorate +- Formalize and test the adapter API +- Add a memory-based adapter for single-process applications (and as a reference) + ## 0.6.0 - Add `Pecorino::Block` for setting blocks directly. These are available both to `Throttle` with the same key and on their own. This can be used to set arbitrary blocks without having to configure a `Throttle` first. diff --git a/lib/pecorino/version.rb b/lib/pecorino/version.rb index 2ef89ea..39b4e25 100644 --- a/lib/pecorino/version.rb +++ b/lib/pecorino/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Pecorino - VERSION = "0.6.0" + VERSION = "0.7.0" end From 3898531ce038cc1679fc7ec5aef030de68392780 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 20 Mar 2024 12:30:36 +0100 Subject: [PATCH 33/36] Add YARD comments to BaseAdapter --- lib/pecorino/adapters/base_adapter.rb | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/lib/pecorino/adapters/base_adapter.rb b/lib/pecorino/adapters/base_adapter.rb index e18abb5..b04e9c0 100644 --- a/lib/pecorino/adapters/base_adapter.rb +++ b/lib/pecorino/adapters/base_adapter.rb @@ -5,12 +5,23 @@ class Pecorino::Adapters::BaseAdapter # Returns the state of a leaky bucket. The state should be a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + # + # @param key[String] the key of the leaky bucket + # @param capacity[Float] the capacity of the leaky bucket to limit to + # @param leak_rate[Float] how many tokens leak out of the bucket per second + # @return [Array] def state(key:, capacity:, leak_rate:) [0, false] end # Adds tokens to the leaky bucket. The return value is a tuple of two # values: the current level (Float) and whether the bucket is now at capacity (Boolean) + # + # @param key[String] the key of the leaky bucket + # @param capacity[Float] the capacity of the leaky bucket to limit to + # @param leak_rate[Float] how many tokens leak out of the bucket per second + # @param n_tokens[Float] how many tokens to add + # @return [Array] def add_tokens(key:, capacity:, leak_rate:, n_tokens:) [0, false] end @@ -19,22 +30,32 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # be added. If there isn't - the fillup will be rejected. The return value is a triplet of # the current level (Float), whether the bucket is now at capacity (Boolean) # and whether the fillup was accepted (Boolean) + # + # @param key[String] the key of the leaky bucket + # @param capacity[Float] the capacity of the leaky bucket to limit to + # @param leak_rate[Float] how many tokens leak out of the bucket per second + # @param n_tokens[Float] how many tokens to add + # @return [Array] def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) [0, false, false] end # Sets a timed block for the given key - this is used when a throttle fires. The return value # is not defined - the call should always succeed. + # @param key[String] the key of the block + # @param block_for[#to_f, Active Support Duration] the duration of the block, in seconds def set_block(key:, block_for:) end # Returns the time until which a block for a given key is in effect. If there is no block in # effect, the method should return `nil`. The return value is either a `Time` or `nil` + # @param key[String] the key of the block def blocked_until(key:) end # Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have # now lapsed + # @return [void] def prune end From 73f6be5498a33dee3578fb4d1d5b7f8037d82b09 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 20 Mar 2024 12:30:44 +0100 Subject: [PATCH 34/36] Better pruning assertions --- test/adapters/adapter_test_methods.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/adapters/adapter_test_methods.rb b/test/adapters/adapter_test_methods.rb index 5c6468b..3fd7309 100644 --- a/test/adapters/adapter_test_methods.rb +++ b/test/adapters/adapter_test_methods.rb @@ -228,10 +228,11 @@ def test_prune adapter.add_tokens_conditionally(key: key, capacity: capacity, leak_rate: leak_rate, n_tokens: 29.6) adapter.set_block(key: key, block_for: 0.5) - sleep 0.65 + sleep 2 # Both the leaky bucket and the block should have expired by now, and `prune` should not raise - adapter.prune + assert_nothing_raised { adapter.prune } + assert_nil adapter.blocked_until(key: key) end def test_create_tables From 9662a33c83aa6c12c0940fb54f20c0c6880811ed Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 20 Mar 2024 12:34:24 +0100 Subject: [PATCH 35/36] Document adapter= and adapter --- lib/pecorino.rb | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/pecorino.rb b/lib/pecorino.rb index 153368c..0a3545d 100644 --- a/lib/pecorino.rb +++ b/lib/pecorino.rb @@ -41,10 +41,19 @@ def self.create_tables(active_record_schema) adapter.create_tables(active_record_schema) end + # Allows assignment of an adapter for storing throttles. Normally this would be a subclass of `Pecorino::Adapters::BaseAdapter`, but + # you can assign anything you like. Set this in an initializer. By default Pecorino will use the adapter configured from your main + # database, but you can also create a separate database for it - or use Redis or memory storage. + # + # @param adapter[Pecorino::Adapters::BaseAdapter] + # @return [Pecorino::Adapters::BaseAdapter] def self.adapter=(adapter) @adapter = adapter end + # Returns the currently configured adapter, or the default adapter from the main database + # + # @return [Pecorino::Adapters::BaseAdapter] def self.adapter @adapter || default_adapter_from_main_database end @@ -52,7 +61,9 @@ def self.adapter # Returns the database implementation for setting the values atomically. Since the implementation # differs per database, this method will return a different adapter depending on which database is # being used - def self.default_adapter_from_main_database # default_adapter_from_main_database + # + # @param adapter[Pecorino::Adapters::BaseAdapter] + def self.default_adapter_from_main_database model_class = ActiveRecord::Base adapter_name = model_class.connection.adapter_name case adapter_name @@ -61,7 +72,7 @@ def self.default_adapter_from_main_database # default_adapter_from_main_database when /sqlite/i Pecorino::Adapters::SqliteAdapter.new(model_class) else - raise "Pecorino does not support #{adapter_name} just yet" + raise "Pecorino does not support the #{adapter_name} database just yet" end end end From 047c9f005188f85b28058b550dd83e9750b4a08e Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 20 Mar 2024 12:38:07 +0100 Subject: [PATCH 36/36] Improve memory store cleanup --- lib/pecorino/adapters/memory_adapter.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/pecorino/adapters/memory_adapter.rb b/lib/pecorino/adapters/memory_adapter.rb index b332979..0d891d0 100644 --- a/lib/pecorino/adapters/memory_adapter.rb +++ b/lib/pecorino/adapters/memory_adapter.rb @@ -94,15 +94,16 @@ def blocked_until(key:) def prune now_monotonic = get_mono_time - @blocks.delete_if do |key, blocked_until_monotonic| + @blocks.keys.each do |key| @lock.with(key) do - blocked_until_monotonic < now_monotonic + @blocks.delete(key) if @blocks[key] && @blocks[key] < now_monotonic end end - @buckets.delete_if do |key, (_level, expire_at_monotonic)| + @buckets.keys.each do |key| @lock.with(key) do - expire_at_monotonic < now_monotonic + _level, expire_at_monotonic = @buckets[key] + @buckets.delete(key) if expire_at_monotonic && expire_at_monotonic < now_monotonic end end end