From 67463d5140c6437f7b53a7ed3e56359af562383a Mon Sep 17 00:00:00 2001 From: Erik Rozendaal Date: Thu, 19 Dec 2024 15:25:22 +0100 Subject: [PATCH 1/4] Avoid exhausting type ids due to transaction rollbacks Type ids (for aggregate, command, and event types) use the `SMALLINT` data type to optimize storage usage. However, when a sequence is used to generated ids the ids can be dropped on transaction rollback. This can cause the ids to be quickly exhausted when something is misconfigured. This change uses the highest id + 1 for the next id. This is slow and requires table locking, but since new types are rarely added this should not impact normal performance. --- db/sequent_pgsql.sql | 47 ++++++++++++++++++++++-------------- db/sequent_schema_tables.sql | 6 ++--- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/db/sequent_pgsql.sql b/db/sequent_pgsql.sql index 93eff3f6..a3194c72 100644 --- a/db/sequent_pgsql.sql +++ b/db/sequent_pgsql.sql @@ -102,10 +102,12 @@ DECLARE _command_json jsonb = _command->'command_json'; BEGIN IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN - -- Only try inserting if it doesn't exist to avoid exhausting the id sequence - INSERT INTO command_types (type) - VALUES (_command->>'command_type') - ON CONFLICT DO NOTHING; + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO command_types (id, type) + VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM command_types), _command->>'command_type') + ON CONFLICT (type) DO NOTHING; END IF; INSERT INTO commands ( @@ -136,29 +138,38 @@ DECLARE _events_partition_key aggregates.events_partition_key%TYPE; _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; _unique_keys jsonb; + _type TEXT; BEGIN _command_id = store_command(_command); - WITH types AS ( + FOR _type IN ( SELECT DISTINCT row->0->>'aggregate_type' AS type FROM jsonb_array_elements(_aggregates_with_events) AS row - ) - INSERT INTO aggregate_types (type) - SELECT type FROM types - WHERE type NOT IN (SELECT type FROM aggregate_types) - ORDER BY 1 - ON CONFLICT DO NOTHING; + EXCEPT + SELECT type FROM aggregate_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; - WITH types AS ( + FOR _type IN ( SELECT DISTINCT events->>'event_type' AS type FROM jsonb_array_elements(_aggregates_with_events) AS row CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events - ) - INSERT INTO event_types (type) - SELECT type FROM types - WHERE type NOT IN (SELECT type FROM event_types) - ORDER BY 1 - ON CONFLICT DO NOTHING; + EXCEPT + SELECT type FROM event_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP _aggregate_id = _aggregate->>'aggregate_id'; diff --git a/db/sequent_schema_tables.sql b/db/sequent_schema_tables.sql index 78241f3f..bffaf432 100644 --- a/db/sequent_schema_tables.sql +++ b/db/sequent_schema_tables.sql @@ -1,6 +1,6 @@ -CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); -CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); -CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE command_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE aggregate_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE event_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL); CREATE SEQUENCE IF NOT EXISTS commands_id_seq; From 9520fc858a98b00cc6d44f4068ca78b578370e52 Mon Sep 17 00:00:00 2001 From: Erik Rozendaal Date: Fri, 20 Dec 2024 11:46:54 +0100 Subject: [PATCH 2/4] Drop IDENTITY default for type ids --- db/sequent_pgsql.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/db/sequent_pgsql.sql b/db/sequent_pgsql.sql index a3194c72..002dc06b 100644 --- a/db/sequent_pgsql.sql +++ b/db/sequent_pgsql.sql @@ -1,3 +1,7 @@ +ALTER TABLE aggregate_types ALTER COLUMN id DROP IDENTITY IF EXISTS; +ALTER TABLE command_types ALTER COLUMN id DROP IDENTITY IF EXISTS; +ALTER TABLE event_types ALTER COLUMN id DROP IDENTITY IF EXISTS; + DROP TYPE IF EXISTS aggregate_event_type CASCADE; CREATE TYPE aggregate_event_type AS ( aggregate_type text, From cd9c5ad3366b70390260aab0149fcf640e4d7019 Mon Sep 17 00:00:00 2001 From: Erik Rozendaal Date: Fri, 20 Dec 2024 11:58:19 +0100 Subject: [PATCH 3/4] Enforce correct locking order of type tables to avoid deadlock Note that updating the type tables is a very rare occurence (only when new code is deployed and the new types are committed to the event store) so locking these tables should not be a performance bottleneck in the normal case. --- db/sequent_pgsql.sql | 79 +++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/db/sequent_pgsql.sql b/db/sequent_pgsql.sql index 002dc06b..27d422d9 100644 --- a/db/sequent_pgsql.sql +++ b/db/sequent_pgsql.sql @@ -99,14 +99,13 @@ BEGIN END; $$; -CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +CREATE OR REPLACE PROCEDURE update_types(_command jsonb, _aggregates_with_events jsonb) LANGUAGE plpgsql AS $$ DECLARE - _id commands.id%TYPE; - _command_json jsonb = _command->'command_json'; + _type TEXT; BEGIN IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN - -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; INSERT INTO command_types (id, type) @@ -114,6 +113,46 @@ BEGIN ON CONFLICT (type) DO NOTHING; END IF; + FOR _type IN ( + SELECT DISTINCT row->0->>'aggregate_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + EXCEPT + SELECT type FROM aggregate_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; + + FOR _type IN ( + SELECT DISTINCT events->>'event_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events + EXCEPT + SELECT type FROM event_types + ORDER BY 1 + ) LOOP + -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid + -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. + LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; + LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE; + INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type) + ON CONFLICT (type) DO NOTHING; + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + _id commands.id%TYPE; + _command_json jsonb = _command->'command_json'; +BEGIN INSERT INTO commands ( created_at, user_id, aggregate_id, command_type_id, command_json, event_aggregate_id, event_sequence_number @@ -142,38 +181,10 @@ DECLARE _events_partition_key aggregates.events_partition_key%TYPE; _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; _unique_keys jsonb; - _type TEXT; BEGIN - _command_id = store_command(_command); + CALL update_types(_command, _aggregates_with_events); - FOR _type IN ( - SELECT DISTINCT row->0->>'aggregate_type' AS type - FROM jsonb_array_elements(_aggregates_with_events) AS row - EXCEPT - SELECT type FROM aggregate_types - ORDER BY 1 - ) LOOP - -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid - -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. - LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE; - INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type) - ON CONFLICT (type) DO NOTHING; - END LOOP; - - FOR _type IN ( - SELECT DISTINCT events->>'event_type' AS type - FROM jsonb_array_elements(_aggregates_with_events) AS row - CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events - EXCEPT - SELECT type FROM event_types - ORDER BY 1 - ) LOOP - -- Only when new types are added is this path executed, which should be rare. We do not use a sequence here To avoid - -- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id. - LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE; - INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type) - ON CONFLICT (type) DO NOTHING; - END LOOP; + _command_id = store_command(_command); FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP _aggregate_id = _aggregate->>'aggregate_id'; From 217e61d7a3510aedc30379f2f6de8544d78dd81f Mon Sep 17 00:00:00 2001 From: Erik Rozendaal Date: Wed, 15 Jan 2025 15:19:12 +0100 Subject: [PATCH 4/4] Add changelog entry with migration instructions --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae14d96d..59f63929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,15 @@ is used by `CommandHandlerHelpers` but in the future might have other uses (such as tailing the event store as events are being committed). +- Use `max(id) + 1` query to assign type ids to avoid exhausing the + SMALLINT range. Sequences are not rolled back when a transaction is + aborted so type ids may become exhausted. This is mainly a problem + in development when running tests, but could occur in production if + a transaction that adds a new type is buggy and gets rolled back + multiple times. + + Re-apply the `sequent_pgsql.sql` file to your Sequent schema to + apply this fix. # Changelog 8.0.2 (changes since 8.0.1)