Skip to content

Commit

Permalink
Add ability to run background migrations in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
fatkodima committed Jan 16, 2025
1 parent ed7e6e2 commit 9547786
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## master (unreleased)

- Add ability to run background migrations in parallel

## 0.23.0 (2025-01-13)

- Prevent multiple scheduler instances from running simultaneously
Expand Down
10 changes: 10 additions & 0 deletions docs/background_data_migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,13 @@ OnlineMigrations::ApplicationRecord.connects_to database: { writing: :shard_one

By default, ActiveRecord uses the database config named `:primary` (if it exists) under the environment section of `database.yml`.
Otherwise, the first config under the environment section is used.

By default, the scheduler works on a single shard on each run. To run on many shards in parallel, schedule a separate run for each shard:

```ruby
[:shard_one, :shard_two, :shard_three].each do |shard|
every 1.minute do
runner "OnlineMigrations.run_background_data_migrations(shard: :#{shard})"
end
end
```
10 changes: 10 additions & 0 deletions docs/background_schema_migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,13 @@ OnlineMigrations::ApplicationRecord.connects_to database: { writing: :shard_one

By default, ActiveRecord uses the database config named `:primary` (if it exists) under the environment section of `database.yml`.
Otherwise, the first config under the environment section is used.

By default, the scheduler works on a single shard on each run. To run on many shards in parallel, schedule a separate run for each shard:

```ruby
[:shard_one, :shard_two, :shard_three].each do |shard|
every 1.minute do
runner "OnlineMigrations.run_background_schema_migrations(shard: :#{shard})"
end
end
```
16 changes: 12 additions & 4 deletions lib/online_migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,22 @@ def config
end

# Run background data migrations
def run_background_migrations
BackgroundMigrations::Scheduler.run
#
# @param shard [String, Symbol, nil] the name of the shard to run
# background data migrations on. By default runs on all shards.
#
def run_background_migrations(shard: nil)
BackgroundMigrations::Scheduler.run(shard: shard)
end
alias run_background_data_migrations run_background_migrations

# Run background schema migrations
def run_background_schema_migrations
BackgroundSchemaMigrations::Scheduler.run
#
# @param shard [String, Symbol, nil] the name of the shard to run
# background schema migrations on. By default runs on all shards.
#
def run_background_schema_migrations(shard: nil)
BackgroundSchemaMigrations::Scheduler.run(shard: shard)
end

def deprecator
Expand Down
32 changes: 24 additions & 8 deletions lib/online_migrations/background_migrations/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,50 @@ module BackgroundMigrations
# successive runs has passed.
#
# Scheduler should be configured to run periodically, for example, via cron.
#
# @example Run via whenever
# # add this to schedule.rb
# every 1.minute do
# runner "OnlineMigrations.run_background_migrations"
# runner "OnlineMigrations.run_background_data_migrations"
# end
#
# @example Run via whenever (specific shard)
# every 1.minute do
# runner "OnlineMigrations.run_background_data_migrations(shard: :shard_two)"
# end
#
class Scheduler
def self.run
new.run
def self.run(shard: nil)
new.run(shard: shard)
end

# Runs Scheduler
def run
def run(shard: nil)
active_migrations = Migration.runnable.active.queue_order
active_migrations = active_migrations.where(shard: shard) if shard
runnable_migration = active_migrations.select(&:interval_elapsed?).first

if runnable_migration
runner = MigrationRunner.new(runnable_migration)

try_with_lock do
runner.run_migration_job
if shard
runnable_migration.on_shard do
connection = runnable_migration.migration_model.connection
try_with_lock(connection: connection) do
runner.run_migration_job
end
end
else
try_with_lock do
runner.run_migration_job
end
end
end
end

private
def try_with_lock(&block)
lock = AdvisoryLock.new(name: "online_migrations_data_scheduler")
def try_with_lock(**options, &block)
lock = AdvisoryLock.new(name: "online_migrations_data_scheduler", **options)
lock.try_with_lock(&block)
end
end
Expand Down
11 changes: 6 additions & 5 deletions lib/online_migrations/background_schema_migrations/migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ def run
end
end

# @private
#
# Runs the given block with `connection_class` connected to this migration's
# shard using the `:writing` role. When no shard is set, the connection
# class's `default_shard` is used instead.
def on_shard(&block)
  target_shard = shard || connection_class.default_shard
  connection_class.connected_to(role: :writing, shard: target_shard.to_sym, &block)
end

private
def validate_children_statuses
if composite?
Expand Down Expand Up @@ -243,11 +249,6 @@ def set_defaults
self.statement_timeout ||= config.statement_timeout
end

# Runs the given block with `connection_class` connected to this migration's
# shard using the `:writing` role. When no shard is set, the connection
# class's `default_shard` is used instead.
def on_shard(&block)
  target_shard = shard || connection_class.default_shard
  connection_class.connected_to(role: :writing, shard: target_shard.to_sym, &block)
end

def with_statement_timeout(connection, timeout)
return yield if timeout.nil?

Expand Down
37 changes: 28 additions & 9 deletions lib/online_migrations/background_schema_migrations/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,53 @@ module BackgroundSchemaMigrations
# running migration on the same table.
#
# Scheduler should be configured to run periodically, for example, via cron.
#
# @example Run via whenever
# # add this to schedule.rb
# every 1.minute do
# runner "OnlineMigrations.run_background_schema_migrations"
# end
#
# @example Run via whenever (specific shard)
# every 1.minute do
# runner "OnlineMigrations.run_background_schema_migrations(shard: :shard_two)"
# end
#
class Scheduler
def self.run
new.run
def self.run(shard: nil)
new.run(shard: shard)
end

# Runs Scheduler
def run
migration = find_migration
def run(shard: nil)
migration = find_migration(shard)
if migration
runner = MigrationRunner.new(migration)

try_with_lock do
runner.run
if shard
migration.on_shard do
connection = migration.connection_class.connection
try_with_lock(connection: connection) do
runner.run
end
end
else
try_with_lock do
runner.run
end
end
end
end

private
def find_migration
def find_migration(shard)
active_migrations = Migration.running.reject(&:stuck?)
runnable_migrations = Migration.runnable.enqueued.queue_order.to_a + Migration.retriable.queue_order.to_a

if shard
runnable_migrations = runnable_migrations.select { |migration| migration.shard.to_s == shard.to_s }
end

runnable_migrations.find do |runnable_migration|
active_migrations.none? do |active_migration|
active_migration.connection_class_name == runnable_migration.connection_class_name &&
Expand All @@ -44,8 +63,8 @@ def find_migration
end
end

def try_with_lock(&block)
lock = AdvisoryLock.new(name: "online_migrations_schema_scheduler")
def try_with_lock(**options, &block)
lock = AdvisoryLock.new(name: "online_migrations_schema_scheduler", **options)
lock.try_with_lock(&block)
end
end
Expand Down
26 changes: 24 additions & 2 deletions test/background_migrations/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ def setup
def teardown
@connection.drop_table(:users, if_exists: true)
OnlineMigrations::BackgroundMigrations::Migration.delete_all
on_each_shard { Dog.delete_all }
end

def test_run
user1 = User.create!
user2 = User.create!
user3 = User.create!

m = OnlineMigrations::BackgroundMigrations::Migration.create!(
m = create_migration(
migration_name: "MakeAllNonAdmins",
batch_size: 2,
sub_batch_size: 1,
Expand All @@ -49,11 +50,27 @@ def test_run
end
end

# Runs the data scheduler twice, restricted to :shard_two, and checks that
# the parent migration is still running while only some children progressed.
def test_run_specific_shard
  on_each_shard { Dog.create! }

  migration = create_migration(migration_name: "MakeAllDogsNice")

  scheduler = OnlineMigrations::BackgroundMigrations::Scheduler.new
  2.times { scheduler.run(shard: :shard_two) } # the second run is the finishing one

  assert migration.reload.running?

  # NOTE(review): presumably children are ordered per shard, with the
  # :shard_two child last — confirm against the test database setup.
  children = migration.children.to_a
  assert children[1].enqueued?
  assert children[2].succeeded?
end

def test_run_migration_has_stuck_job
user1 = User.create!
user2 = User.create!

m = OnlineMigrations::BackgroundMigrations::Migration.create!(
m = create_migration(
migration_name: "MakeAllNonAdmins",
batch_size: 1,
sub_batch_size: 1
Expand Down Expand Up @@ -81,5 +98,10 @@ def test_run_migration_has_stuck_job
scheduler.run # last run to ensure there are no more work
assert m.reload.completed?
end

private
# Test helper: creates a background data migration through the test
# connection, forwarding any extra keyword attributes unchanged.
def create_migration(migration_name:, **options)
  @connection.create_background_data_migration(migration_name, **options)
end
end
end
20 changes: 20 additions & 0 deletions test/background_schema_migrations/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@ def test_run
assert m.reload.succeeded?
end

# Runs the schema scheduler once, restricted to :shard_two, and checks that
# only the :shard_two child migration succeeded while :shard_one stays enqueued.
def test_run_specific_shard
  migration = create_migration(
    name: "index_dogs_on_name",
    table_name: "dogs",
    definition: 'CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "index_dogs_on_name" ON "dogs" ("name")',
    connection_class_name: "ShardRecord"
  )

  OnlineMigrations::BackgroundSchemaMigrations::Scheduler.new.run(shard: :shard_two)

  # The parent stays running because not every shard has been processed.
  assert migration.reload.running?

  children = migration.children
  assert children.find_by(shard: :shard_one).enqueued?
  assert children.find_by(shard: :shard_two).succeeded?
end

def test_run_retries_failed_migrations
m = create_migration(
name: "index_dogs_on_name",
Expand Down

0 comments on commit 9547786

Please sign in to comment.