Skip to content

Commit

Permalink
Migrator now has progress callback
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 26, 2024
1 parent f0fbcb2 commit 64f9d80
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
14 changes: 11 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func runServe(ccmd *cobra.Command, args []string) {
conf := migrator.NewMigratorConfig().WithBlockSize(block_size)
conf.LockerHandler = sourceStorage.Lock
conf.UnlockerHandler = sourceStorage.Unlock
conf.ProgressHandler = func(p *migrator.MigrationProgress) {

fmt.Printf("Progress Moved: %d/%d %.2f%% Clean: %d/%d %.2f%%\n",
p.MigratedBlocks, p.TotalBlocks, p.MigratedBlocksPerc,
p.ReadyBlocks, p.TotalBlocks, p.ReadyBlocksPerc)
}

mig, err := migrator.NewMigrator(sourceDirty,
dest,
Expand All @@ -187,7 +193,11 @@ func runServe(ccmd *cobra.Command, args []string) {

// Now do the migration...
err = mig.Migrate(num_blocks)
mig.ShowProgress()

err = mig.WaitForCompletion()
if err != nil {
panic(err)
}

for {
blocks := mig.GetLatestDirty()
Expand All @@ -203,7 +213,6 @@ func runServe(ccmd *cobra.Command, args []string) {
panic(err)
}
fmt.Printf("DIRTY BLOCKS %d\n", len(blocks))
mig.ShowProgress()
}

err = mig.WaitForCompletion()
Expand All @@ -212,7 +221,6 @@ func runServe(ccmd *cobra.Command, args []string) {
}

fmt.Printf("MIGRATION DONE %v\n", err)
mig.ShowProgress()

c.Close()
}
Expand Down
46 changes: 25 additions & 21 deletions pkg/storage/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package migrator

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -16,6 +15,7 @@ type MigratorConfig struct {
LockerHandler func()
UnlockerHandler func()
ErrorHandler func(b *storage.BlockInfo, err error)
ProgressHandler func(p *MigrationProgress)
Concurrency map[int]int
}

Expand All @@ -25,6 +25,7 @@ func NewMigratorConfig() *MigratorConfig {
LockerHandler: func() {},
UnlockerHandler: func() {},
ErrorHandler: func(b *storage.BlockInfo, err error) {},
ProgressHandler: func(p *MigrationProgress) {},
Concurrency: map[int]int{
storage.BlockTypeAny: 32,
storage.BlockTypeStandard: 32,
Expand All @@ -39,12 +40,21 @@ func (mc *MigratorConfig) WithBlockSize(bs int) *MigratorConfig {
return mc
}

type MigrationProgress struct {
TotalBlocks int // Total blocks
MigratedBlocks int // Number of blocks that have been migrated
MigratedBlocksPerc float64
ReadyBlocks int // Number of blocks which are up to date (clean). May go down as well as up.
ReadyBlocksPerc float64
}

type Migrator struct {
src_track storage.TrackingStorageProvider // Tracks writes so we know which are dirty
dest storage.StorageProvider
src_lock_fn func()
src_unlock_fn func()
error_fn func(block *storage.BlockInfo, err error)
progress_fn func(*MigrationProgress)
block_size int
num_blocks int
metric_moved_blocks int64
Expand All @@ -69,6 +79,7 @@ func NewMigrator(source storage.TrackingStorageProvider,
src_lock_fn: config.LockerHandler,
src_unlock_fn: config.UnlockerHandler,
error_fn: config.ErrorHandler,
progress_fn: config.ProgressHandler,
block_size: config.BlockSize,
num_blocks: num_blocks,
metric_moved_blocks: 0,
Expand Down Expand Up @@ -177,6 +188,7 @@ func (m *Migrator) MigrateDirty(blocks []uint) error {

m.clean_blocks.ClearBit(int(pos))
}
m.reportProgress()
return nil
}

Expand All @@ -185,31 +197,21 @@ func (m *Migrator) WaitForCompletion() error {
return nil
}

/**
* Show progress...
*
*/
func (m *Migrator) ShowProgress() {
func (m *Migrator) reportProgress() {
migrated := m.migrated_blocks.Count(0, uint(m.num_blocks))
perc_mig := float64(migrated*100) / float64(m.num_blocks)

completed := m.clean_blocks.Count(0, uint(m.num_blocks))
perc_complete := float64(completed*100) / float64(m.num_blocks)
is_complete := ""
if completed == m.num_blocks {
is_complete = " COMPLETE"
}
moved := atomic.LoadInt64(&m.metric_moved_blocks)
fmt.Printf("Migration %dms Migrated (%d/%d) [%.2f%%] Clean (%d/%d) [%.2f%%] %d blocks moved %s\n",
time.Since(m.ctime).Milliseconds(),
migrated,
m.num_blocks,
perc_mig,
completed,
m.num_blocks,
perc_complete,
moved,
is_complete)

// Callback
m.progress_fn(&MigrationProgress{
TotalBlocks: m.num_blocks,
MigratedBlocks: migrated,
MigratedBlocksPerc: perc_mig,
ReadyBlocks: completed,
ReadyBlocksPerc: perc_complete,
})
}

/**
Expand All @@ -235,5 +237,7 @@ func (m *Migrator) migrateBlock(block int) error {
// Mark it as done
m.migrated_blocks.SetBit(block)
m.clean_blocks.SetBit(block)

m.reportProgress()
return nil
}
5 changes: 0 additions & 5 deletions pkg/storage/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func TestMigratorSimple(t *testing.T) {

err = mig.WaitForCompletion()
assert.NoError(t, err)
mig.ShowProgress()

// This will end with migration completed, and consumer Locked.
eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize)
Expand Down Expand Up @@ -149,7 +148,6 @@ func TestMigratorSimplePipe(t *testing.T) {

err = mig.WaitForCompletion()
assert.NoError(t, err)
mig.ShowProgress()

// This will end with migration completed, and consumer Locked.
eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize)
Expand Down Expand Up @@ -283,12 +281,10 @@ func TestMigratorWithReaderWriter(t *testing.T) {
fmt.Printf("Got %d dirty blocks to move...\n", len(blocks))
err := mig.MigrateDirty(blocks)
assert.NoError(t, err)
mig.ShowProgress()
}

err = mig.WaitForCompletion()
assert.NoError(t, err)
mig.ShowProgress()

// This will end with migration completed, and consumer Locked.
eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize)
Expand Down Expand Up @@ -439,7 +435,6 @@ func TestMigratorWithReaderWriterWrite(t *testing.T) {

err = mig.WaitForCompletion()
assert.NoError(t, err)
mig.ShowProgress()

assert.Equal(t, int64(num_blocks-num_local_blocks), mig.metric_moved_blocks)

Expand Down

0 comments on commit 64f9d80

Please sign in to comment.