diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d10f25d..eb2350c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Changelog 7.0.x (changes since 7.0.0) +- Replaying all events for the view schema (using `sequent:migrate:online` and `sequent:migrate:offline`) now make use of the PostgreSQL committed transaction id to track events that have already been replayed. The replayed ids table (specified by the removed `Sequent::configuration.replayed_ids_table_name` option) is no longer used and can be dropped from your database. - The `MessageDispatcher` class has been removed. - Instance-of routes in projectors and other message handlers now use an optimized lookup mechanism. These are the most common handlers (`on MyEvent do ... end`). - Many optimizations were applied to the `ReplayOptimizedPostgresPersistor`: diff --git a/db/sequent_schema.rb b/db/sequent_schema.rb index 4c1ebdca..38d82b37 100644 --- a/db/sequent_schema.rb +++ b/db/sequent_schema.rb @@ -8,8 +8,12 @@ t.text "event_json", :null => false t.integer "command_record_id", :null => false t.integer "stream_record_id", :null => false + t.bigint "xact_id" end + execute %Q{ +ALTER TABLE event_records ALTER COLUMN xact_id SET DEFAULT pg_current_xact_id()::text::bigint +} execute %Q{ CREATE UNIQUE INDEX unique_event_per_aggregate ON event_records ( aggregate_id, @@ -24,6 +28,7 @@ add_index "event_records", ["command_record_id"], :name => "index_event_records_on_command_record_id" add_index "event_records", ["event_type"], :name => "index_event_records_on_event_type" add_index "event_records", ["created_at"], :name => "index_event_records_on_created_at" + add_index "event_records", ["xact_id"], :name => "index_event_records_on_xact_id" create_table "command_records", :force => true do |t| t.string "user_id" diff --git a/docs/docs/concepts/configuration.md b/docs/docs/concepts/configuration.md index 09dadf9d..569752bd 100644 --- a/docs/docs/concepts/configuration.md +++ b/docs/docs/concepts/configuration.md @@ -49,7 +49,7 @@ Autoregistered classes will be appended to any already manually registered `comm Sequent detects duplicates it will currently fail. When setting `enable_autoregistration` to `true` in your `initializer` any [CommandHandlers](command-handler.html), [Projectors](projector.html) and [Workflows](workflow.html) are -automatically registered in your Sequent configuration. +automatically registered in your Sequent configuration. When you have base classes that you don't want to have automatically registered you can set `self.abstract_class = true` for these classes. Another option to skip autoregistration is to set `self.skip_autoregister` to `true`. @@ -135,9 +135,8 @@ For the latest configuration possibilities please check the `Sequent::Configurat | number_of_replay_processes | The [number of process](#number_of_replay_processes) used while offline migration | `4` | | offline_replay_persistor_class | The class used to persist the `Projector`s during the offline migration part. | `Sequent::Core::Persistors::ActiveRecordPersistor` | | online_replay_persistor_class | The class used to persist the `Projector`s. | `Sequent::Core::Persistors::ActiveRecordPersistor` | -| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` | +| primary_database_key | A symbol indicating the primary database if multiple databases are specified within the provided db_config | `:primary` | | primary_database_role | A symbol indicating the primary database role if using multiple databases with active record | `:writing` | -| replayed_ids_table_name | The name of the table in which Sequent keeps track of which events are already replayed during a [migration](migrations.html) | `'sequent_replayed_ids'` | | snapshot_event_class | The event class marking something as a [Snapshot event](snapshotting.html) | `Sequent::Core::SnapshotEvent` | | stream_record_class | The [class](event_store.html) mapped to the `stream_records` table | `Sequent::Core::StreamRecord` | | strict_check_attributes_on_apply_events | Whether or not sequent should fail on calling `apply` with invalid attributes. | `false`. Will be enabled by default in the next major release. | @@ -146,4 +145,4 @@ For the latest configuration possibilities please check the `Sequent::Configurat | uuid_generator | The UUID Generator used. Mainly useful for testing | `Sequent::Core::RandomUuidGenerator` | | versions_table_name | The name of the table in which Sequent checks which [migration version](migrations.html) is currently active | `'sequent_versions'` | | view_schema_name | The name of the view_schema in which the projections are created. | `'view_schema'` | -| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` | \ No newline at end of file +| enable_autoregistration | Enable autoregistration. This will autoregister `Sequent::CommandHandler`s, `Sequent::Projector`s and `Sequent::Workflow`s | `false` | diff --git a/lib/sequent/configuration.rb b/lib/sequent/configuration.rb index 9f47ee1a..4c28755f 100644 --- a/lib/sequent/configuration.rb +++ b/lib/sequent/configuration.rb @@ -10,7 +10,6 @@ module Sequent class Configuration DEFAULT_VERSIONS_TABLE_NAME = 'sequent_versions' - DEFAULT_REPLAYED_IDS_TABLE_NAME = 'sequent_replayed_ids' DEFAULT_MIGRATION_SQL_FILES_DIRECTORY = 'db/tables' DEFAULT_DATABASE_CONFIG_DIRECTORY = 'db' @@ -68,8 +67,7 @@ class Configuration :enable_autoregistration attr_reader :migrations_class_name, - :versions_table_name, - :replayed_ids_table_name + :versions_table_name def self.instance @instance ||= new @@ -104,7 +102,6 @@ def initialize self.event_publisher = Sequent::Core::EventPublisher.new self.disable_event_handlers = false self.versions_table_name = DEFAULT_VERSIONS_TABLE_NAME - self.replayed_ids_table_name = DEFAULT_REPLAYED_IDS_TABLE_NAME self.migration_sql_files_directory = DEFAULT_MIGRATION_SQL_FILES_DIRECTORY self.view_schema_name = DEFAULT_VIEW_SCHEMA_NAME self.event_store_schema_name = DEFAULT_EVENT_STORE_SCHEMA_NAME @@ -135,13 +132,6 @@ def can_use_multiple_databases? enable_multiple_database_support && ActiveRecord.version > Gem::Version.new('6.1.0') end - def replayed_ids_table_name=(table_name) - fail ArgumentError, 'table_name can not be nil' unless table_name - - @replayed_ids_table_name = table_name - Sequent::Migrations::ReplayedIds.table_name = table_name - end - def versions_table_name=(table_name) fail ArgumentError, 'table_name can not be nil' unless table_name diff --git a/lib/sequent/core/event_record.rb b/lib/sequent/core/event_record.rb index 16c4bfc7..9f5e5882 100644 --- a/lib/sequent/core/event_record.rb +++ b/lib/sequent/core/event_record.rb @@ -76,6 +76,7 @@ class EventRecord < Sequent::ApplicationRecord include SerializesEvent self.table_name = 'event_records' + self.ignored_columns = %w[xact_id] belongs_to :stream_record belongs_to :command_record diff --git a/lib/sequent/migrations/replayed_ids.rb b/lib/sequent/migrations/replayed_ids.rb deleted file mode 100644 index 470f53d2..00000000 --- a/lib/sequent/migrations/replayed_ids.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -module Sequent - module Migrations - class ReplayedIds < Sequent::ApplicationRecord - def self.migration_sql - <<~SQL.chomp - CREATE TABLE IF NOT EXISTS #{table_name} (event_id bigint NOT NULL, CONSTRAINT event_id_pk PRIMARY KEY(event_id)); - SQL - end - end - end -end diff --git a/lib/sequent/migrations/versions.rb b/lib/sequent/migrations/versions.rb index c6a84aa6..7f3e1f63 100644 --- a/lib/sequent/migrations/versions.rb +++ b/lib/sequent/migrations/versions.rb @@ -13,6 +13,7 @@ def self.migration_sql CREATE TABLE IF NOT EXISTS #{table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version)); ALTER TABLE #{table_name} drop constraint if exists only_one_running; ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS status INTEGER DEFAULT NULL CONSTRAINT only_one_running CHECK (status in (1,2,3)); + ALTER TABLE #{table_name} ADD COLUMN IF NOT EXISTS xmin_xact_id BIGINT; DROP INDEX IF EXISTS single_migration_running; CREATE UNIQUE INDEX single_migration_running ON #{table_name} ((status * 0)) where status is not null; SQL @@ -33,11 +34,15 @@ def self.version_currently_migrating end def self.latest_version - order('version desc').limit(1).first&.version + latest&.version + end + + def self.latest + order('version desc').limit(1).first end def self.start_online!(new_version) - create!(version: new_version, status: MIGRATE_ONLINE_RUNNING) + create!(version: new_version, status: MIGRATE_ONLINE_RUNNING, xmin_xact_id: current_snapshot_xmin_xact_id) rescue ActiveRecord::RecordNotUnique raise ConcurrentMigration, "Migration for version #{new_version} is already running" end @@ -68,6 +73,10 @@ def self.start_offline!(new_version) def self.end_offline!(new_version) find_by!(version: new_version, status: MIGRATE_OFFLINE_RUNNING).update(status: DONE) end + + def self.current_snapshot_xmin_xact_id + connection.execute('SELECT pg_snapshot_xmin(pg_current_snapshot())::text::bigint AS xmin').first['xmin'] + end end end end diff --git a/lib/sequent/migrations/view_schema.rb b/lib/sequent/migrations/view_schema.rb index b7622889..12bf1afd 100644 --- a/lib/sequent/migrations/view_schema.rb +++ b/lib/sequent/migrations/view_schema.rb @@ -13,7 +13,6 @@ require_relative 'executor' require_relative 'sql' require_relative 'versions' -require_relative 'replayed_ids' module Sequent module Migrations @@ -199,13 +198,17 @@ def migrate_online in_view_schema do Versions.start_online!(Sequent.new_version) - truncate_replay_ids_table! - drop_old_tables(Sequent.new_version) executor.execute_online(plan) end - replay!(Sequent.configuration.online_replay_persistor_class.new, groups: groups) if plan.projectors.any? + if plan.projectors.any? + replay!( + Sequent.configuration.online_replay_persistor_class.new, + groups: groups, + maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id, + ) + end in_view_schema do executor.create_indexes_after_execute_online(plan) @@ -237,7 +240,7 @@ def migrate_online # 2.1 Rename current tables with the +current version+ as SUFFIX # 2.2 Rename the new tables and remove the +new version+ suffix # 2.3 Add the new version in the +Versions+ table - # 3. Performs cleanup of replayed event ids + # 3. Update the versions table to complete the migration # # If anything fails an exception is raised and everything is rolled back # @@ -256,8 +259,8 @@ def migrate_offline if plan.projectors.any? replay!( Sequent.configuration.offline_replay_persistor_class.new, - exclude_ids: true, groups: groups(group_exponent: 1), + minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id, ) end @@ -268,9 +271,6 @@ def migrate_offline # 2.3 Create migration record Versions.end_offline!(Sequent.new_version) end - - # 3. Truncate replayed ids - truncate_replay_ids_table! end logger.info "Migrated to version #{Sequent.new_version}" rescue ConcurrentMigration @@ -291,7 +291,7 @@ def ensure_valid_plan! def migrate_metadata_tables Sequent::ApplicationRecord.transaction do in_view_schema do - exec_sql([ReplayedIds.migration_sql, Versions.migration_sql].join("\n")) + exec_sql([Versions.migration_sql].join("\n")) end end end @@ -306,7 +306,13 @@ def ensure_version_correct! end end - def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids: false) + def replay!( + replay_persistor, + groups:, + projectors: plan.projectors, + minimum_xact_id_inclusive: nil, + maximum_xact_id_exclusive: nil + ) logger.info "groups: #{groups.size}" with_sequent_config(replay_persistor, projectors) do @@ -327,7 +333,14 @@ def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids: Group (#{aggregate_prefixes.first}-#{aggregate_prefixes.last}) #{index + 1}/#{groups.size} replayed EOS time(msg) do - replay_events(aggregate_prefixes, event_types, exclude_ids, replay_persistor, &insert_ids) + replay_events( + aggregate_prefixes, + event_types, + minimum_xact_id_inclusive, + maximum_xact_id_exclusive, + replay_persistor, + &on_progress + ) end nil rescue StandardError => e @@ -342,12 +355,19 @@ def replay!(replay_persistor, groups:, projectors: plan.projectors, exclude_ids: end end - def replay_events(aggregate_prefixes, event_types, exclude_already_replayed, replay_persistor, &on_progress) - replay_persistor.prepare - + def replay_events( + aggregate_prefixes, + event_types, + minimum_xact_id_inclusive, + maximum_xact_id_exclusive, + replay_persistor, + &on_progress + ) Sequent.configuration.event_store.replay_events_from_cursor( block_size: 1000, - get_events: -> { event_stream(aggregate_prefixes, event_types, exclude_already_replayed) }, + get_events: -> { + event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive) + }, on_progress: on_progress, ) @@ -362,15 +382,10 @@ def rollback_migration establish_connection drop_old_tables(Sequent.new_version) - truncate_replay_ids_table! executor.reset_table_names(plan) Versions.rollback!(Sequent.new_version) end - def truncate_replay_ids_table! - exec_sql("truncate table #{ReplayedIds.table_name}") - end - def groups(group_exponent: 3, limit: nil, offset: nil) number_of_groups = 16**group_exponent groups = groups_of_aggregate_id_prefixes(number_of_groups) @@ -411,15 +426,8 @@ def drop_old_tables(new_version) end end - def insert_ids + def on_progress ->(progress, done, ids) do - unless ids.empty? - exec_sql( - "insert into #{ReplayedIds.table_name} (event_id) values #{ids.map do |id| - "(#{id})" - end.join(',')}", - ) - end Sequent::Core::EventStore::PRINT_PROGRESS[progress, done, ids] if progress > 0 end end @@ -442,18 +450,24 @@ def with_sequent_config(replay_persistor, projectors, &block) Sequent::Configuration.restore(old_config) end - def event_stream(aggregate_prefixes, event_types, exclude_already_replayed) + def event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive) fail ArgumentError, 'aggregate_prefixes is mandatory' unless aggregate_prefixes.present? event_stream = Sequent.configuration.event_record_class.where(event_type: event_types) event_stream = event_stream.where(<<~SQL, aggregate_prefixes) substring(aggregate_id::text from 1 for #{LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE}) in (?) SQL - if exclude_already_replayed - event_stream = event_stream - .where("NOT EXISTS (SELECT 1 FROM #{ReplayedIds.table_name} WHERE event_id = event_records.id)") + if minimum_xact_id_inclusive && maximum_xact_id_exclusive + event_stream = event_stream.where( + 'xact_id >= ? AND xact_id < ?', + minimum_xact_id_inclusive, + maximum_xact_id_exclusive, + ) + elsif minimum_xact_id_inclusive + event_stream = event_stream.where('xact_id >= ?', minimum_xact_id_inclusive) + elsif maximum_xact_id_exclusive + event_stream = event_stream.where('xact_id IS NULL OR xact_id < ?', maximum_xact_id_exclusive) end - event_stream = event_stream.where('event_records.created_at > ?', 1.day.ago) if exclude_already_replayed event_stream .order('aggregate_id ASC, sequence_number ASC') .select('id, event_type, event_json, sequence_number') diff --git a/lib/sequent/rake/migration_tasks.rb b/lib/sequent/rake/migration_tasks.rb index 2ac13cc8..c7164a74 100644 --- a/lib/sequent/rake/migration_tasks.rb +++ b/lib/sequent/rake/migration_tasks.rb @@ -125,6 +125,44 @@ def register_tasks! end end + desc <<-EOS + Shows the current status of the migrations + EOS + task status: ['sequent:init', :init] do + ensure_sequent_env_set! + db_config = Sequent::Support::Database.read_config(@env) + view_schema = Sequent::Migrations::ViewSchema.new(db_config: db_config) + + latest_done_version = Sequent::Migrations::Versions.done.latest + latest_version = Sequent::Migrations::Versions.latest + pending_version = Sequent.new_version + case latest_version.status + when Sequent::Migrations::Versions::DONE + if pending_version == latest_version.version + puts "Current version #{latest_version.version}, no pending changes" + else + puts "Current version #{latest_version.version}, pending version #{pending_version}" + end + when Sequent::Migrations::Versions::MIGRATE_ONLINE_RUNNING + puts "Online migration from #{latest_done_version.version} to #{latest_version.version} is running" + when Sequent::Migrations::Versions::MIGRATE_ONLINE_FINISHED + projectors = view_schema.plan.projectors + event_types = projectors.flat_map { |projector| projector.message_mapping.keys }.uniq.map(&:name) + + current_snapshot_xmin_xact_id = Sequent::Migrations::Versions.current_snapshot_xmin_xact_id + pending_events = Sequent.configuration.event_record_class + .where(event_type: event_types) + .where('xact_id >= ?', current_snapshot_xmin_xact_id) + .count + print <<~EOS + Online migration from #{latest_done_version.version} to #{latest_version.version} is finished. + #{current_snapshot_xmin_xact_id - latest_version.xmin_xact_id} transactions behind current state (#{pending_events} pending events). + EOS + when Sequent::Migrations::Versions::MIGRATE_OFFLINE_RUNNING + puts "Offline migration from #{latest_done_version.version} to #{latest_version.version} is running" + end + end + desc <<~EOS Migrates the Projectors while the app is running. Call +sequent:migrate:offline+ after this successfully completed. EOS diff --git a/spec/lib/sequent/migrations/view_schema_spec.rb b/spec/lib/sequent/migrations/view_schema_spec.rb index 668d4a01..22d42ff6 100644 --- a/spec/lib/sequent/migrations/view_schema_spec.rb +++ b/spec/lib/sequent/migrations/view_schema_spec.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require 'spec_helper' -require 'timecop' require 'active_support/hash_with_indifferent_access' require_relative '../fixtures/spec_migrations' @@ -165,7 +164,7 @@ expect(Sequent::ApplicationRecord.connection).to have_view_schema_table('account_records_1') end - it 'replays the data and keeps track of the migrated ids' do + it 'replays the data and keeps track of the lowest transaction id of the currently in-progress transactions' do insert_events( 'Account', [ @@ -182,17 +181,21 @@ MessageSet.new(aggregate_id: message_aggregate_id, sequence_number: 2, message: 'Foobar'), ], ) + + before_migration_xact_id = Sequent::Migrations::Versions.current_snapshot_xmin_xact_id + migrator.migrate_online + after_migration_xact_id = Sequent::Migrations::Versions.current_snapshot_xmin_xact_id + expect(AccountRecord.table_name).to eq 'account_records' expect(AccountRecord.connection.select_value('select count(*) from account_records_1')).to eq 2 expect(MessageRecord.table_name).to eq 'message_records' expect(AccountRecord.connection.select_value('select count(*) from message_records_1')).to eq 1 - expect( - Sequent::Migrations::ReplayedIds.pluck(:event_id), - ).to match_array Sequent.configuration.event_record_class.pluck(:id) + expect(Sequent::Migrations::Versions.running.first.xmin_xact_id) + .to (be > before_migration_xact_id).and(be < after_migration_xact_id) end context 'specific projectors' do @@ -229,20 +232,6 @@ expect(MessageRecord.table_name).to eq 'message_records' expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('message_records_1') - - expect( - Sequent::Migrations::ReplayedIds.pluck(:event_id), - ).to match_array( - Sequent - .configuration - .event_record_class - .where( - aggregate_id: [ - account_1, account_2 - ], - ) - .pluck(:id), - ) end end @@ -287,7 +276,6 @@ expect { migrator.migrate_online }.to raise_error(Parallel::UndumpableException) expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('account_records_1') - expect(Sequent::Migrations::ReplayedIds.count).to eq 0 expect(Sequent::Migrations::Versions.count).to eq 1 expect(Sequent::Migrations::Versions.first.version).to eq 0 end @@ -425,31 +413,6 @@ expect(AccountRecord.table_name).to eq 'account_records' expect(MessageRecord.table_name).to eq 'message_records' end - - context 'offline replaying with older events' do - after :each do - Timecop.return - end - - it 'does not replay events older than 1 day' do - Timecop.freeze(1.week.ago) - - old_account_id = Sequent.new_uuid - old_account_created = AccountCreated.new(aggregate_id: old_account_id, sequence_number: 1) - insert_events('Account', [old_account_created]) - - Timecop.return - - new_account_id = Sequent.new_uuid - new_account_created = AccountCreated.new(aggregate_id: new_account_id, sequence_number: 1) - insert_events('Account', [new_account_created]) - - migrator.migrate_offline - - expect(AccountRecord.pluck(:aggregate_id)).to_not include(old_account_id) - expect(AccountRecord.pluck(:aggregate_id)).to include(new_account_id) - end - end end context 'error handling' do @@ -483,7 +446,6 @@ expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('message_records') expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('account_records') - expect(Sequent::Migrations::ReplayedIds.count).to eq 0 expect(Sequent::Migrations::Versions.count).to eq 1 expect(Sequent::Migrations::Versions.running.count).to eq 0 end @@ -531,7 +493,6 @@ expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('message_records_2') expect(Sequent::ApplicationRecord.connection).to_not have_view_schema_table('account_records_2') - expect(Sequent::Migrations::ReplayedIds.count).to eq 0 expect(Sequent::Migrations::Versions.maximum(:version)).to eq 1 expect(AccountRecord.count).to eq(1)