From 9aa5cc98a5b2f6996d3232cd3ac108253df49a46 Mon Sep 17 00:00:00 2001 From: Erik Rozendaal Date: Tue, 24 Dec 2024 17:35:47 +0100 Subject: [PATCH] Enforce user defined unique keys for aggregates --- db/sequent_pgsql.sql | 19 ++++++++++++ db/sequent_schema_indexes.sql | 5 +++ db/sequent_schema_tables.sql | 6 ++++ lib/sequent/core/aggregate_root.rb | 5 +++ lib/sequent/core/event_store.rb | 11 +++++-- lib/sequent/core/stream_record.rb | 11 +++++-- .../sequent/core/aggregate_repository_spec.rb | 31 +++++++++++++++++++ 7 files changed, 84 insertions(+), 4 deletions(-) diff --git a/db/sequent_pgsql.sql b/db/sequent_pgsql.sql index 0cfe3996..f924fc86 100644 --- a/db/sequent_pgsql.sql +++ b/db/sequent_pgsql.sql @@ -135,6 +135,7 @@ DECLARE _provided_events_partition_key aggregates.events_partition_key%TYPE; _events_partition_key aggregates.events_partition_key%TYPE; _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; + _unique_keys jsonb; BEGIN _command_id = store_command(_command); @@ -165,6 +166,7 @@ BEGIN _aggregate_id = _aggregate->>'aggregate_id'; _provided_events_partition_key = _aggregate->>'events_partition_key'; _snapshot_outdated_at = _aggregate->>'snapshot_outdated_at'; + _unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb); SELECT * INTO _aggregate_row FROM aggregates WHERE aggregate_id = _aggregate_id; _events_partition_key = COALESCE(_provided_events_partition_key, _aggregate_row.events_partition_key, ''); @@ -179,6 +181,23 @@ BEGIN DO UPDATE SET events_partition_key = EXCLUDED.events_partition_key WHERE aggregates.events_partition_key IS DISTINCT FROM EXCLUDED.events_partition_key; + DELETE FROM aggregate_unique_keys AS target + WHERE target.aggregate_id = _aggregate_id + AND NOT (_unique_keys ? target.scope); + + BEGIN + INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key) + SELECT _aggregate_id, key, value + FROM jsonb_each(_unique_keys) AS x + ON CONFLICT (aggregate_id, scope) DO UPDATE + SET key = EXCLUDED.key + WHERE target.key <> EXCLUDED.key; + EXCEPTION + WHEN unique_violation THEN + RAISE unique_violation + USING MESSAGE = 'duplicate aggregate key value for aggregate ' || _aggregate_id || ' (' || SQLERRM || ')'; + END; + INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json) SELECT _events_partition_key, _aggregate_id, diff --git a/db/sequent_schema_indexes.sql b/db/sequent_schema_indexes.sql index fe0a1e95..36ddd373 100644 --- a/db/sequent_schema_indexes.sql +++ b/db/sequent_schema_indexes.sql @@ -14,6 +14,11 @@ CREATE INDEX events_event_type_id_idx ON events (event_type_id); ALTER TABLE aggregates ADD FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types (id) ON UPDATE CASCADE; +ALTER TABLE aggregate_unique_keys + ADD PRIMARY KEY (aggregate_id, scope), + ADD UNIQUE (scope, key), + ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE; + ALTER TABLE events ADD FOREIGN KEY (partition_key, aggregate_id) REFERENCES aggregates (events_partition_key, aggregate_id) ON UPDATE CASCADE ON DELETE RESTRICT; diff --git a/db/sequent_schema_tables.sql b/db/sequent_schema_tables.sql index 4a498f4b..78241f3f 100644 --- a/db/sequent_schema_tables.sql +++ b/db/sequent_schema_tables.sql @@ -25,6 +25,12 @@ CREATE TABLE aggregates ( created_at timestamp with time zone NOT NULL DEFAULT NOW() ) PARTITION BY RANGE (aggregate_id); +CREATE TABLE aggregate_unique_keys ( + aggregate_id uuid NOT NULL, + scope text NOT NULL, + key jsonb NOT NULL +); + CREATE TABLE events ( aggregate_id uuid NOT NULL, partition_key text NOT NULL DEFAULT '', diff --git a/lib/sequent/core/aggregate_root.rb b/lib/sequent/core/aggregate_root.rb index d3efc068..67f031a2 100644 --- a/lib/sequent/core/aggregate_root.rb +++ b/lib/sequent/core/aggregate_root.rb @@ -98,12 +98,17 @@ def to_s "#{self.class.name}: #{@id}" end + def unique_keys + {} + end + def event_stream EventStream.new( aggregate_type: self.class.name, aggregate_id: id, events_partition_key: events_partition_key, snapshot_outdated_at: snapshot_outdated? ? Time.now : nil, + unique_keys:, ) end diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 3521851b..4af13063 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -18,6 +18,9 @@ class EventStore class OptimisticLockingError < RuntimeError end + class AggregateKeyNotUniqueError < RuntimeError + end + class DeserializeEventError < RuntimeError attr_reader :event_hash @@ -250,8 +253,12 @@ def store_events(command, streams_with_events = []) Sequent::Core::Oj.dump(events), ], ) - rescue ActiveRecord::RecordNotUnique - raise OptimisticLockingError + rescue ActiveRecord::RecordNotUnique => e + if e.message =~ /duplicate aggregate key value/ + raise AggregateKeyNotUniqueError + else + raise OptimisticLockingError + end end def convert_timestamp(timestamp) diff --git a/lib/sequent/core/stream_record.rb b/lib/sequent/core/stream_record.rb index 940691df..2dbd2731 100644 --- a/lib/sequent/core/stream_record.rb +++ b/lib/sequent/core/stream_record.rb @@ -4,8 +4,15 @@ module Sequent module Core - EventStream = Data.define(:aggregate_type, :aggregate_id, :events_partition_key, :snapshot_outdated_at) do - def initialize(aggregate_type:, aggregate_id:, events_partition_key: '', snapshot_outdated_at: nil) + EventStream = Data.define( + :aggregate_type, + :aggregate_id, + :events_partition_key, + :snapshot_outdated_at, + :unique_keys, + ) do + def initialize(aggregate_type:, aggregate_id:, events_partition_key: '', snapshot_outdated_at: nil, + unique_keys: {}) super end end diff --git a/spec/lib/sequent/core/aggregate_repository_spec.rb b/spec/lib/sequent/core/aggregate_repository_spec.rb index 184dfcca..6439960f 100644 --- a/spec/lib/sequent/core/aggregate_repository_spec.rb +++ b/spec/lib/sequent/core/aggregate_repository_spec.rb @@ -481,5 +481,36 @@ def ping expect(aggregate.first.pinged).to eq(2) end end + + context 'with unique keys' do + class DummyWithUniqueKeysCreated < Sequent::Core::Event + attrs unique_keys: Object + end + + class DummyAggregateWithUniqueKeys < Sequent::Core::AggregateRoot + def initialize(id, unique_keys) + super(id) + apply DummyWithUniqueKeysCreated, unique_keys: + end + + def unique_keys + @unique_keys || {} + end + + on DummyWithUniqueKeysCreated do |event| + @unique_keys = event.unique_keys + end + end + + it 'enforces key uniqueness with the same scope' do + dummy1 = DummyAggregateWithUniqueKeys.new(Sequent.new_uuid, {email: 'test@example.com'}) + dummy2 = DummyAggregateWithUniqueKeys.new(Sequent.new_uuid, {email: 'test@example.com'}) + Sequent.aggregate_repository.add_aggregate(dummy1) + Sequent.aggregate_repository.add_aggregate(dummy2) + + expect { Sequent.aggregate_repository.commit(DummyCommand.new) } + .to raise_error Sequent::Core::EventStore::AggregateKeyNotUniqueError + end + end end end