Skip to content

Commit

Permalink
Merge pull request #437 from zilverline/aggregate-unique-keys
Browse files Browse the repository at this point in the history
Enforce user defined unique keys for aggregates
  • Loading branch information
erikrozendaal authored Jan 16, 2025
2 parents 3d353b6 + f9484be commit a7abf4d
Show file tree
Hide file tree
Showing 17 changed files with 713 additions and 282 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
# Changelog 8.1.0 (changes since 8.0.2)

- Support specifying unique keys for aggregates that are checked when
transaction is committed, reducing the need for having aggregates to
ensure uniqueness or relying on projectors. See
lib/sequent/core/helpers/unique_keys.rb for details.

This feature requires a new database, you can find an example
migration at `db/migrate/20250108162754_aggregate_unique_keys.rb`.
- `CommandHandlerHelpers` has been updated to:
- Load aggregates using the `given_events` so that unique keys can
be tested. This may require changing your test to correctly set up
the given events.
- Run your test using the real or fake event store.
- Use RSpec matchers in `then_events`.
- `then_events` now only checks the events since the last
`when_command` call occurred.
- You can now retrieve the current position of the event store using
`EventStore#position_mark` and load all the events since this
position using `EventStore#load_events_since_marked_position`. This
is used by `CommandHandlerHelpers` but in the future might have
other uses (such as tailing the event store as events are being
committed).

# Changelog 8.0.2 (changes since 8.0.1)

- Add support for ActiveRecord 8. Thanks evsasse.
Expand Down
31 changes: 31 additions & 0 deletions db/migrate/20250108162754_aggregate_unique_keys.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

class AggregateUniqueKeys < ActiveRecord::Migration[7.2]
def up
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
say 'Creating aggregate_unique_keys table', true
suppress_messages do
execute <<~SQL
CREATE TABLE IF NOT EXISTS aggregate_unique_keys (
aggregate_id uuid NOT NULL,
scope text NOT NULL,
key jsonb NOT NULL,
PRIMARY KEY (aggregate_id, scope),
UNIQUE (scope, key),
FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE
)
SQL
end

say 'Creating event store stored procedures and views', true
suppress_messages do
sequent_pgsql_filename = File.join(Sequent.configuration.database_schema_directory, 'sequent_pgsql.sql')
execute File.read(sequent_pgsql_filename)
end
end
end

def down
fail ActiveRecord::IrreversibleMigration
end
end
24 changes: 24 additions & 0 deletions db/sequent_pgsql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ DECLARE
_provided_events_partition_key aggregates.events_partition_key%TYPE;
_events_partition_key aggregates.events_partition_key%TYPE;
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
_unique_keys jsonb;
BEGIN
_command_id = store_command(_command);

Expand All @@ -159,12 +160,22 @@ BEGIN
ORDER BY 1
ON CONFLICT DO NOTHING;

FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

DELETE FROM aggregate_unique_keys AS target
WHERE target.aggregate_id = _aggregate_id
AND NOT (_unique_keys ? target.scope);
END LOOP;

FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row
ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number'
LOOP
_aggregate_id = _aggregate->>'aggregate_id';
_provided_events_partition_key = _aggregate->>'events_partition_key';
_snapshot_outdated_at = _aggregate->>'snapshot_outdated_at';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

SELECT * INTO _aggregate_row FROM aggregates WHERE aggregate_id = _aggregate_id;
_events_partition_key = COALESCE(_provided_events_partition_key, _aggregate_row.events_partition_key, '');
Expand All @@ -179,6 +190,19 @@ BEGIN
DO UPDATE SET events_partition_key = EXCLUDED.events_partition_key
WHERE aggregates.events_partition_key IS DISTINCT FROM EXCLUDED.events_partition_key;

BEGIN
INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key)
SELECT _aggregate_id, key, value
FROM jsonb_each(_unique_keys) AS x
ON CONFLICT (aggregate_id, scope) DO UPDATE
SET key = EXCLUDED.key
WHERE target.key <> EXCLUDED.key;
EXCEPTION
WHEN unique_violation THEN
RAISE unique_violation
USING MESSAGE = 'duplicate unique key value for aggregate ' || (_aggregate->>'aggregate_type') || ' ' || _aggregate_id || ' (' || SQLERRM || ')';
END;

INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json)
SELECT _events_partition_key,
_aggregate_id,
Expand Down
5 changes: 5 additions & 0 deletions db/sequent_schema_indexes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ CREATE INDEX events_event_type_id_idx ON events (event_type_id);
ALTER TABLE aggregates
ADD FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types (id) ON UPDATE CASCADE;

ALTER TABLE aggregate_unique_keys
ADD PRIMARY KEY (aggregate_id, scope),
ADD UNIQUE (scope, key),
ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE events
ADD FOREIGN KEY (partition_key, aggregate_id) REFERENCES aggregates (events_partition_key, aggregate_id)
ON UPDATE CASCADE ON DELETE RESTRICT;
Expand Down
6 changes: 6 additions & 0 deletions db/sequent_schema_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ CREATE TABLE aggregates (
created_at timestamp with time zone NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (aggregate_id);

CREATE TABLE aggregate_unique_keys (
aggregate_id uuid NOT NULL,
scope text NOT NULL,
key jsonb NOT NULL
);

CREATE TABLE events (
aggregate_id uuid NOT NULL,
partition_key text NOT NULL DEFAULT '',
Expand Down
Loading

0 comments on commit a7abf4d

Please sign in to comment.