Commit

Merge pull request #406 from zilverline/replay-using-postgres-minimum-transaction-id

Base event migration on minimum in-progress transaction id
erikrozendaal authored Feb 6, 2024
2 parents fcc7bcc + 338f5cb commit 9b5d5b6
Showing 10 changed files with 116 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Changelog 7.0.x (changes since 7.0.0)

- Replaying all events for the view schema (using `sequent:migrate:online` and `sequent:migrate:offline`) now makes use of the PostgreSQL committed transaction id to track events that have already been replayed. The replayed ids table (specified by the removed `Sequent::configuration.replayed_ids_table_name` option) is no longer used and can be dropped from your database.
- The `MessageDispatcher` class has been removed.
- Instance-of routes in projectors and other message handlers now use an optimized lookup mechanism. These are the most common handlers (`on MyEvent do ... end`).
- Many optimizations were applied to the `ReplayOptimizedPostgresPersistor`:
Expand Down
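The changelog entry above can be read as a partitioning rule, which the later hunks in this commit spell out: the online replay takes events whose `xact_id` is below the recorded minimum in-progress transaction id (or is NULL), and the offline replay takes events at or above it. The following is a minimal in-memory sketch of that rule, not Sequent code. The `Event` struct and both helper methods are hypothetical, and treating NULL as "written before the column existed" is an assumption based on the schema change adding the column without a backfill.

```ruby
# Hypothetical in-memory stand-in for rows in event_records; only the
# xact_id column introduced by this commit is modelled.
Event = Struct.new(:id, :xact_id)

# Events the online replay would pick up: xact_id below the recorded xmin,
# or NULL (assumed to be rows written before the column was added).
def online_replay_events(events, xmin_xact_id)
  events.select { |e| e.xact_id.nil? || e.xact_id < xmin_xact_id }
end

# Events the offline replay would pick up: xact_id at or above the recorded xmin.
def offline_replay_events(events, xmin_xact_id)
  events.select { |e| !e.xact_id.nil? && e.xact_id >= xmin_xact_id }
end

events = [
  Event.new(1, nil),
  Event.new(2, 90),
  Event.new(3, 100),
  Event.new(4, 105),
]
xmin = 100

online = online_replay_events(events, xmin).map(&:id)
offline = offline_replay_events(events, xmin).map(&:id)
```

Because the two predicates are complementary, every event lands in exactly one phase, which is what lets the separate replayed ids table go away.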
5 changes: 5 additions & 0 deletions db/sequent_schema.rb
Expand Up @@ -8,8 +8,12 @@
t.text "event_json", :null => false
t.integer "command_record_id", :null => false
t.integer "stream_record_id", :null => false
t.bigint "xact_id"
end

execute %Q{
ALTER TABLE event_records ALTER COLUMN xact_id SET DEFAULT pg_current_xact_id()::text::bigint
}
execute %Q{
CREATE UNIQUE INDEX unique_event_per_aggregate ON event_records (
aggregate_id,
Expand All @@ -24,6 +28,7 @@
add_index "event_records", ["command_record_id"], :name => "index_event_records_on_command_record_id"
add_index "event_records", ["event_type"], :name => "index_event_records_on_event_type"
add_index "event_records", ["created_at"], :name => "index_event_records_on_created_at"
add_index "event_records", ["xact_id"], :name => "index_event_records_on_xact_id"

create_table "command_records", :force => true do |t|
t.string "user_id"
Expand Down
7 changes: 3 additions & 4 deletions docs/docs/concepts/configuration.md
Expand Up @@ -49,7 +49,7 @@ Autoregistered classes will be appended to any already manually registered `comm
Sequent detects duplicates it will currently fail.
When setting `enable_autoregistration` to `true` in your `initializer`
any [CommandHandlers](command-handler.html), [Projectors](projector.html) and [Workflows](workflow.html) are
automatically registered in your Sequent configuration.
automatically registered in your Sequent configuration.
When you have base classes that you don't want to have automatically registered you can
set `self.abstract_class = true` for these classes. Another option to skip autoregistration is to set
`self.skip_autoregister` to `true`.
Expand Down Expand Up @@ -135,9 +135,8 @@ For the latest configuration possibilities please check the `Sequent::Configurat
| number_of_replay_processes | The [number of processes](#number_of_replay_processes) used during the offline migration | `4` |
| offline_replay_persistor_class | The class used to persist the `Projector`s during the offline migration part. | `Sequent::Core::Persistors::ActiveRecordPersistor` |
| online_replay_persistor_class | The class used to persist the `Projector`s. | `Sequent::Core::Persistors::ActiveRecordPersistor` |
| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` |
| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` |
| primary_database_role | A symbol indicating the primary database role if using multiple databases with active record | `:writing` |
| replayed_ids_table_name | The name of the table in which Sequent keeps track of which events are already replayed during a [migration](migrations.html) | `'sequent_replayed_ids'` |
| snapshot_event_class | The event class marking something as a [Snapshot event](snapshotting.html) | `Sequent::Core::SnapshotEvent` |
| stream_record_class | The [class](event_store.html) mapped to the `stream_records` table | `Sequent::Core::StreamRecord` |
| strict_check_attributes_on_apply_events | Whether or not sequent should fail on calling `apply` with invalid attributes. | `false`. Will be enabled by default in the next major release. |
Expand All @@ -146,4 +145,4 @@ For the latest configuration possibilities please check the `Sequent::Configurat
| uuid_generator | The UUID Generator used. Mainly useful for testing | `Sequent::Core::RandomUuidGenerator` |
| versions_table_name | The name of the table in which Sequent checks which [migration version](migrations.html) is currently active | `'sequent_versions'` |
| view_schema_name | The name of the view_schema in which the projections are created. | `'view_schema'` |
| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` |
| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` |
12 changes: 1 addition & 11 deletions lib/sequent/configuration.rb
Expand Up @@ -10,7 +10,6 @@
module Sequent
class Configuration
DEFAULT_VERSIONS_TABLE_NAME = 'sequent_versions'
DEFAULT_REPLAYED_IDS_TABLE_NAME = 'sequent_replayed_ids'

DEFAULT_MIGRATION_SQL_FILES_DIRECTORY = 'db/tables'
DEFAULT_DATABASE_CONFIG_DIRECTORY = 'db'
Expand Down Expand Up @@ -68,8 +67,7 @@ class Configuration
:enable_autoregistration

attr_reader :migrations_class_name,
:versions_table_name,
:replayed_ids_table_name
:versions_table_name

def self.instance
@instance ||= new
Expand Down Expand Up @@ -104,7 +102,6 @@ def initialize
self.event_publisher = Sequent::Core::EventPublisher.new
self.disable_event_handlers = false
self.versions_table_name = DEFAULT_VERSIONS_TABLE_NAME
self.replayed_ids_table_name = DEFAULT_REPLAYED_IDS_TABLE_NAME
self.migration_sql_files_directory = DEFAULT_MIGRATION_SQL_FILES_DIRECTORY
self.view_schema_name = DEFAULT_VIEW_SCHEMA_NAME
self.event_store_schema_name = DEFAULT_EVENT_STORE_SCHEMA_NAME
Expand Down Expand Up @@ -135,13 +132,6 @@ def can_use_multiple_databases?
enable_multiple_database_support && ActiveRecord.version > Gem::Version.new('6.1.0')
end

def replayed_ids_table_name=(table_name)
fail ArgumentError, 'table_name can not be nil' unless table_name

@replayed_ids_table_name = table_name
Sequent::Migrations::ReplayedIds.table_name = table_name
end

def versions_table_name=(table_name)
fail ArgumentError, 'table_name can not be nil' unless table_name

Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/event_record.rb
Expand Up @@ -76,6 +76,7 @@ class EventRecord < Sequent::ApplicationRecord
include SerializesEvent

self.table_name = 'event_records'
self.ignored_columns = %w[xact_id]

belongs_to :stream_record
belongs_to :command_record
Expand Down
13 changes: 0 additions & 13 deletions lib/sequent/migrations/replayed_ids.rb

This file was deleted.

13 changes: 11 additions & 2 deletions lib/sequent/migrations/versions.rb
Expand Up @@ -13,6 +13,7 @@ def self.migration_sql
CREATE TABLE IF NOT EXISTS #{table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version));
ALTER TABLE #{table_name} drop constraint if exists only_one_running;
ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS status INTEGER DEFAULT NULL CONSTRAINT only_one_running CHECK (status in (1,2,3));
ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS xmin_xact_id BIGINT;
DROP INDEX IF EXISTS single_migration_running;
CREATE UNIQUE INDEX single_migration_running ON #{table_name} ((status * 0)) where status is not null;
SQL
Expand All @@ -33,11 +34,15 @@ def self.version_currently_migrating
end

def self.latest_version
order('version desc').limit(1).first&.version
latest&.version
end

def self.latest
order('version desc').limit(1).first
end

def self.start_online!(new_version)
create!(version: new_version, status: MIGRATE_ONLINE_RUNNING)
create!(version: new_version, status: MIGRATE_ONLINE_RUNNING, xmin_xact_id: current_snapshot_xmin_xact_id)
rescue ActiveRecord::RecordNotUnique
raise ConcurrentMigration, "Migration for version #{new_version} is already running"
end
Expand Down Expand Up @@ -68,6 +73,10 @@ def self.start_offline!(new_version)
def self.end_offline!(new_version)
find_by!(version: new_version, status: MIGRATE_OFFLINE_RUNNING).update(status: DONE)
end

def self.current_snapshot_xmin_xact_id
connection.execute('SELECT pg_snapshot_xmin(pg_current_snapshot())::text::bigint AS xmin').first['xmin']
end
end
end
end
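To make the value stored in `xmin_xact_id` more concrete, here is a small pure-Ruby model of what `pg_snapshot_xmin(pg_current_snapshot())` reports: the lowest transaction id that is still in progress, or the next id to be assigned when nothing is running. This is a simplified sketch with no database involved. The method name and its two inputs are hypothetical, and transaction id wraparound and epochs are ignored.

```ruby
# Pure-Ruby model of the snapshot xmin; no database is involved.
#
# in_progress_xact_ids: ids of transactions still running (hypothetical input)
# next_xact_id:         the id the next transaction would receive
def snapshot_xmin(in_progress_xact_ids, next_xact_id)
  # Array#min returns nil for an empty array, so with nothing in progress
  # the result falls back to the next id to be assigned.
  in_progress_xact_ids.min || next_xact_id
end

xmin_with_running = snapshot_xmin([104, 101, 107], 110)
xmin_when_idle = snapshot_xmin([], 110)
```

Every transaction with an id below this value has already finished, so events it wrote are visible to the online replay. Anything at or above it may still be uncommitted, which is why those events are left to the offline step.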
82 changes: 48 additions & 34 deletions lib/sequent/migrations/view_schema.rb
Expand Up @@ -13,7 +13,6 @@
require_relative 'executor'
require_relative 'sql'
require_relative 'versions'
require_relative 'replayed_ids'

module Sequent
module Migrations
Expand Down Expand Up @@ -199,13 +198,17 @@ def migrate_online
in_view_schema do
Versions.start_online!(Sequent.new_version)

truncate_replay_ids_table!

drop_old_tables(Sequent.new_version)
executor.execute_online(plan)
end

replay!(Sequent.configuration.online_replay_persistor_class.new, groups: groups) if plan.projectors.any?
if plan.projectors.any?
replay!(
Sequent.configuration.online_replay_persistor_class.new,
groups: groups,
maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id,
)
end

in_view_schema do
executor.create_indexes_after_execute_online(plan)
Expand Down Expand Up @@ -237,7 +240,7 @@ def migrate_online
# 2.1 Rename current tables with the +current version+ as SUFFIX
# 2.2 Rename the new tables and remove the +new version+ suffix
# 2.3 Add the new version in the +Versions+ table
# 3. Performs cleanup of replayed event ids
# 3. Update the versions table to complete the migration
#
# If anything fails an exception is raised and everything is rolled back
#
Expand All @@ -256,8 +259,8 @@ def migrate_offline
if plan.projectors.any?
replay!(
Sequent.configuration.offline_replay_persistor_class.new,
exclude_ids: true,
groups: groups(group_exponent: 1),
minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id,
)
end

Expand All @@ -268,9 +271,6 @@ def migrate_offline
# 2.3 Create migration record
Versions.end_offline!(Sequent.new_version)
end

# 3. Truncate replayed ids
truncate_replay_ids_table!
end
logger.info "Migrated to version #{Sequent.new_version}"
rescue ConcurrentMigration
Expand All @@ -291,7 +291,7 @@ def ensure_valid_plan!
def migrate_metadata_tables
Sequent::ApplicationRecord.transaction do
in_view_schema do
exec_sql([ReplayedIds.migration_sql, Versions.migration_sql].join("\n"))
exec_sql([Versions.migration_sql].join("\n"))
end
end
end
Expand All @@ -306,7 +306,13 @@ def ensure_version_correct!
end
end

def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids: false)
def replay!(
replay_persistor,
groups:,
projectors: plan.projectors,
minimum_xact_id_inclusive: nil,
maximum_xact_id_exclusive: nil
)
logger.info "groups: #{groups.size}"

with_sequent_config(replay_persistor, projectors) do
Expand All @@ -327,7 +333,14 @@ def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids:
Group (#{aggregate_prefixes.first}-#{aggregate_prefixes.last}) #{index + 1}/#{groups.size} replayed
EOS
time(msg) do
replay_events(aggregate_prefixes, event_types, exclude_ids, replay_persistor, &insert_ids)
replay_events(
aggregate_prefixes,
event_types,
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
replay_persistor,
&on_progress
)
end
nil
rescue StandardError => e
Expand All @@ -342,12 +355,19 @@ def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids:
end
end

def replay_events(aggregate_prefixes, event_types, exclude_already_replayed, replay_persistor, &on_progress)
replay_persistor.prepare

def replay_events(
aggregate_prefixes,
event_types,
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
replay_persistor,
&on_progress
)
Sequent.configuration.event_store.replay_events_from_cursor(
block_size: 1000,
get_events: -> { event_stream(aggregate_prefixes, event_types, exclude_already_replayed) },
get_events: -> {
event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
},
on_progress: on_progress,
)

Expand All @@ -362,15 +382,10 @@ def rollback_migration
establish_connection
drop_old_tables(Sequent.new_version)

truncate_replay_ids_table!
executor.reset_table_names(plan)
Versions.rollback!(Sequent.new_version)
end

def truncate_replay_ids_table!
exec_sql("truncate table #{ReplayedIds.table_name}")
end

def groups(group_exponent: 3, limit: nil, offset: nil)
number_of_groups = 16**group_exponent
groups = groups_of_aggregate_id_prefixes(number_of_groups)
Expand Down Expand Up @@ -411,15 +426,8 @@ def drop_old_tables(new_version)
end
end

def insert_ids
def on_progress
->(progress, done, ids) do
unless ids.empty?
exec_sql(
"insert into #{ReplayedIds.table_name} (event_id) values #{ids.map do |id|
"(#{id})"
end.join(',')}",
)
end
Sequent::Core::EventStore::PRINT_PROGRESS[progress, done, ids] if progress > 0
end
end
Expand All @@ -442,18 +450,24 @@ def with_sequent_config(replay_persistor, projectors, &block)
Sequent::Configuration.restore(old_config)
end

def event_stream(aggregate_prefixes, event_types, exclude_already_replayed)
def event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
fail ArgumentError, 'aggregate_prefixes is mandatory' unless aggregate_prefixes.present?

event_stream = Sequent.configuration.event_record_class.where(event_type: event_types)
event_stream = event_stream.where(<<~SQL, aggregate_prefixes)
substring(aggregate_id::text from 1 for #{LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE}) in (?)
SQL
if exclude_already_replayed
event_stream = event_stream
.where("NOT EXISTS (SELECT 1 FROM #{ReplayedIds.table_name} WHERE event_id = event_records.id)")
if minimum_xact_id_inclusive && maximum_xact_id_exclusive
event_stream = event_stream.where(
'xact_id >= ? AND xact_id < ?',
minimum_xact_id_inclusive,
maximum_xact_id_exclusive,
)
elsif minimum_xact_id_inclusive
event_stream = event_stream.where('xact_id >= ?', minimum_xact_id_inclusive)
elsif maximum_xact_id_exclusive
event_stream = event_stream.where('xact_id IS NULL OR xact_id < ?', maximum_xact_id_exclusive)
end
event_stream = event_stream.where('event_records.created_at > ?', 1.day.ago) if exclude_already_replayed
event_stream
.order('aggregate_id ASC, sequence_number ASC')
.select('id, event_type, event_json, sequence_number')
Expand Down
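The three branches added to `event_stream` can be summarised as a function from the two optional bounds to a SQL fragment plus bind values. The sketch below restates that branching outside ActiveRecord so each case can be checked in isolation. `xact_id_condition` is a hypothetical helper and is not part of Sequent.

```ruby
# Returns [sql_fragment, bind_values] for the xact_id range, or nil when
# neither bound is given (no extra filter). Hypothetical helper.
def xact_id_condition(minimum_inclusive: nil, maximum_exclusive: nil)
  if minimum_inclusive && maximum_exclusive
    ['xact_id >= ? AND xact_id < ?', [minimum_inclusive, maximum_exclusive]]
  elsif minimum_inclusive
    ['xact_id >= ?', [minimum_inclusive]]
  elsif maximum_exclusive
    # Rows with a NULL xact_id are matched only on the "before" side.
    ['xact_id IS NULL OR xact_id < ?', [maximum_exclusive]]
  end
end

both_bounds = xact_id_condition(minimum_inclusive: 100, maximum_exclusive: 200)
offline_only = xact_id_condition(minimum_inclusive: 100)
online_only = xact_id_condition(maximum_exclusive: 100)
unbounded = xact_id_condition
```

In the diff, `migrate_online` passes only the exclusive maximum and `migrate_offline` passes only the inclusive minimum, so the first branch is not exercised by either caller shown here.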
38 changes: 38 additions & 0 deletions lib/sequent/rake/migration_tasks.rb
Expand Up @@ -125,6 +125,44 @@ def register_tasks!
end
end

desc <<-EOS
Shows the current status of the migrations
EOS
task status: ['sequent:init', :init] do
ensure_sequent_env_set!
db_config = Sequent::Support::Database.read_config(@env)
view_schema = Sequent::Migrations::ViewSchema.new(db_config: db_config)

latest_done_version = Sequent::Migrations::Versions.done.latest
latest_version = Sequent::Migrations::Versions.latest
pending_version = Sequent.new_version
case latest_version.status
when Sequent::Migrations::Versions::DONE
if pending_version == latest_version.version
puts "Current version #{latest_version.version}, no pending changes"
else
puts "Current version #{latest_version.version}, pending version #{pending_version}"
end
when Sequent::Migrations::Versions::MIGRATE_ONLINE_RUNNING
puts "Online migration from #{latest_done_version.version} to #{latest_version.version} is running"
when Sequent::Migrations::Versions::MIGRATE_ONLINE_FINISHED
projectors = view_schema.plan.projectors
event_types = projectors.flat_map { |projector| projector.message_mapping.keys }.uniq.map(&:name)

current_snapshot_xmin_xact_id = Sequent::Migrations::Versions.current_snapshot_xmin_xact_id
pending_events = Sequent.configuration.event_record_class
.where(event_type: event_types)
.where('xact_id >= ?', current_snapshot_xmin_xact_id)
.count
print <<~EOS
Online migration from #{latest_done_version.version} to #{latest_version.version} is finished.
#{current_snapshot_xmin_xact_id - latest_version.xmin_xact_id} transactions behind current state (#{pending_events} pending events).
EOS
when Sequent::Migrations::Versions::MIGRATE_OFFLINE_RUNNING
puts "Offline migration from #{latest_done_version.version} to #{latest_version.version} is running"
end
end

desc <<~EOS
Migrates the Projectors while the app is running. Call +sequent:migrate:offline+ after this has completed successfully.
EOS
Expand Down