Skip to content

Commit

Permalink
Merge pull request #435 from zilverline/avoid-exhausting-type-ids
Browse files Browse the repository at this point in the history
Avoid exhausting type ids due to transaction rollbacks
  • Loading branch information
erikrozendaal authored Jan 17, 2025
2 parents a7abf4d + 217e61d commit 6db3b3a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 31 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
is used by `CommandHandlerHelpers` but in the future might have
other uses (such as tailing the event store as events are being
committed).
- Use `max(id) + 1` query to assign type ids to avoid exhausing the
SMALLINT range. Sequences are not rolled back when a transaction is
aborted so type ids may become exhausted. This is mainly a problem
in development when running tests, but could occur in production if
a transaction that adds a new type is buggy and gets rolled back
multiple times.

Re-apply the `sequent_pgsql.sql` file to your Sequent schema to
apply this fix.

# Changelog 8.0.2 (changes since 8.0.1)

Expand Down
82 changes: 54 additions & 28 deletions db/sequent_pgsql.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
ALTER TABLE aggregate_types ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE command_types ALTER COLUMN id DROP IDENTITY IF EXISTS;
ALTER TABLE event_types ALTER COLUMN id DROP IDENTITY IF EXISTS;

DROP TYPE IF EXISTS aggregate_event_type CASCADE;
CREATE TYPE aggregate_event_type AS (
aggregate_type text,
Expand Down Expand Up @@ -95,19 +99,60 @@ BEGIN
END;
$$;

CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint
CREATE OR REPLACE PROCEDURE update_types(_command jsonb, _aggregates_with_events jsonb)
LANGUAGE plpgsql AS $$
DECLARE
_id commands.id%TYPE;
_command_json jsonb = _command->'command_json';
_type TEXT;
BEGIN
IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN
-- Only try inserting if it doesn't exist to avoid exhausting the id sequence
INSERT INTO command_types (type)
VALUES (_command->>'command_type')
ON CONFLICT DO NOTHING;
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO command_types (id, type)
VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM command_types), _command->>'command_type')
ON CONFLICT (type) DO NOTHING;
END IF;

FOR _type IN (
SELECT DISTINCT row->0->>'aggregate_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
EXCEPT
SELECT type FROM aggregate_types
ORDER BY 1
) LOOP
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO aggregate_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM aggregate_types), _type)
ON CONFLICT (type) DO NOTHING;
END LOOP;

FOR _type IN (
SELECT DISTINCT events->>'event_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events
EXCEPT
SELECT type FROM event_types
ORDER BY 1
) LOOP
-- Only when new types are added is this path executed, which should be rare. We do not use a sequence here to avoid
-- wasting ids (which are limited for this table) on rollback, we lock the table and select the minimum next id.
LOCK TABLE command_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE aggregate_types IN ACCESS EXCLUSIVE MODE;
LOCK TABLE event_types IN ACCESS EXCLUSIVE MODE;
INSERT INTO event_types (id, type) VALUES ((SELECT COALESCE(MAX(id) + 1, 1) FROM event_types), _type)
ON CONFLICT (type) DO NOTHING;
END LOOP;
END;
$$;

CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint
LANGUAGE plpgsql AS $$
DECLARE
_id commands.id%TYPE;
_command_json jsonb = _command->'command_json';
BEGIN
INSERT INTO commands (
created_at, user_id, aggregate_id, command_type_id, command_json,
event_aggregate_id, event_sequence_number
Expand Down Expand Up @@ -137,28 +182,9 @@ DECLARE
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
_unique_keys jsonb;
BEGIN
_command_id = store_command(_command);
CALL update_types(_command, _aggregates_with_events);

WITH types AS (
SELECT DISTINCT row->0->>'aggregate_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
)
INSERT INTO aggregate_types (type)
SELECT type FROM types
WHERE type NOT IN (SELECT type FROM aggregate_types)
ORDER BY 1
ON CONFLICT DO NOTHING;

WITH types AS (
SELECT DISTINCT events->>'event_type' AS type
FROM jsonb_array_elements(_aggregates_with_events) AS row
CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events
)
INSERT INTO event_types (type)
SELECT type FROM types
WHERE type NOT IN (SELECT type FROM event_types)
ORDER BY 1
ON CONFLICT DO NOTHING;
_command_id = store_command(_command);

FOR _aggregate IN SELECT row->0 FROM jsonb_array_elements(_aggregates_with_events) AS row LOOP
_aggregate_id = _aggregate->>'aggregate_id';
Expand Down
6 changes: 3 additions & 3 deletions db/sequent_schema_tables.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE command_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE aggregate_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);
CREATE TABLE event_types (id SMALLINT PRIMARY KEY, type text UNIQUE NOT NULL);

CREATE SEQUENCE IF NOT EXISTS commands_id_seq;

Expand Down

0 comments on commit 6db3b3a

Please sign in to comment.