From 069a13cba7556c7716804618207628eec19ff7e2 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 25 Apr 2024 16:01:56 +0100 Subject: [PATCH 01/21] First impl of extended dirty tracker, including max_age, subblocks and limit --- pkg/storage/dirtytracker/dirty_tracker.go | 169 ++++++++++++++++++---- 1 file changed, 141 insertions(+), 28 deletions(-) diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index 23c62f2d..0f0cacb7 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -1,7 +1,9 @@ package dirtytracker import ( + "sort" "sync" + "time" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/util" @@ -14,13 +16,15 @@ import ( */ type DirtyTracker struct { - prov storage.StorageProvider - size uint64 - block_size int - num_blocks int - dirty_log *util.Bitfield - tracking *util.Bitfield - write_lock sync.RWMutex + prov storage.StorageProvider + size uint64 + block_size int + num_blocks int + dirty_log *util.Bitfield + tracking *util.Bitfield + tracking_times map[uint]time.Time + tracking_lock sync.Mutex + write_lock sync.RWMutex } type DirtyTrackerLocal struct { @@ -75,28 +79,136 @@ func NewDirtyTracker(prov storage.StorageProvider, blockSize int) (*DirtyTracker size := int(prov.Size()) numBlocks := (size + blockSize - 1) / blockSize dt := &DirtyTracker{ - size: prov.Size(), - block_size: blockSize, - num_blocks: numBlocks, - prov: prov, - tracking: util.NewBitfield(numBlocks), - dirty_log: util.NewBitfield(numBlocks), + size: prov.Size(), + block_size: blockSize, + num_blocks: numBlocks, + prov: prov, + tracking: util.NewBitfield(numBlocks), + dirty_log: util.NewBitfield(numBlocks), + tracking_times: make(map[uint]time.Time), } return &DirtyTrackerLocal{dt: dt}, &DirtyTrackerRemote{dt: dt} } -func (i *DirtyTrackerRemote) Sync() *util.Bitfield { +func (i *DirtyTracker) trackArea(length int64, offset int64) { + end := uint64(offset + length) + if end > i.size { + end = i.size + } + + b_start := uint(offset / int64(i.block_size)) + b_end := uint((end-1)/uint64(i.block_size)) + 1 + + // Enable tracking for this area + i.tracking.SetBits(b_start, b_end) +} + +func (i *DirtyTracker) GetDirtyBlocks(max_age time.Duration, limit int, group_by_shift int) []uint { + grouped_blocks := make(map[uint][]uint) + + // First look for any dirty blocks past max_age + i.tracking_lock.Lock() + for b, t := range i.tracking_times { + if time.Since(t) > max_age { + grouped_b := b >> group_by_shift + v, ok := grouped_blocks[grouped_b] + if ok { + grouped_blocks[grouped_b] = append(v, b) + } else { + grouped_blocks[grouped_b] = []uint{b} + } + if len(grouped_blocks) == limit { + break + } + } + } + i.tracking_lock.Unlock() + + if len(grouped_blocks) < limit { + grouped_blocks_changed := make(map[uint][]uint) + // Now we also look for extra blocks based on how much they have changed... + bls := i.dirty_log.Collect(0, uint(i.num_blocks)) + for _, b := range bls { + grouped_b := b >> group_by_shift + v, ok := grouped_blocks_changed[grouped_b] + if ok { + grouped_blocks_changed[grouped_b] = append(v, b) + } else { + grouped_blocks_changed[grouped_b] = []uint{b} + } + } + + // Now sort by how much changed + keys := make([]uint, 0, len(grouped_blocks_changed)) + for key := range grouped_blocks { + keys = append(keys, key) + } + + // Sort the blocks by how many sub-blocks are dirty. 
+ sort.SliceStable(keys, func(i, j int) bool { + return len(grouped_blocks[keys[i]]) < len(grouped_blocks[keys[j]]) + }) + + // Now add them into grouped_blocks if we can... + for { + if len(grouped_blocks) == limit { + break + } + if len(keys) == 0 { + break + } + // Pick one out of grouped_blocks_changed, and try to add it + k := keys[0] + keys = keys[1:] + // This may overwrite an existing entry which is max_age, but we'll have the same block + others here + grouped_blocks[k] = grouped_blocks_changed[k] + } + } + + // Clear out the tracking data here... It'll get added for tracking again on a readAt() + i.write_lock.Lock() + defer i.write_lock.Unlock() + + rblocks := make([]uint, 0) + for rb, blocks := range grouped_blocks { + for _, b := range blocks { + i.tracking.ClearBit(int(b)) + i.dirty_log.ClearBit(int(b)) + i.tracking_lock.Lock() + delete(i.tracking_times, b) + i.tracking_lock.Unlock() + } + rblocks = append(rblocks, rb) + } + + return rblocks +} + +func (i *DirtyTracker) GetAllDirtyBlocks() *util.Bitfield { // Prevent any writes while we do the Sync() - i.dt.write_lock.Lock() - defer i.dt.write_lock.Unlock() + i.write_lock.Lock() + defer i.write_lock.Unlock() - info := i.dt.dirty_log.Clone() + info := i.dirty_log.Clone() // Remove the dirty blocks from tracking... (They will get added again when a Read is performed to migrate the data) - i.dt.tracking.ClearBitsIf(info, 0, uint(i.dt.num_blocks)) + i.tracking.ClearBitsIf(info, 0, uint(i.num_blocks)) // Clear the dirty log. - i.dt.dirty_log.Clear() + i.dirty_log.Clear() + + blocks := info.Collect(0, info.Length()) + i.tracking_lock.Lock() + for _, b := range blocks { + delete(i.tracking_times, b) + } + i.tracking_lock.Unlock() + + return info +} + +func (i *DirtyTrackerRemote) Sync() *util.Bitfield { + info := i.dt.GetAllDirtyBlocks() return info } @@ -120,6 +232,14 @@ func (i *DirtyTracker) localWriteAt(buffer []byte, offset int64) (int, error) { b_end := uint((end-1)/uint64(i.block_size)) + 1 i.dirty_log.SetBitsIf(i.tracking, b_start, b_end) + + // Update tracking times for last block write + i.tracking_lock.Lock() + now := time.Now() + for b := b_start; b < b_end; b++ { + i.tracking_times[b] = now + } + i.tracking_lock.Unlock() } return n, err } @@ -133,16 +253,9 @@ func (i *DirtyTracker) localSize() uint64 { } func (i *DirtyTracker) remoteReadAt(buffer []byte, offset int64) (int, error) { - end := uint64(offset + int64(len(buffer))) - if end > i.size { - end = i.size - } - b_start := uint(offset / int64(i.block_size)) - b_end := uint((end-1)/uint64(i.block_size)) + 1 - - // Enable tracking for this area - i.tracking.SetBits(b_start, b_end) + // Start tracking dirty on the area we read. + i.trackArea(int64(len(buffer)), offset) // NB: A WriteAt could occur here, which would result in an incorrect dirty marking. // TODO: Do something to mitigate this without affecting performance. // Note though, that this is still preferable to tracking everything before it's been read for migration. 
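The sketch below is a minimal, hypothetical usage example of the extended dirty tracker added in this patch; it is not part of the series itself. It assumes the in-memory source used by the package's tests, and it uses the GetDirtyBlocks signature the next patch settles on (the method moves onto DirtyTrackerRemote and gains a min_changed parameter). The flow it illustrates: reads through the remote side arm tracking, writes through the local side record per-sub-block timestamps, and a caller periodically drains whichever grouped blocks are either old enough or changed enough.

package main

import (
	"fmt"
	"time"

	"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
	"github.com/loopholelabs/silo/pkg/storage/sources"
)

func main() {
	const size = 32 * 1024 * 1024
	const blockSize = 4096

	// Back the tracker with an in-memory source, as the package tests do.
	mem := sources.NewMemoryStorage(size)
	local, remote := dirtytracker.NewDirtyTracker(mem, blockSize)

	// Reading through the remote side arms dirty tracking for the area read.
	buffer := make([]byte, size)
	if _, err := remote.ReadAt(buffer, 0); err != nil {
		panic(err)
	}

	// Writes through the local side mark sub-blocks dirty and timestamp them.
	if _, err := local.WriteAt(make([]byte, 9000), 10000); err != nil {
		panic(err)
	}

	const (
		maxAge       = 500 * time.Millisecond // sub-blocks dirty longer than this are returned first
		limit        = 4                      // at most this many grouped blocks per call
		groupByShift = 10                     // group 2^10 sub-blocks into one returned block
		minChanged   = 8                      // otherwise only return groups with >= 8 dirty sub-blocks
	)

	for i := 0; i < 10; i++ {
		time.Sleep(100 * time.Millisecond)
		blocks := remote.GetDirtyBlocks(maxAge, limit, groupByShift, minChanged)
		if len(blocks) == 0 {
			continue
		}
		// Each entry covers blockSize<<groupByShift bytes. A real caller would
		// re-read those ranges (which re-arms tracking) and push them to the
		// migration destination.
		fmt.Printf("dirty groups to sync: %v\n", blocks)
	}
}
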
From c74fa3075b8de51e725b1958aaeb622950eacdb8 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 26 Apr 2024 11:25:38 +0100 Subject: [PATCH 02/21] Moved GetDirtyBlocks, added min_changed param --- pkg/storage/dirtytracker/dirty_tracker.go | 32 +++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index 0f0cacb7..7a11b691 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -103,12 +103,12 @@ func (i *DirtyTracker) trackArea(length int64, offset int64) { i.tracking.SetBits(b_start, b_end) } -func (i *DirtyTracker) GetDirtyBlocks(max_age time.Duration, limit int, group_by_shift int) []uint { +func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, group_by_shift int, min_changed int) []uint { grouped_blocks := make(map[uint][]uint) // First look for any dirty blocks past max_age - i.tracking_lock.Lock() - for b, t := range i.tracking_times { + i.dt.tracking_lock.Lock() + for b, t := range i.dt.tracking_times { if time.Since(t) > max_age { grouped_b := b >> group_by_shift v, ok := grouped_blocks[grouped_b] @@ -122,12 +122,13 @@ func (i *DirtyTracker) GetDirtyBlocks(max_age time.Duration, limit int, group_by } } } - i.tracking_lock.Unlock() + i.dt.tracking_lock.Unlock() + // Now we look for changed blocks, and sort by how much. if len(grouped_blocks) < limit { grouped_blocks_changed := make(map[uint][]uint) // Now we also look for extra blocks based on how much they have changed... - bls := i.dirty_log.Collect(0, uint(i.num_blocks)) + bls := i.dt.dirty_log.Collect(0, uint(i.dt.num_blocks)) for _, b := range bls { grouped_b := b >> group_by_shift v, ok := grouped_blocks_changed[grouped_b] @@ -160,23 +161,26 @@ func (i *DirtyTracker) GetDirtyBlocks(max_age time.Duration, limit int, group_by // Pick one out of grouped_blocks_changed, and try to add it k := keys[0] keys = keys[1:] - // This may overwrite an existing entry which is max_age, but we'll have the same block + others here - grouped_blocks[k] = grouped_blocks_changed[k] + + if len(grouped_blocks_changed[k]) >= min_changed { + // This may overwrite an existing entry which is max_age, but we'll have the same block + others here + grouped_blocks[k] = grouped_blocks_changed[k] + } } } // Clear out the tracking data here... 
It'll get added for tracking again on a readAt() - i.write_lock.Lock() - defer i.write_lock.Unlock() + i.dt.write_lock.Lock() + defer i.dt.write_lock.Unlock() rblocks := make([]uint, 0) for rb, blocks := range grouped_blocks { for _, b := range blocks { - i.tracking.ClearBit(int(b)) - i.dirty_log.ClearBit(int(b)) - i.tracking_lock.Lock() - delete(i.tracking_times, b) - i.tracking_lock.Unlock() + i.dt.tracking.ClearBit(int(b)) + i.dt.dirty_log.ClearBit(int(b)) + i.dt.tracking_lock.Lock() + delete(i.dt.tracking_times, b) + i.dt.tracking_lock.Unlock() } rblocks = append(rblocks, rb) } From 638d2b853ee1cea669e5cfa1e6ce7f680ed44e21 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 26 Apr 2024 11:46:44 +0100 Subject: [PATCH 03/21] Couple fixes, and start on tests --- pkg/storage/dirtytracker/dirty_tracker.go | 4 +- .../dirtytracker/dirty_tracker_test.go | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index 7a11b691..a2e45d86 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -141,13 +141,13 @@ func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, gr // Now sort by how much changed keys := make([]uint, 0, len(grouped_blocks_changed)) - for key := range grouped_blocks { + for key := range grouped_blocks_changed { keys = append(keys, key) } // Sort the blocks by how many sub-blocks are dirty. sort.SliceStable(keys, func(i, j int) bool { - return len(grouped_blocks[keys[i]]) < len(grouped_blocks[keys[j]]) + return len(grouped_blocks_changed[keys[i]]) < len(grouped_blocks_changed[keys[j]]) }) // Now add them into grouped_blocks if we can... diff --git a/pkg/storage/dirtytracker/dirty_tracker_test.go b/pkg/storage/dirtytracker/dirty_tracker_test.go index 04664673..661b86b2 100644 --- a/pkg/storage/dirtytracker/dirty_tracker_test.go +++ b/pkg/storage/dirtytracker/dirty_tracker_test.go @@ -1,7 +1,9 @@ package dirtytracker import ( + "slices" "testing" + "time" "github.com/loopholelabs/silo/pkg/storage/modules" "github.com/loopholelabs/silo/pkg/storage/sources" @@ -46,3 +48,84 @@ func TestReadDirtyTracker(t *testing.T) { // There should be no dirty blocks assert.Equal(t, 0, b.Count(0, b.Length())) } + +func setupDirty(t *testing.T) *DirtyTrackerRemote { + // Create a new block storage, backed by memory storage + size := 1024 * 1024 * 32 + mem := sources.NewMemoryStorage(size) + trackerLocal, trackerRemote := NewDirtyTracker(mem, 4096) + + b := trackerRemote.Sync() + + // There should be no dirty blocks + assert.Equal(t, 0, b.Count(0, b.Length())) + + // Perform a read to start tracking dirty writes all over the place + buffer := make([]byte, size) + _, err := trackerRemote.ReadAt(buffer, 0) + assert.NoError(t, err) + + // Now do a few writes to make dirty blocks... 
+ locs := []int64{10, 30, 10000, 40000} + for _, l := range locs { + w_buffer := make([]byte, 9000) + _, err = trackerLocal.WriteAt(w_buffer, l) + assert.NoError(t, err) + } + + return trackerRemote +} + +func TestReadDirtyTrackerLimits(t *testing.T) { + trackerRemote := setupDirty(t) + + // Check the dirty blocks + blocks := trackerRemote.GetDirtyBlocks(0, 100, 1, 0) + slices.Sort(blocks) + expected_blocks := []uint{0, 1, 2, 4, 5} + assert.Equal(t, expected_blocks, blocks) + + blocks = trackerRemote.GetDirtyBlocks(0, 100, 1, 0) + + // There should be no dirty blocks + assert.Equal(t, 0, len(blocks)) +} + +func TestReadDirtyTrackerMaxAge(t *testing.T) { + trackerRemote := setupDirty(t) + + // Check the dirty blocks + blocks := trackerRemote.GetDirtyBlocks(10*time.Millisecond, 100, 1, 1000) + assert.Equal(t, 0, len(blocks)) + + time.Sleep(10 * time.Millisecond) + + // Things should expire now... + blocks = trackerRemote.GetDirtyBlocks(10*time.Millisecond, 100, 1, 1000) + slices.Sort(blocks) + expected_blocks := []uint{0, 1, 2, 4, 5} + assert.Equal(t, expected_blocks, blocks) +} + +func TestReadDirtyTrackerMinChange(t *testing.T) { + trackerRemote := setupDirty(t) + + // Check the dirty blocks + blocks := trackerRemote.GetDirtyBlocks(time.Minute, 100, 1, 0) + slices.Sort(blocks) + expected_blocks := []uint{0, 1, 2, 4, 5} + assert.Equal(t, expected_blocks, blocks) + + trackerRemote = setupDirty(t) + blocks = trackerRemote.GetDirtyBlocks(time.Minute, 100, 1, 2) + slices.Sort(blocks) + expected_blocks = []uint{0, 1, 5} + assert.Equal(t, expected_blocks, blocks) + + trackerRemote = setupDirty(t) + blocks = trackerRemote.GetDirtyBlocks(time.Minute, 100, 2, 2) + slices.Sort(blocks) + expected_blocks = []uint{0, 2} + assert.Equal(t, expected_blocks, blocks) + +} From b3de8cd01113c5f18788a9e8ef03ea554a2dc5e4 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 26 Apr 2024 12:58:04 +0100 Subject: [PATCH 04/21] Added cmd to sync device to s3 --- cmd/sync.go | 257 ++++++++++++++++++ pkg/storage/dirtytracker/dirty_tracker.go | 6 + .../dirtytracker/dirty_tracker_test.go | 1 - 3 files changed, 263 insertions(+), 1 deletion(-) create mode 100644 cmd/sync.go diff --git a/cmd/sync.go b/cmd/sync.go new file mode 100644 index 00000000..e7bce529 --- /dev/null +++ b/cmd/sync.go @@ -0,0 +1,257 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/blocks" + "github.com/loopholelabs/silo/pkg/storage/config" + "github.com/loopholelabs/silo/pkg/storage/device" + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + "github.com/loopholelabs/silo/pkg/storage/migrator" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" + "github.com/spf13/cobra" +) + +var ( + cmdSync = &cobra.Command{ + Use: "sync", + Short: "Continuous sync to s3", + Long: ``, + Run: runSync, + } +) + +var sync_conf string +var sync_endpoint string +var sync_access string +var sync_secret string +var sync_bucket string +var sync_block_size int + +var sync_exposed []storage.ExposedStorage +var sync_storage []*storageInfo + +func init() { + rootCmd.AddCommand(cmdSync) + cmdSync.Flags().StringVarP(&sync_conf, "conf", "c", "silo.conf", "Configuration file") + cmdSync.Flags().StringVarP(&sync_endpoint, "endpoint", "e", "", "S3 endpoint") + cmdSync.Flags().StringVarP(&sync_access, "access", "a", "", "S3 access") + 
cmdSync.Flags().StringVarP(&sync_secret, "secret", "s", "", "S3 secret") + cmdSync.Flags().StringVarP(&sync_bucket, "bucket", "b", "", "S3 bucket") + cmdSync.Flags().IntVarP(&sync_block_size, "blocksize", "l", 4*1024*1024, "S3 block size") +} + +func runSync(ccmd *cobra.Command, args []string) { + + sync_exposed = make([]storage.ExposedStorage, 0) + sync_storage = make([]*storageInfo, 0) + fmt.Printf("Starting silo s3 sync\n") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + sync_shutdown_everything() + os.Exit(1) + }() + + siloConf, err := config.ReadSchema(sync_conf) + if err != nil { + panic(err) + } + + for i, s := range siloConf.Device { + + fmt.Printf("Setup storage %d [%s] size %s - %d\n", i, s.Name, s.Size, s.ByteSize()) + sinfo, err := setupSyncStorageDevice(s) + if err != nil { + panic(fmt.Sprintf("Could not setup storage. %v", err)) + } + src_storage = append(src_storage, sinfo) + } + + // Lets go through each of the things we want to migrate/sync + + var wg sync.WaitGroup + + for i, s := range src_storage { + wg.Add(1) + go func(index int, src *storageInfo) { + err := migrateDeviceS3(uint32(index), src.name, src) + if err != nil { + fmt.Printf("There was an issue migrating the storage %d %v\n", index, err) + } + wg.Done() + }(i, s) + } + wg.Wait() + + sync_shutdown_everything() +} + +func setupSyncStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { + block_size := sync_block_size // 1024 * 128 + + num_blocks := (int(conf.ByteSize()) + block_size - 1) / block_size + + source, ex, err := device.NewDevice(conf) + if err != nil { + return nil, err + } + if ex != nil { + fmt.Printf("Device %s exposed as %s\n", conf.Name, ex.Device()) + sync_exposed = append(sync_exposed, ex) + } + sourceMetrics := modules.NewMetrics(source) + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceMetrics, block_size) + sourceMonitor := volatilitymonitor.NewVolatilityMonitor(sourceDirtyLocal, block_size, 10*time.Second) + sourceStorage := modules.NewLockable(sourceMonitor) + + if ex != nil { + ex.SetProvider(sourceStorage) + } + + // Start monitoring blocks. 
+ + go func() { + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-ticker.C: + fmt.Printf("Dirty %d blocks\n", sourceDirtyRemote.MeasureDirty()) + } + } + }() + + var primary_orderer storage.BlockOrder + primary_orderer = sourceMonitor + + if serve_any_order { + primary_orderer = blocks.NewAnyBlockOrder(num_blocks, nil) + } + orderer := blocks.NewPriorityBlockOrder(num_blocks, primary_orderer) + orderer.AddAll() + + return &storageInfo{ + tracker: sourceDirtyRemote, + lockable: sourceStorage, + orderer: orderer, + block_size: block_size, + num_blocks: num_blocks, + name: conf.Name, + }, nil +} + +func sync_shutdown_everything() { + // first unlock everything + fmt.Printf("Unlocking devices...\n") + for _, i := range sync_storage { + i.lockable.Unlock() + i.tracker.Close() + } + + fmt.Printf("Shutting down devices cleanly...\n") + for _, p := range sync_exposed { + device := p.Device() + + fmt.Printf("Shutdown nbd device %s\n", device) + _ = p.Shutdown() + } +} + +// Migrate a device +func migrateDeviceS3(dev_id uint32, name string, + sinfo *storageInfo) error { + + dest, err := sources.NewS3StorageCreate(sync_endpoint, + sync_access, + sync_secret, + sync_bucket, + sinfo.name, + sinfo.lockable.Size(), + sync_block_size) + + if err != nil { + return err + } + + conf := migrator.NewMigratorConfig().WithBlockSize(sync_block_size) + conf.Locker_handler = func() { + sinfo.lockable.Lock() + } + conf.Unlocker_handler = func() { + sinfo.lockable.Unlock() + } + conf.Concurrency = map[int]int{ + storage.BlockTypeAny: 1000000, + } + conf.Integrity = false + + conf.Progress_handler = func(p *migrator.MigrationProgress) { + fmt.Printf("[%s] Progress Moved: %d/%d %.2f%% Clean: %d/%d %.2f%% InProgress: %d\n", + name, p.Migrated_blocks, p.Total_blocks, p.Migrated_blocks_perc, + p.Ready_blocks, p.Total_blocks, p.Ready_blocks_perc, + p.Active_blocks) + } + conf.Error_handler = func(b *storage.BlockInfo, err error) { + fmt.Printf("[%s] Error for block %d error %v\n", name, b.Block, err) + } + + log_dest := modules.NewLogger(dest, "S3") + + mig, err := migrator.NewMigrator(sinfo.tracker, log_dest, sinfo.orderer, conf) + + if err != nil { + return err + } + + fmt.Printf("Doing migration...\n") + + // Now do the initial migration... + num_blocks := (sinfo.tracker.Size() + uint64(sync_block_size) - 1) / uint64(sync_block_size) + err = mig.Migrate(int(num_blocks)) + if err != nil { + return err + } + + fmt.Printf("Waiting...\n") + + // Wait for completion. + err = mig.WaitForCompletion() + if err != nil { + return err + } + + // Enter a loop looking for more dirty blocks to migrate... 
+ + fmt.Printf("Dirty loop...\n") + + for { + blocks := mig.GetLatestDirty() // + + if blocks != nil { + err = mig.MigrateDirty(blocks) + if err != nil { + return err + } + } else { + mig.Unlock() + } + time.Sleep(100 * time.Millisecond) + } + + err = mig.WaitForCompletion() + if err != nil { + return err + } + + return nil +} diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index a2e45d86..36ecd879 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -103,6 +103,12 @@ func (i *DirtyTracker) trackArea(length int64, offset int64) { i.tracking.SetBits(b_start, b_end) } +func (i *DirtyTrackerRemote) MeasureDirty() int { + i.dt.tracking_lock.Lock() + defer i.dt.tracking_lock.Unlock() + return len(i.dt.tracking_times) +} + func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, group_by_shift int, min_changed int) []uint { grouped_blocks := make(map[uint][]uint) diff --git a/pkg/storage/dirtytracker/dirty_tracker_test.go b/pkg/storage/dirtytracker/dirty_tracker_test.go index 661b86b2..6dc827c6 100644 --- a/pkg/storage/dirtytracker/dirty_tracker_test.go +++ b/pkg/storage/dirtytracker/dirty_tracker_test.go @@ -127,5 +127,4 @@ func TestReadDirtyTrackerMinChange(t *testing.T) { slices.Sort(blocks) expected_blocks = []uint{0, 2} assert.Equal(t, expected_blocks, blocks) - } From cd6da17b7ab6190c562926228d5367b8b2cd3a52 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 26 Apr 2024 13:28:08 +0100 Subject: [PATCH 05/21] Baseline basic s3 sync cmd --- cmd/sync.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index e7bce529..b79d320a 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -121,16 +121,6 @@ func setupSyncStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { // Start monitoring blocks. 
- go func() { - ticker := time.NewTicker(time.Second * 5) - for { - select { - case <-ticker.C: - fmt.Printf("Dirty %d blocks\n", sourceDirtyRemote.MeasureDirty()) - } - } - }() - var primary_orderer storage.BlockOrder primary_orderer = sourceMonitor @@ -183,6 +173,8 @@ func migrateDeviceS3(dev_id uint32, name string, return err } + dest_metrics := modules.NewMetrics(dest) + conf := migrator.NewMigratorConfig().WithBlockSize(sync_block_size) conf.Locker_handler = func() { sinfo.lockable.Lock() @@ -200,12 +192,13 @@ func migrateDeviceS3(dev_id uint32, name string, name, p.Migrated_blocks, p.Total_blocks, p.Migrated_blocks_perc, p.Ready_blocks, p.Total_blocks, p.Ready_blocks_perc, p.Active_blocks) + dest_metrics.ShowStats("S3") } conf.Error_handler = func(b *storage.BlockInfo, err error) { fmt.Printf("[%s] Error for block %d error %v\n", name, b.Block, err) } - log_dest := modules.NewLogger(dest, "S3") + log_dest := modules.NewLogger(dest_metrics, "S3") mig, err := migrator.NewMigrator(sinfo.tracker, log_dest, sinfo.orderer, conf) @@ -238,6 +231,7 @@ func migrateDeviceS3(dev_id uint32, name string, blocks := mig.GetLatestDirty() // if blocks != nil { + fmt.Printf("Dirty blocks %v\n", blocks) err = mig.MigrateDirty(blocks) if err != nil { return err From 4220c2165e51311be7c49f5d33cc643687aacc25 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 29 Apr 2024 11:45:34 +0100 Subject: [PATCH 06/21] Migrator can now get dirty using func --- cmd/sync.go | 21 ++++++++++++++------ pkg/storage/migrator/migrator.go | 34 +++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index b79d320a..0a175029 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -206,13 +206,22 @@ func migrateDeviceS3(dev_id uint32, name string, return err } - fmt.Printf("Doing migration...\n") - - // Now do the initial migration... num_blocks := (sinfo.tracker.Size() + uint64(sync_block_size) - 1) / uint64(sync_block_size) - err = mig.Migrate(int(num_blocks)) - if err != nil { - return err + + // NB: We only need to do this for existing sources. + /* + fmt.Printf("Doing migration...\n") + + // Now do the initial migration... + err = mig.Migrate(int(num_blocks)) + if err != nil { + return err + } + */ + + // Since it's a new source, it's all zeros. We don't need to do an initial migration. + for b := 0; b < int(num_blocks); b++ { + mig.SetMigratedBlock(b) } fmt.Printf("Waiting...\n") diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index 105b3e61..9b327fe9 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -117,6 +117,19 @@ func NewMigrator(source storage.TrackingStorageProvider, return m, nil } +/** + * Set a block to already migrated state + */ +func (m *Migrator) SetMigratedBlock(block int) { + m.block_locks[block].Lock() + defer m.block_locks[block].Unlock() + // Mark it as done + m.migrated_blocks.SetBit(block) + m.clean_blocks.SetBit(block) + + m.reportProgress(false) +} + /** * Migrate storage to dest. */ @@ -159,16 +172,23 @@ func (m *Migrator) Migrate(num_blocks int) error { * If there a no more dirty blocks, we leave the src locked. */ func (m *Migrator) GetLatestDirty() []uint { - // Queue up some dirty blocks + getter := func() []uint { + blocks := m.source_tracker.Sync() + return blocks.Collect(0, blocks.Length()) + } + return m.GetLatestDirtyFunc(getter) +} + +/** + * Get the latest dirty blocks. + * If there a no more dirty blocks, we leave the src locked. 
+ */ +func (m *Migrator) GetLatestDirtyFunc(getter func() []uint) []uint { m.source_lock_fn() - // Check for any dirty blocks to be added on - blocks := m.source_tracker.Sync() - changed := blocks.Count(0, blocks.Length()) - if changed != 0 { + block_nos := getter() + if len(block_nos) != 0 { m.source_unlock_fn() - - block_nos := blocks.Collect(0, blocks.Length()) return block_nos } return nil From 99802fe4976af7c6a501061d01561b31812b7488 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 30 Apr 2024 12:35:36 +0100 Subject: [PATCH 07/21] Added CancelWrites for high latency sources eg S3 --- cmd/sync.go | 199 +++++++++++++++--- pkg/storage/dirtytracker/dirty_tracker.go | 78 +++++-- pkg/storage/expose/nbd.go | 4 + pkg/storage/modules/artificial_latency.go | 4 + pkg/storage/modules/block_splitter.go | 4 + pkg/storage/modules/copy_on_write.go | 4 + .../modules/filter_redundant_writes.go | 4 + pkg/storage/modules/hooks.go | 4 + pkg/storage/modules/lockable.go | 4 + pkg/storage/modules/logger.go | 5 + pkg/storage/modules/metrics.go | 4 + pkg/storage/modules/metrics_test.go | 2 +- pkg/storage/modules/nothing.go | 13 +- pkg/storage/modules/raid.go | 4 + pkg/storage/modules/read_only_gate.go | 4 + pkg/storage/modules/sharded_storage.go | 4 + pkg/storage/protocol/to_protocol.go | 4 + pkg/storage/sources/file_storage.go | 2 + pkg/storage/sources/file_storage_sparse.go | 2 + pkg/storage/sources/memory_storage.go | 2 + pkg/storage/sources/s3_storage.go | 56 ++++- pkg/storage/storage.go | 1 + .../volatilitymonitor/volatility_monitor.go | 4 + .../waitingcache/waiting_cache_local.go | 4 + .../waitingcache/waiting_cache_remote.go | 4 + 25 files changed, 361 insertions(+), 59 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 0a175029..2b667ea1 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -1,7 +1,10 @@ package main import ( + "context" + crand "crypto/rand" "fmt" + "math/rand" "os" "os/signal" "sync" @@ -35,9 +38,26 @@ var sync_access string var sync_secret string var sync_bucket string var sync_block_size int +var sync_time_limit time.Duration +var sync_random_writes bool +var sync_write_period time.Duration +var sync_dirty_block_shift int +var sync_dummy bool var sync_exposed []storage.ExposedStorage -var sync_storage []*storageInfo +var sync_storage []*syncStorageInfo + +var write_rand_source *rand.Rand + +type syncStorageInfo struct { + tracker *dirtytracker.DirtyTrackerRemote + lockable storage.LockableStorageProvider + orderer *blocks.PriorityBlockOrder + num_blocks int + block_size int + name string + destMetrics *modules.Metrics +} func init() { rootCmd.AddCommand(cmdSync) @@ -47,12 +67,19 @@ func init() { cmdSync.Flags().StringVarP(&sync_secret, "secret", "s", "", "S3 secret") cmdSync.Flags().StringVarP(&sync_bucket, "bucket", "b", "", "S3 bucket") cmdSync.Flags().IntVarP(&sync_block_size, "blocksize", "l", 4*1024*1024, "S3 block size") + cmdSync.Flags().DurationVarP(&sync_time_limit, "timelimit", "t", 10*time.Second, "Sync time limit") + cmdSync.Flags().BoolVarP(&sync_random_writes, "randomwrites", "r", false, "Perform random writes") + cmdSync.Flags().DurationVarP(&sync_write_period, "writeperiod", "w", 100*time.Millisecond, "Random write period") + cmdSync.Flags().IntVarP(&sync_dirty_block_shift, "dirtyshift", "d", 10, "Dirty tracker block shift") + cmdSync.Flags().BoolVarP(&sync_dummy, "dummy", "y", false, "Dummy destination") } func runSync(ccmd *cobra.Command, args []string) { + write_rand_source = rand.New(rand.NewSource(1)) + sync_exposed = make([]storage.ExposedStorage, 0) - 
sync_storage = make([]*storageInfo, 0) + sync_storage = make([]*syncStorageInfo, 0) fmt.Printf("Starting silo s3 sync\n") c := make(chan os.Signal, 1) @@ -75,16 +102,16 @@ func runSync(ccmd *cobra.Command, args []string) { if err != nil { panic(fmt.Sprintf("Could not setup storage. %v", err)) } - src_storage = append(src_storage, sinfo) + sync_storage = append(sync_storage, sinfo) } // Lets go through each of the things we want to migrate/sync var wg sync.WaitGroup - for i, s := range src_storage { + for i, s := range sync_storage { wg.Add(1) - go func(index int, src *storageInfo) { + go func(index int, src *syncStorageInfo) { err := migrateDeviceS3(uint32(index), src.name, src) if err != nil { fmt.Printf("There was an issue migrating the storage %d %v\n", index, err) @@ -97,7 +124,11 @@ func runSync(ccmd *cobra.Command, args []string) { sync_shutdown_everything() } -func setupSyncStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { +/** + * Setup a storage device for sync command + * + */ +func setupSyncStorageDevice(conf *config.DeviceSchema) (*syncStorageInfo, error) { block_size := sync_block_size // 1024 * 128 num_blocks := (int(conf.ByteSize()) + block_size - 1) / block_size @@ -111,7 +142,10 @@ func setupSyncStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { sync_exposed = append(sync_exposed, ex) } sourceMetrics := modules.NewMetrics(source) - sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceMetrics, block_size) + + dirty_block_size := block_size >> sync_dirty_block_shift + + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceMetrics, dirty_block_size) sourceMonitor := volatilitymonitor.NewVolatilityMonitor(sourceDirtyLocal, block_size, 10*time.Second) sourceStorage := modules.NewLockable(sourceMonitor) @@ -130,13 +164,32 @@ func setupSyncStorageDevice(conf *config.DeviceSchema) (*storageInfo, error) { orderer := blocks.NewPriorityBlockOrder(num_blocks, primary_orderer) orderer.AddAll() - return &storageInfo{ - tracker: sourceDirtyRemote, - lockable: sourceStorage, - orderer: orderer, - block_size: block_size, - num_blocks: num_blocks, - name: conf.Name, + var dest storage.StorageProvider + + if sync_dummy { + dest = modules.NewNothing(sourceStorage.Size()) + } else { + dest, err = sources.NewS3StorageCreate(sync_endpoint, + sync_access, + sync_secret, + sync_bucket, + conf.Name, + sourceStorage.Size(), + sync_block_size) + + if err != nil { + return nil, err + } + } + + return &syncStorageInfo{ + tracker: sourceDirtyRemote, + lockable: sourceStorage, + orderer: orderer, + block_size: block_size, + num_blocks: num_blocks, + name: conf.Name, + destMetrics: modules.NewMetrics(dest), }, nil } @@ -146,6 +199,8 @@ func sync_shutdown_everything() { for _, i := range sync_storage { i.lockable.Unlock() i.tracker.Close() + // Show some stats + i.destMetrics.ShowStats(i.name) } fmt.Printf("Shutting down devices cleanly...\n") @@ -159,21 +214,9 @@ func sync_shutdown_everything() { // Migrate a device func migrateDeviceS3(dev_id uint32, name string, - sinfo *storageInfo) error { + sinfo *syncStorageInfo) error { - dest, err := sources.NewS3StorageCreate(sync_endpoint, - sync_access, - sync_secret, - sync_bucket, - sinfo.name, - sinfo.lockable.Size(), - sync_block_size) - - if err != nil { - return err - } - - dest_metrics := modules.NewMetrics(dest) + dest_metrics := modules.NewMetrics(sinfo.destMetrics) conf := migrator.NewMigratorConfig().WithBlockSize(sync_block_size) conf.Locker_handler = func() { @@ -183,7 +226,7 @@ 
func migrateDeviceS3(dev_id uint32, name string, sinfo.lockable.Unlock() } conf.Concurrency = map[int]int{ - storage.BlockTypeAny: 1000000, + storage.BlockTypeAny: 8, } conf.Integrity = false @@ -206,6 +249,41 @@ func migrateDeviceS3(dev_id uint32, name string, return err } + ctx, cancelFn := context.WithCancel(context.TODO()) + + // Do random writes to the device for testing + // + if sync_random_writes { + go func() { + dev_size := int(sinfo.tracker.Size()) + t := time.NewTicker(sync_write_period) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + // Do a random write to the device... + sizes := []int{4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024} + size := sizes[write_rand_source.Intn(len(sizes))] + offset := write_rand_source.Intn(dev_size) + // Now align, and shift if needed + if offset+size > dev_size { + offset = dev_size - size + } + offset = 4096 * (offset / 4096) + // Now do a write... + buffer := make([]byte, size) + crand.Read(buffer) + fmt.Printf("-Write- %d %d\n", offset, len(buffer)) + _, err := sinfo.lockable.WriteAt(buffer, int64(offset)) + if err != nil { + panic(err) + } + } + } + }() + } + num_blocks := (sinfo.tracker.Size() + uint64(sync_block_size) - 1) / uint64(sync_block_size) // NB: We only need to do this for existing sources. @@ -224,6 +302,8 @@ func migrateDeviceS3(dev_id uint32, name string, mig.SetMigratedBlock(b) } + sinfo.tracker.TrackAt(0, int64(sinfo.tracker.Size())) + fmt.Printf("Waiting...\n") // Wait for completion. @@ -236,11 +316,45 @@ func migrateDeviceS3(dev_id uint32, name string, fmt.Printf("Dirty loop...\n") + // Dirty block selection params go here + max_age := 5000 * time.Millisecond + limit := 4 + min_changed := 8 + // + + getter := func() []uint { + return sinfo.tracker.GetDirtyBlocks(max_age, limit, sync_dirty_block_shift, min_changed) + } + + ctime := time.Now() + + block_histo := make(map[uint]int) + for { - blocks := mig.GetLatestDirty() // + if time.Since(ctime) > sync_time_limit { + break + } + + // Show dirty status... + ood := sinfo.tracker.MeasureDirty() + ood_age := sinfo.tracker.MeasureDirtyAge() + fmt.Printf("DIRTY STATUS %dms old, with %d blocks\n", time.Since(ood_age).Milliseconds(), ood) + + blocks := mig.GetLatestDirtyFunc(getter) // if blocks != nil { fmt.Printf("Dirty blocks %v\n", blocks) + + // Update the histogram for blocks... + for _, b := range blocks { + _, ok := block_histo[b] + if ok { + block_histo[b]++ + } else { + block_histo[b] = 1 + } + } + err = mig.MigrateDirty(blocks) if err != nil { return err @@ -251,10 +365,35 @@ func migrateDeviceS3(dev_id uint32, name string, time.Sleep(100 * time.Millisecond) } + cancelFn() // Stop the write loop + err = mig.WaitForCompletion() if err != nil { return err } + // Check the histogram... + counts := make(map[int]int) + for _, count := range block_histo { + _, ok := counts[count] + if ok { + counts[count]++ + } else { + counts[count] = 1 + } + } + + for c, cc := range counts { + fmt.Printf("Write histo %d - %d\n", c, cc) + } + + ood := sinfo.tracker.MeasureDirty() + ood_age := sinfo.tracker.MeasureDirtyAge() + + fmt.Printf("DIRTY STATUS %dms old, with %d blocks\n", time.Since(ood_age).Milliseconds(), ood) + + // Check how many dirty blocks were left, and how out of date we are... 
+ //left_blocks := sinfo.tracker.GetDirtyBlocks(0, 1000000, sync_dirty_block_shift, 0) + return nil } diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index 36ecd879..1220dbd4 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -1,6 +1,7 @@ package dirtytracker import ( + "fmt" "sort" "sync" "time" @@ -51,6 +52,10 @@ func (dtl *DirtyTrackerLocal) Close() error { return dtl.dt.prov.Close() } +func (dtl *DirtyTrackerLocal) CancelWrites(offset int64, length int64) { + dtl.dt.prov.CancelWrites(offset, length) +} + type DirtyTrackerRemote struct { dt *DirtyTracker } @@ -75,6 +80,10 @@ func (dtl *DirtyTrackerRemote) Close() error { return dtl.dt.prov.Close() } +func (dtl *DirtyTrackerRemote) CancelWrites(offset int64, length int64) { + dtl.dt.prov.CancelWrites(offset, length) +} + func NewDirtyTracker(prov storage.StorageProvider, blockSize int) (*DirtyTrackerLocal, *DirtyTrackerRemote) { size := int(prov.Size()) numBlocks := (size + blockSize - 1) / blockSize @@ -103,13 +112,56 @@ func (i *DirtyTracker) trackArea(length int64, offset int64) { i.tracking.SetBits(b_start, b_end) } +/** + * Start tracking at the given offset and length + * + */ +func (i *DirtyTrackerRemote) TrackAt(offset int64, length int64) { + i.dt.trackArea(length, offset) +} + +/** + * Get a quick measure of how many blocks are currently dirty + * + */ func (i *DirtyTrackerRemote) MeasureDirty() int { i.dt.tracking_lock.Lock() defer i.dt.tracking_lock.Unlock() + + fmt.Printf("Dirty blocks=%d size=%d %d / %d\n", i.dt.num_blocks, i.dt.size, i.dt.tracking.Count(0, uint(i.dt.num_blocks)), i.dt.dirty_log.Count(0, uint(i.dt.num_blocks))) + return len(i.dt.tracking_times) } +/** + * Get a quick measure of the oldest dirty block + * + */ +func (i *DirtyTrackerRemote) MeasureDirtyAge() time.Time { + i.dt.tracking_lock.Lock() + defer i.dt.tracking_lock.Unlock() + min_age := time.Now() + for _, t := range i.dt.tracking_times { + if t.Before(min_age) { + min_age = t + } + } + return min_age +} + +/** + * Get some dirty blocks using the given criteria + * + * - max_age - Returns blocks older than the given duration as priority + * - limit - Limits how many blocks are returned + * - group_by_shift - Groups subblocks into blocks using the given shift + * - min_changed - Minimum subblock changes in a block + */ func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, group_by_shift int, min_changed int) []uint { + // Prevent any writes while we get dirty blocks + i.dt.write_lock.Lock() + defer i.dt.write_lock.Unlock() + grouped_blocks := make(map[uint][]uint) // First look for any dirty blocks past max_age @@ -153,7 +205,7 @@ func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, gr // Sort the blocks by how many sub-blocks are dirty. sort.SliceStable(keys, func(i, j int) bool { - return len(grouped_blocks_changed[keys[i]]) < len(grouped_blocks_changed[keys[j]]) + return len(grouped_blocks_changed[keys[i]]) > len(grouped_blocks_changed[keys[j]]) }) // Now add them into grouped_blocks if we can... @@ -176,9 +228,6 @@ func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, gr } // Clear out the tracking data here... 
It'll get added for tracking again on a readAt() - i.dt.write_lock.Lock() - defer i.dt.write_lock.Unlock() - rblocks := make([]uint, 0) for rb, blocks := range grouped_blocks { for _, b := range blocks { @@ -190,35 +239,34 @@ func (i *DirtyTrackerRemote) GetDirtyBlocks(max_age time.Duration, limit int, gr } rblocks = append(rblocks, rb) } - return rblocks } -func (i *DirtyTracker) GetAllDirtyBlocks() *util.Bitfield { +func (i *DirtyTrackerRemote) GetAllDirtyBlocks() *util.Bitfield { // Prevent any writes while we do the Sync() - i.write_lock.Lock() - defer i.write_lock.Unlock() + i.dt.write_lock.Lock() + defer i.dt.write_lock.Unlock() - info := i.dirty_log.Clone() + info := i.dt.dirty_log.Clone() // Remove the dirty blocks from tracking... (They will get added again when a Read is performed to migrate the data) - i.tracking.ClearBitsIf(info, 0, uint(i.num_blocks)) + i.dt.tracking.ClearBitsIf(info, 0, uint(i.dt.num_blocks)) // Clear the dirty log. - i.dirty_log.Clear() + i.dt.dirty_log.Clear() blocks := info.Collect(0, info.Length()) - i.tracking_lock.Lock() + i.dt.tracking_lock.Lock() for _, b := range blocks { - delete(i.tracking_times, b) + delete(i.dt.tracking_times, b) } - i.tracking_lock.Unlock() + i.dt.tracking_lock.Unlock() return info } func (i *DirtyTrackerRemote) Sync() *util.Bitfield { - info := i.dt.GetAllDirtyBlocks() + info := i.GetAllDirtyBlocks() return info } diff --git a/pkg/storage/expose/nbd.go b/pkg/storage/expose/nbd.go index 78afef2b..91fefaa2 100644 --- a/pkg/storage/expose/nbd.go +++ b/pkg/storage/expose/nbd.go @@ -73,6 +73,10 @@ func (i *ExposedStorageNBDNL) Close() error { return i.prov.Close() } +func (i *ExposedStorageNBDNL) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} + func (n *ExposedStorageNBDNL) Device() string { return fmt.Sprintf("nbd%d", n.device_index) } diff --git a/pkg/storage/modules/artificial_latency.go b/pkg/storage/modules/artificial_latency.go index e5584169..cf9ccc84 100644 --- a/pkg/storage/modules/artificial_latency.go +++ b/pkg/storage/modules/artificial_latency.go @@ -66,3 +66,7 @@ func (i *ArtificialLatency) Size() uint64 { func (i *ArtificialLatency) Close() error { return i.prov.Close() } + +func (i *ArtificialLatency) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/block_splitter.go b/pkg/storage/modules/block_splitter.go index 4c42c813..9a6a8479 100644 --- a/pkg/storage/modules/block_splitter.go +++ b/pkg/storage/modules/block_splitter.go @@ -148,3 +148,7 @@ func (i *BlockSplitter) Size() uint64 { func (i *BlockSplitter) Close() error { return i.prov.Close() } + +func (i *BlockSplitter) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/copy_on_write.go b/pkg/storage/modules/copy_on_write.go index bee4c7e4..1d136dda 100644 --- a/pkg/storage/modules/copy_on_write.go +++ b/pkg/storage/modules/copy_on_write.go @@ -250,3 +250,7 @@ func (i *CopyOnWrite) Close() error { i.Close_fn() return nil } + +func (i *CopyOnWrite) CancelWrites(offset int64, length int64) { + i.cache.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/filter_redundant_writes.go b/pkg/storage/modules/filter_redundant_writes.go index 87de6afb..4127413e 100644 --- a/pkg/storage/modules/filter_redundant_writes.go +++ b/pkg/storage/modules/filter_redundant_writes.go @@ -82,3 +82,7 @@ func (i *FilterRedundantWrites) Size() uint64 { func (i *FilterRedundantWrites) Close() error { return 
i.prov.Close() } + +func (i *FilterRedundantWrites) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/hooks.go b/pkg/storage/modules/hooks.go index 69e9416d..7ebf9f5c 100644 --- a/pkg/storage/modules/hooks.go +++ b/pkg/storage/modules/hooks.go @@ -61,3 +61,7 @@ func (i *Hooks) Size() uint64 { func (i *Hooks) Close() error { return i.prov.Close() } + +func (i *Hooks) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/lockable.go b/pkg/storage/modules/lockable.go index 5c573252..03985869 100644 --- a/pkg/storage/modules/lockable.go +++ b/pkg/storage/modules/lockable.go @@ -57,6 +57,10 @@ func (i *Lockable) Close() error { return i.prov.Close() } +func (i *Lockable) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} + func (i *Lockable) Unlock() { i.lock.L.Lock() defer i.lock.L.Unlock() diff --git a/pkg/storage/modules/logger.go b/pkg/storage/modules/logger.go index caf24b32..8079af9d 100644 --- a/pkg/storage/modules/logger.go +++ b/pkg/storage/modules/logger.go @@ -58,3 +58,8 @@ func (i *Logger) Size() uint64 { func (i *Logger) Close() error { return i.prov.Close() } + +func (i *Logger) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) + // TODO: Implement +} diff --git a/pkg/storage/modules/metrics.go b/pkg/storage/modules/metrics.go index 87a1670c..eab82e69 100644 --- a/pkg/storage/modules/metrics.go +++ b/pkg/storage/modules/metrics.go @@ -160,3 +160,7 @@ func (i *Metrics) Size() uint64 { func (i *Metrics) Close() error { return i.prov.Close() } + +func (i *Metrics) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/metrics_test.go b/pkg/storage/modules/metrics_test.go index 782dfb62..0c408807 100644 --- a/pkg/storage/modules/metrics_test.go +++ b/pkg/storage/modules/metrics_test.go @@ -7,7 +7,7 @@ import ( ) func TestMetrics(t *testing.T) { - met := NewMetrics(NewNothing()) + met := NewMetrics(NewNothing(0)) // Do some things... 
diff --git a/pkg/storage/modules/nothing.go b/pkg/storage/modules/nothing.go index bfa804ab..c5890075 100644 --- a/pkg/storage/modules/nothing.go +++ b/pkg/storage/modules/nothing.go @@ -6,10 +6,13 @@ package modules */ type Nothing struct { + size uint64 } -func NewNothing() *Nothing { - return &Nothing{} +func NewNothing(size uint64) *Nothing { + return &Nothing{ + size: size, + } } func (i *Nothing) ReadAt(buffer []byte, offset int64) (int, error) { @@ -25,9 +28,13 @@ func (i *Nothing) Flush() error { } func (i *Nothing) Size() uint64 { - return 0 + return i.size } func (i *Nothing) Close() error { return nil } + +func (i *Nothing) CancelWrites(offset int64, length int64) { + // TODO: Implement +} diff --git a/pkg/storage/modules/raid.go b/pkg/storage/modules/raid.go index 08b94044..13d68fcd 100644 --- a/pkg/storage/modules/raid.go +++ b/pkg/storage/modules/raid.go @@ -93,3 +93,7 @@ func (r *Raid) Close() error { } return err } + +func (i *Raid) CancelWrites(offset int64, length int64) { + // TODO: Implement +} diff --git a/pkg/storage/modules/read_only_gate.go b/pkg/storage/modules/read_only_gate.go index 9d16cb90..5bcf8b84 100644 --- a/pkg/storage/modules/read_only_gate.go +++ b/pkg/storage/modules/read_only_gate.go @@ -50,6 +50,10 @@ func (i *ReadOnlyGate) Close() error { return i.prov.Close() } +func (i *ReadOnlyGate) CancelWrites(offset int64, length int64) { + // TODO: Implement +} + func (i *ReadOnlyGate) Unlock() { i.lock.L.Lock() i.locked = false diff --git a/pkg/storage/modules/sharded_storage.go b/pkg/storage/modules/sharded_storage.go index db48a538..9e84e251 100644 --- a/pkg/storage/modules/sharded_storage.go +++ b/pkg/storage/modules/sharded_storage.go @@ -164,3 +164,7 @@ func (i *ShardedStorage) Close() error { } return err } + +func (i *ShardedStorage) CancelWrites(offset int64, length int64) { + // TODO: Implement +} diff --git a/pkg/storage/protocol/to_protocol.go b/pkg/storage/protocol/to_protocol.go index 0d6d357e..e181b043 100644 --- a/pkg/storage/protocol/to_protocol.go +++ b/pkg/storage/protocol/to_protocol.go @@ -145,6 +145,10 @@ func (i *ToProtocol) Close() error { return nil } +func (i *ToProtocol) CancelWrites(offset int64, length int64) { + // TODO: Implement +} + // Handle any NeedAt commands, and send to an orderer... 
func (i *ToProtocol) HandleNeedAt(cb func(offset int64, length int32)) error { for { diff --git a/pkg/storage/sources/file_storage.go b/pkg/storage/sources/file_storage.go index e49a2c75..26b91c47 100644 --- a/pkg/storage/sources/file_storage.go +++ b/pkg/storage/sources/file_storage.go @@ -78,3 +78,5 @@ func (i *FileStorage) Close() error { i.wg.Wait() return i.fp.Close() } + +func (i *FileStorage) CancelWrites(offset int64, length int64) {} diff --git a/pkg/storage/sources/file_storage_sparse.go b/pkg/storage/sources/file_storage_sparse.go index 2df7f1ed..fbc38b9a 100644 --- a/pkg/storage/sources/file_storage_sparse.go +++ b/pkg/storage/sources/file_storage_sparse.go @@ -267,3 +267,5 @@ func (i *FileStorageSparse) Flush() error { func (i *FileStorageSparse) Size() uint64 { return uint64(i.size) } + +func (i *FileStorageSparse) CancelWrites(offset int64, length int64) {} diff --git a/pkg/storage/sources/memory_storage.go b/pkg/storage/sources/memory_storage.go index e48fd30a..6d44866a 100644 --- a/pkg/storage/sources/memory_storage.go +++ b/pkg/storage/sources/memory_storage.go @@ -45,3 +45,5 @@ func (i *MemoryStorage) Size() uint64 { func (i *MemoryStorage) Close() error { return nil } + +func (i *MemoryStorage) CancelWrites(offset int64, length int64) {} diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go index 1e31908e..a7d0b236 100644 --- a/pkg/storage/sources/s3_storage.go +++ b/pkg/storage/sources/s3_storage.go @@ -19,12 +19,14 @@ var ( */ type S3Storage struct { - client *minio.Client - bucket string - prefix string - size uint64 - block_size int - lockers []sync.RWMutex + client *minio.Client + bucket string + prefix string + size uint64 + block_size int + lockers []sync.RWMutex + contexts []context.CancelFunc + contexts_lock sync.Mutex } func NewS3Storage(endpoint string, @@ -53,6 +55,7 @@ func NewS3Storage(endpoint string, bucket: bucket, prefix: prefix, lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), }, nil } @@ -105,9 +108,26 @@ func NewS3StorageCreate(endpoint string, bucket: bucket, prefix: prefix, lockers: make([]sync.RWMutex, numBlocks), + contexts: make([]context.CancelFunc, numBlocks), }, nil } +func (i *S3Storage) setContext(block int, cancel context.CancelFunc) { + i.contexts_lock.Lock() + ex := i.contexts[block] + if ex != nil { + ex() // Cancel any existing context for this block + } + i.contexts[block] = cancel // Set to the new context + i.contexts_lock.Unlock() +} + +func (i *S3Storage) clearContext(block int) { + i.contexts_lock.Lock() + i.contexts[block] = nil + i.contexts_lock.Unlock() +} + func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { // Split the read up into blocks, and concurrenty perform the reads... 
end := uint64(offset + int64(len(buffer))) @@ -185,8 +205,9 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { errs := make(chan error, blocks) getData := func(buff []byte, off int64) (int, error) { + ctx := context.TODO() i.lockers[off/int64(i.block_size)].RLock() - obj, err := i.client.GetObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) + obj, err := i.client.GetObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), minio.GetObjectOptions{}) i.lockers[off/int64(i.block_size)].RUnlock() if err != nil { return 0, err @@ -195,11 +216,24 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { } putData := func(buff []byte, off int64) (int, error) { - i.lockers[off/int64(i.block_size)].Lock() - obj, err := i.client.PutObject(context.TODO(), i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), + block := off / int64(i.block_size) + ctx, cancelFn := context.WithCancel(context.TODO()) + i.lockers[block].Lock() + + i.setContext(int(block), cancelFn) + + obj, err := i.client.PutObject(ctx, i.bucket, fmt.Sprintf("%s-%d", i.prefix, off), bytes.NewReader(buff), int64(i.block_size), minio.PutObjectOptions{}) - i.lockers[off/int64(i.block_size)].Unlock() + + // err might be that the context is cancelled + if err == context.Canceled { + fmt.Printf("*** PUT CANCELLED [%d]***\n", block) + return len(buff), nil + } + + i.clearContext(int(block)) + i.lockers[block].Unlock() if err != nil { return 0, err } @@ -266,3 +300,5 @@ func (i *S3Storage) Size() uint64 { func (i *S3Storage) Close() error { return nil } + +func (i *S3Storage) CancelWrites(offset int64, length int64) {} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f6b61a37..0cbe55b1 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -13,6 +13,7 @@ type StorageProvider interface { Size() uint64 Flush() error Close() error + CancelWrites(offset int64, length int64) } type LockableStorageProvider interface { diff --git a/pkg/storage/volatilitymonitor/volatility_monitor.go b/pkg/storage/volatilitymonitor/volatility_monitor.go index b4304bfe..1c7efd2e 100644 --- a/pkg/storage/volatilitymonitor/volatility_monitor.go +++ b/pkg/storage/volatilitymonitor/volatility_monitor.go @@ -150,3 +150,7 @@ func (i *VolatilityMonitor) Size() uint64 { func (i *VolatilityMonitor) Close() error { return i.prov.Close() } + +func (i *VolatilityMonitor) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/waitingcache/waiting_cache_local.go b/pkg/storage/waitingcache/waiting_cache_local.go index 7fbe7b1a..a01a64f3 100644 --- a/pkg/storage/waitingcache/waiting_cache_local.go +++ b/pkg/storage/waitingcache/waiting_cache_local.go @@ -108,3 +108,7 @@ func (wcl *WaitingCacheLocal) DirtyBlocks(blocks []uint) { wcl.wc.markUnavailableRemoteBlock(v) } } + +func (wcl *WaitingCacheLocal) CancelWrites(offset int64, length int64) { + wcl.wc.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/waitingcache/waiting_cache_remote.go b/pkg/storage/waitingcache/waiting_cache_remote.go index 4198829b..7048eec5 100644 --- a/pkg/storage/waitingcache/waiting_cache_remote.go +++ b/pkg/storage/waitingcache/waiting_cache_remote.go @@ -85,3 +85,7 @@ func (wcl *WaitingCacheRemote) Size() uint64 { func (wcl *WaitingCacheRemote) Close() error { return wcl.wc.prov.Close() } + +func (wcl *WaitingCacheRemote) CancelWrites(offset int64, length int64) { + wcl.wc.prov.CancelWrites(offset, length) +} From 
16610e2ebca21ac511b6e83b58ae34de0f7f6fce Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 30 Apr 2024 12:41:42 +0100 Subject: [PATCH 08/21] Impl cancelWrites for S3, and test --- pkg/storage/sources/s3_storage.go | 29 +++++++++++++------------- pkg/storage/sources/s3_storage_test.go | 26 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go index a7d0b236..1eb947d2 100644 --- a/pkg/storage/sources/s3_storage.go +++ b/pkg/storage/sources/s3_storage.go @@ -122,12 +122,6 @@ func (i *S3Storage) setContext(block int, cancel context.CancelFunc) { i.contexts_lock.Unlock() } -func (i *S3Storage) clearContext(block int) { - i.contexts_lock.Lock() - i.contexts[block] = nil - i.contexts_lock.Unlock() -} - func (i *S3Storage) ReadAt(buffer []byte, offset int64) (int, error) { // Split the read up into blocks, and concurrenty perform the reads... end := uint64(offset + int64(len(buffer))) @@ -226,13 +220,7 @@ func (i *S3Storage) WriteAt(buffer []byte, offset int64) (int, error) { bytes.NewReader(buff), int64(i.block_size), minio.PutObjectOptions{}) - // err might be that the context is cancelled - if err == context.Canceled { - fmt.Printf("*** PUT CANCELLED [%d]***\n", block) - return len(buff), nil - } - - i.clearContext(int(block)) + i.setContext(int(block), nil) i.lockers[block].Unlock() if err != nil { return 0, err @@ -301,4 +289,17 @@ func (i *S3Storage) Close() error { return nil } -func (i *S3Storage) CancelWrites(offset int64, length int64) {} +func (i *S3Storage) CancelWrites(offset int64, length int64) { + end := uint64(offset + length) + if end > i.size { + end = i.size + } + + b_start := uint(offset / int64(i.block_size)) + b_end := uint((end-1)/uint64(i.block_size)) + 1 + + for b := b_start; b < b_end; b++ { + // Cancel any writes for the given block... + i.setContext(int(b), nil) + } +} diff --git a/pkg/storage/sources/s3_storage_test.go b/pkg/storage/sources/s3_storage_test.go index 8a0258de..233fea32 100644 --- a/pkg/storage/sources/s3_storage_test.go +++ b/pkg/storage/sources/s3_storage_test.go @@ -1,9 +1,11 @@ package sources_test import ( + "context" "crypto/rand" "fmt" "testing" + "time" "github.com/loopholelabs/silo/pkg/storage/sources" "github.com/loopholelabs/silo/pkg/testutils" @@ -36,3 +38,27 @@ func TestS3Storage(t *testing.T) { err = s3store.Close() assert.NoError(t, err) } + +func TestS3StorageCancelWrites(t *testing.T) { + PORT_9000 := testutils.SetupMinio(t.Cleanup) + + size := 64 * 1024 + blockSize := 1024 + s3store, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) + assert.NoError(t, err) + + buffer := make([]byte, 32*1024) + _, err = rand.Read(buffer) + assert.NoError(t, err) + + // Cancel writes just after the WriteAt() + time.AfterFunc(10*time.Millisecond, func() { + s3store.CancelWrites(0, int64(size)) // Cancel ALL writes! 
+ }) + + _, err = s3store.WriteAt(buffer, 80) + assert.ErrorIs(t, err, context.Canceled) + + err = s3store.Close() + assert.NoError(t, err) +} From 7f9063dbf5fbd1e155241e8f95bf35732d37a5f6 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 30 Apr 2024 13:22:30 +0100 Subject: [PATCH 09/21] migrator can now cancel dirty writes --- pkg/storage/migrator/migrator.go | 130 ++++++++++++++++---------- pkg/storage/migrator/migrator_test.go | 2 +- 2 files changed, 80 insertions(+), 52 deletions(-) diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index 9b327fe9..415be8c3 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -1,6 +1,7 @@ package migrator import ( + "context" "crypto/sha256" "errors" "sync" @@ -20,6 +21,7 @@ type MigratorConfig struct { Progress_handler func(p *MigrationProgress) Concurrency map[int]int Integrity bool + Recent_write_age time.Duration } func NewMigratorConfig() *MigratorConfig { @@ -35,7 +37,8 @@ func NewMigratorConfig() *MigratorConfig { storage.BlockTypeDirty: 100, storage.BlockTypePriority: 16, }, - Integrity: false, + Integrity: false, + Recent_write_age: time.Minute, } } @@ -45,36 +48,41 @@ func (mc *MigratorConfig) WithBlockSize(bs int) *MigratorConfig { } type MigrationProgress struct { - Total_blocks int // Total blocks - Migrated_blocks int // Number of blocks that have been migrated - Migrated_blocks_perc float64 - Ready_blocks int // Number of blocks which are up to date (clean). May go down as well as up. - Ready_blocks_perc float64 - Active_blocks int // Number of blocks in progress now + Total_blocks int // Total blocks + Migrated_blocks int // Number of blocks that have been migrated + Migrated_blocks_perc float64 + Ready_blocks int // Number of blocks which are up to date (clean). May go down as well as up. 
+ Ready_blocks_perc float64 + Active_blocks int // Number of blocks in progress now + Total_Canceled_blocks int // Total blocks that were cancelled + Total_Migrated_blocks int // Total blocks that were migrated } type Migrator struct { - source_tracker storage.TrackingStorageProvider // Tracks writes so we know which are dirty - dest storage.StorageProvider - source_lock_fn func() - source_unlock_fn func() - error_fn func(block *storage.BlockInfo, err error) - progress_fn func(*MigrationProgress) - progress_lock sync.Mutex - progress_last time.Time - progress_last_status *MigrationProgress - block_size int - num_blocks int - metric_blocks_moved int64 - block_locks []sync.Mutex - moving_blocks *util.Bitfield - migrated_blocks *util.Bitfield - clean_blocks *util.Bitfield - block_order storage.BlockOrder - ctime time.Time - concurrency map[int]chan bool - wg sync.WaitGroup - integrity *integrity.IntegrityChecker + source_tracker storage.TrackingStorageProvider // Tracks writes so we know which are dirty + dest storage.StorageProvider + source_lock_fn func() + source_unlock_fn func() + error_fn func(block *storage.BlockInfo, err error) + progress_fn func(*MigrationProgress) + progress_lock sync.Mutex + progress_last time.Time + progress_last_status *MigrationProgress + block_size int + num_blocks int + metric_blocks_migrated int64 + metric_blocks_canceled int64 + block_locks []sync.Mutex + moving_blocks *util.Bitfield + migrated_blocks *util.Bitfield + clean_blocks *util.Bitfield + last_written_blocks []time.Time + block_order storage.BlockOrder + ctime time.Time + concurrency map[int]chan bool + wg sync.WaitGroup + integrity *integrity.IntegrityChecker + recent_write_age time.Duration } func NewMigrator(source storage.TrackingStorageProvider, @@ -84,22 +92,25 @@ func NewMigrator(source storage.TrackingStorageProvider, num_blocks := (int(source.Size()) + config.Block_size - 1) / config.Block_size m := &Migrator{ - dest: dest, - source_tracker: source, - source_lock_fn: config.Locker_handler, - source_unlock_fn: config.Unlocker_handler, - error_fn: config.Error_handler, - progress_fn: config.Progress_handler, - block_size: config.Block_size, - num_blocks: num_blocks, - metric_blocks_moved: 0, - block_order: block_order, - moving_blocks: util.NewBitfield(num_blocks), - migrated_blocks: util.NewBitfield(num_blocks), - clean_blocks: util.NewBitfield(num_blocks), - block_locks: make([]sync.Mutex, num_blocks), - concurrency: make(map[int]chan bool), - progress_last_status: &MigrationProgress{}, + dest: dest, + source_tracker: source, + source_lock_fn: config.Locker_handler, + source_unlock_fn: config.Unlocker_handler, + error_fn: config.Error_handler, + progress_fn: config.Progress_handler, + block_size: config.Block_size, + num_blocks: num_blocks, + metric_blocks_migrated: 0, + metric_blocks_canceled: 0, + block_order: block_order, + moving_blocks: util.NewBitfield(num_blocks), + migrated_blocks: util.NewBitfield(num_blocks), + clean_blocks: util.NewBitfield(num_blocks), + block_locks: make([]sync.Mutex, num_blocks), + concurrency: make(map[int]chan bool), + progress_last_status: &MigrationProgress{}, + last_written_blocks: make([]time.Time, num_blocks), + recent_write_age: config.Recent_write_age, } if m.dest.Size() != m.source_tracker.Size() { @@ -127,6 +138,8 @@ func (m *Migrator) SetMigratedBlock(block int) { m.migrated_blocks.SetBit(block) m.clean_blocks.SetBit(block) + m.last_written_blocks[block] = time.Now() + m.reportProgress(false) } @@ -200,11 +213,18 @@ func (m *Migrator) Unlock() { /** 
* MigrateDirty migrates a list of dirty blocks. + * An attempt is made to cancel any existing writes for the blocks first. */ func (m *Migrator) MigrateDirty(blocks []uint) error { for _, pos := range blocks { i := &storage.BlockInfo{Block: int(pos), Type: storage.BlockTypeDirty} + // Check if there has been a successful write to the block recently + // If there has, then cancel any in progress write + if time.Since(m.last_written_blocks[pos]) < m.recent_write_age { + m.dest.CancelWrites(int64(pos*uint(m.block_size)), int64(m.block_size)) + } + cc, ok := m.concurrency[i.Type] if !ok { cc = m.concurrency[storage.BlockTypeAny] @@ -267,12 +287,14 @@ func (m *Migrator) reportProgress(forced bool) { m.progress_last = time.Now() m.progress_last_status = &MigrationProgress{ - Total_blocks: m.num_blocks, - Migrated_blocks: migrated, - Migrated_blocks_perc: perc_mig, - Ready_blocks: completed, - Ready_blocks_perc: perc_complete, - Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)), + Total_blocks: m.num_blocks, + Migrated_blocks: migrated, + Migrated_blocks_perc: perc_mig, + Ready_blocks: completed, + Ready_blocks_perc: perc_complete, + Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)), + Total_Canceled_blocks: int(atomic.LoadInt64(&m.metric_blocks_canceled)), + Total_Migrated_blocks: int(atomic.LoadInt64(&m.metric_blocks_migrated)), } // Callback m.progress_fn(m.progress_last_status) @@ -304,15 +326,21 @@ func (m *Migrator) migrateBlock(block int) error { // Now write it to destStorage n, err = m.dest.WriteAt(buff, int64(offset)) if n != len(buff) || err != nil { + if errors.Is(err, context.Canceled) { + atomic.AddInt64(&m.metric_blocks_canceled, 1) + } return err } + // Set the last successful write for this block + m.last_written_blocks[block] = time.Now() + // If we have an integrity check setup, lets hash the block for later... 
if m.integrity != nil { m.integrity.HashBlock(uint(block), buff) } - atomic.AddInt64(&m.metric_blocks_moved, 1) + atomic.AddInt64(&m.metric_blocks_migrated, 1) // Mark it as done m.migrated_blocks.SetBit(block) m.clean_blocks.SetBit(block) diff --git a/pkg/storage/migrator/migrator_test.go b/pkg/storage/migrator/migrator_test.go index 7903621f..073384e1 100644 --- a/pkg/storage/migrator/migrator_test.go +++ b/pkg/storage/migrator/migrator_test.go @@ -505,7 +505,7 @@ func TestMigratorWithReaderWriterWrite(t *testing.T) { err = mig.WaitForCompletion() assert.NoError(t, err) - assert.Equal(t, int64(num_blocks-num_local_blocks), mig.metric_blocks_moved) + assert.Equal(t, int64(num_blocks-num_local_blocks), mig.metric_blocks_migrated) } func TestMigratorSimpleCowSparse(t *testing.T) { From 8acb1bbeadd88bceca84e61ac4a32d90e5b8d2ce Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Tue, 30 Apr 2024 16:31:17 +0100 Subject: [PATCH 10/21] Improved metrics output --- pkg/storage/modules/metrics.go | 53 +++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/pkg/storage/modules/metrics.go b/pkg/storage/modules/metrics.go index eab82e69..227f2204 100644 --- a/pkg/storage/modules/metrics.go +++ b/pkg/storage/modules/metrics.go @@ -47,44 +47,61 @@ func NewMetrics(prov storage.StorageProvider) *Metrics { } } +func formatDuration(d time.Duration) string { + if d < time.Millisecond { + return fmt.Sprintf("%dns", d.Nanoseconds()) + } else if d < time.Second { + return fmt.Sprintf("%dms", d.Milliseconds()) + } else { + return fmt.Sprintf("%.3fs", float64(d)/float64(time.Second)) + } +} + func (i *Metrics) ShowStats(prefix string) { read_ops := atomic.LoadUint64(&i.metric_read_ops) + read_errors := atomic.LoadUint64(&i.metric_read_errors) read_time := atomic.LoadUint64(&i.metric_read_time) read_avg_time := uint64(0) - if read_ops > 0 { - read_avg_time = read_time / read_ops + if read_ops-read_errors > 0 { + read_avg_time = read_time / (read_ops - read_errors) } - fmt.Printf("%s: Reads=%d (%d bytes) avg latency %dns, %d errors, ", + read_avg_time_f := formatDuration(time.Duration(read_avg_time)) + fmt.Printf("%s: Reads=%d (%d bytes) avg latency %s, %d errors, ", prefix, read_ops, atomic.LoadUint64(&i.metric_read_bytes), - read_avg_time, - atomic.LoadUint64(&i.metric_read_errors), + read_avg_time_f, + read_errors, ) write_ops := atomic.LoadUint64(&i.metric_write_ops) + write_errors := atomic.LoadUint64(&i.metric_write_errors) write_time := atomic.LoadUint64(&i.metric_write_time) write_avg_time := uint64(0) - if write_ops > 0 { - write_avg_time = write_time / write_ops + if write_ops-write_errors > 0 { + write_avg_time = write_time / (write_ops - write_errors) } - fmt.Printf("Writes=%d (%d bytes) avg latency %dns, %d errors, ", + write_avg_time_f := formatDuration(time.Duration(write_avg_time)) + fmt.Printf("Writes=%d (%d bytes) avg latency %s, %d errors, ", write_ops, atomic.LoadUint64(&i.metric_write_bytes), - write_avg_time, - atomic.LoadUint64(&i.metric_write_errors), + write_avg_time_f, + write_errors, ) flush_ops := atomic.LoadUint64(&i.metric_flush_ops) + flush_errors := atomic.LoadUint64(&i.metric_flush_errors) flush_time := atomic.LoadUint64(&i.metric_flush_time) flush_avg_time := uint64(0) - if flush_ops > 0 { - flush_avg_time = flush_time / flush_ops + if flush_ops-flush_errors > 0 { + flush_avg_time = flush_time / (flush_ops - flush_errors) } - fmt.Printf("Flushes=%d avg latency %dns, %d errors\n", + flush_avg_time_f := 
formatDuration(time.Duration(flush_avg_time)) + + fmt.Printf("Flushes=%d avg latency %s, %d errors\n", flush_ops, - flush_avg_time, - atomic.LoadUint64(&i.metric_flush_errors), + flush_avg_time_f, + flush_errors, ) } @@ -123,9 +140,10 @@ func (i *Metrics) ReadAt(buffer []byte, offset int64) (int, error) { atomic.AddUint64(&i.metric_read_bytes, uint64(len(buffer))) ctime := time.Now() n, e := i.prov.ReadAt(buffer, offset) - atomic.AddUint64(&i.metric_read_time, uint64(time.Since(ctime).Nanoseconds())) if e != nil { atomic.AddUint64(&i.metric_read_errors, 1) + } else { + atomic.AddUint64(&i.metric_read_time, uint64(time.Since(ctime).Nanoseconds())) } return n, e } @@ -135,9 +153,10 @@ func (i *Metrics) WriteAt(buffer []byte, offset int64) (int, error) { atomic.AddUint64(&i.metric_write_bytes, uint64(len(buffer))) ctime := time.Now() n, e := i.prov.WriteAt(buffer, offset) - atomic.AddUint64(&i.metric_write_time, uint64(time.Since(ctime).Nanoseconds())) if e != nil { atomic.AddUint64(&i.metric_write_errors, 1) + } else { + atomic.AddUint64(&i.metric_write_time, uint64(time.Since(ctime).Nanoseconds())) } return n, e } From 79aed806f9886a2adfc632baa99e17a4066ba156 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 1 May 2024 14:23:40 +0100 Subject: [PATCH 11/21] Added binlog+replay so that we can capture real world workloads and replay for testing/optimization/debugging etc --- pkg/storage/device/device.go | 11 ++- pkg/storage/modules/binlog.go | 110 +++++++++++++++++++++++++++ pkg/storage/modules/binlog_replay.go | 79 +++++++++++++++++++ 3 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 pkg/storage/modules/binlog.go create mode 100644 pkg/storage/modules/binlog_replay.go diff --git a/pkg/storage/device/device.go b/pkg/storage/device/device.go index 4d5da863..36cd118f 100644 --- a/pkg/storage/device/device.go +++ b/pkg/storage/device/device.go @@ -70,7 +70,7 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose } } else if ds.System == SYSTEM_S3 { // - return nil, nil, fmt.Errorf("S3 Not Supported yet") + return nil, nil, fmt.Errorf("S3 Not Supported in device yet") } else if ds.System == SYSTEM_SPARSE_FILE { file, err := os.Open(ds.Location) if err != nil { @@ -203,7 +203,16 @@ func NewDevice(ds *config.DeviceSchema) (storage.StorageProvider, storage.Expose } } + // Optionally binlog this dev to a file + if ds.Binlog != "" { + prov, err = modules.NewBinLog(prov, ds.Binlog) + if err != nil { + return nil, nil, err + } + } + // Now optionaly expose the device + // NB You may well need to call ex.SetProvider if you wish to insert other things in the chain. 
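+	// If a binlog was configured above, prov is now the BinLog wrapper, so the
+	// exposed device (and anything else layered on prov) is captured in the log.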
var ex storage.ExposedStorage if ds.Expose { ex = expose.NewExposedStorageNBDNL(prov, 8, 0, prov.Size(), expose.NBD_DEFAULT_BLOCK_SIZE, true) diff --git a/pkg/storage/modules/binlog.go b/pkg/storage/modules/binlog.go new file mode 100644 index 00000000..2558d10b --- /dev/null +++ b/pkg/storage/modules/binlog.go @@ -0,0 +1,110 @@ +package modules + +import ( + "encoding/binary" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/protocol/packets" +) + +type BinLog struct { + prov storage.StorageProvider + filename string + ctime time.Time + set_ctime bool + fp *os.File + write_lock sync.Mutex + readsEnabled atomic.Bool + writesEnabled atomic.Bool +} + +func NewBinLog(prov storage.StorageProvider, filename string) (*BinLog, error) { + fp, err := os.Create(filename) + if err != nil { + return nil, err + } + + l := &BinLog{ + prov: prov, + filename: filename, + fp: fp, + ctime: time.Now(), + set_ctime: true, + } + // By default, we only log writes. + l.readsEnabled.Store(false) + l.writesEnabled.Store(true) + return l, nil +} + +func (i *BinLog) writeLog(data []byte) { + now := time.Now() + i.write_lock.Lock() + defer i.write_lock.Unlock() + + if i.set_ctime { + i.ctime = now + i.set_ctime = false + } + // Write a short header, then the packet data + dt := now.Sub(i.ctime).Nanoseconds() + header := make([]byte, 12) + binary.LittleEndian.PutUint64(header, uint64(dt)) + binary.LittleEndian.PutUint32(header[8:], uint32(len(data))) + + // Write to the binlog... + _, err := i.fp.Write(header) + if err != nil { + panic(fmt.Sprintf("Could not write to binlog %v", err)) + } + _, err = i.fp.Write(data) + if err != nil { + panic(fmt.Sprintf("Could not write to binlog %v", err)) + } +} + +func (i *BinLog) SetLogging(reads bool, writes bool) { + i.readsEnabled.Store(reads) + i.writesEnabled.Store(writes) +} + +func (i *BinLog) ReadAt(buffer []byte, offset int64) (int, error) { + // Write it to the binlog... + if i.readsEnabled.Load() { + b := packets.EncodeReadAt(offset, int32(len(buffer))) + i.writeLog(b) + } + n, err := i.prov.ReadAt(buffer, offset) + return n, err +} + +func (i *BinLog) WriteAt(buffer []byte, offset int64) (int, error) { + if i.writesEnabled.Load() { + b := packets.EncodeWriteAt(offset, buffer) + i.writeLog(b) + } + n, err := i.prov.WriteAt(buffer, offset) + return n, err +} + +func (i *BinLog) Flush() error { + return i.prov.Flush() +} + +func (i *BinLog) Size() uint64 { + return i.prov.Size() +} + +func (i *BinLog) Close() error { + i.fp.Close() // Close the binlog. 
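+	// Any error from closing the binlog file itself is ignored here; only the
+	// wrapped provider's Close result is propagated to the caller.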
+ return i.prov.Close() +} + +func (i *BinLog) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/binlog_replay.go b/pkg/storage/modules/binlog_replay.go new file mode 100644 index 00000000..7a1ff773 --- /dev/null +++ b/pkg/storage/modules/binlog_replay.go @@ -0,0 +1,79 @@ +package modules + +import ( + "encoding/binary" + "os" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/protocol/packets" +) + +type BinLogReplay struct { + prov storage.StorageProvider + fp *os.File + ctime time.Time + set_ctime bool +} + +func NewBinLogReplay(filename string, prov storage.StorageProvider) (*BinLogReplay, error) { + fp, err := os.Open(filename) + if err != nil { + return nil, err + } + return &BinLogReplay{ + fp: fp, + ctime: time.Now(), + set_ctime: true, + prov: prov, + }, nil +} + +func (i *BinLogReplay) ExecuteNext() error { + // Read a header + header := make([]byte, 12) + _, err := i.fp.Read(header) + if err != nil { + return err + } + + if i.set_ctime { + i.ctime = time.Now() + i.set_ctime = false + } + + // If we need to, we'll wait here until the next log should be replayed. + target_dt := time.Duration(binary.LittleEndian.Uint64(header)) + replay_dt := time.Since(i.ctime) + if replay_dt < target_dt { + time.Sleep(target_dt - replay_dt) + } + + length := binary.LittleEndian.Uint32(header[8:]) + data := make([]byte, length) + _, err = i.fp.Read(data) + if err != nil { + return err + } + + // Dispatch the command + if data[0] == packets.COMMAND_READ_AT { + offset, length, err := packets.DecodeReadAt(data) + if err != nil { + return err + } + buffer := make([]byte, length) + _, err = i.prov.ReadAt(buffer, offset) + return err + } else if data[0] == packets.COMMAND_WRITE_AT { + offset, buffer, err := packets.DecodeWriteAt(data) + if err != nil { + return err + } + _, err = i.prov.WriteAt(buffer, offset) + return err + } else { + panic("Unknown packet in binlog") + } + return nil +} From 676c5047643be7c4fd6fe2dea1832b457bcdf9f0 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 1 May 2024 14:25:35 +0100 Subject: [PATCH 12/21] Added SetBitIfClear to bitfield, useful as non blocking lock --- pkg/storage/util/bitfield.go | 14 ++++++++++++++ pkg/storage/util/bitfield_test.go | 17 +++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/pkg/storage/util/bitfield.go b/pkg/storage/util/bitfield.go index d88ec75b..d58c9ab5 100644 --- a/pkg/storage/util/bitfield.go +++ b/pkg/storage/util/bitfield.go @@ -99,6 +99,20 @@ func (bf *Bitfield) BitSet(i int) bool { return (old & f) != 0 } +/** + * Set the bit. 
Returns the previous state of the bit + * + */ +func (bf *Bitfield) SetBitIfClear(i int) bool { + f := uint64(1 << (i & 63)) + p := i >> 6 + old := atomic.LoadUint64(&bf.data[p]) + for !atomic.CompareAndSwapUint64(&bf.data[p], old, old|f) { + old = atomic.LoadUint64(&bf.data[p]) + } + return (old & f) != 0 +} + /** * Get the length of the bitfield * diff --git a/pkg/storage/util/bitfield_test.go b/pkg/storage/util/bitfield_test.go index 932db2d9..46ad07f9 100644 --- a/pkg/storage/util/bitfield_test.go +++ b/pkg/storage/util/bitfield_test.go @@ -164,3 +164,20 @@ func TestBitfieldCollect(t *testing.T) { assert.Equal(t, []uint{100, 101, 102, 103, 200}, c) } + +func TestBitfieldSetBitIfClear(t *testing.T) { + bf := NewBitfield(1000) + + bf.SetBit(99) + + assert.Equal(t, true, bf.BitSet(99)) + assert.Equal(t, false, bf.BitSet(100)) + + s99 := bf.SetBitIfClear(99) + assert.True(t, s99) + s100 := bf.SetBitIfClear(100) + assert.False(t, s100) + + assert.Equal(t, true, bf.BitSet(99)) + assert.Equal(t, true, bf.BitSet(100)) +} From c17f3449bd0a6811e3f7431f7769387000b6f918 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 1 May 2024 14:26:17 +0100 Subject: [PATCH 13/21] Updated silo conf with optional binlog --- pkg/storage/config/silo.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/config/silo.go b/pkg/storage/config/silo.go index d6034d72..78ef1c99 100644 --- a/pkg/storage/config/silo.go +++ b/pkg/storage/config/silo.go @@ -25,6 +25,7 @@ type DeviceSchema struct { Expose bool `hcl:"expose,optional"` Location string `hcl:"location,optional"` ROSource *DeviceSchema `hcl:"source,block"` + Binlog string `hcl:"binlog,optional"` } type LocationSchema struct { From 59f3aa0e549cb80e4b19c90a249a7e1515a55525 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 1 May 2024 14:27:11 +0100 Subject: [PATCH 14/21] Migrator can now cancel, and can dedupe dirty block writes --- pkg/storage/migrator/migrator.go | 139 ++++++++++++++++++------------- 1 file changed, 79 insertions(+), 60 deletions(-) diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index 415be8c3..b35dc2c6 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -21,6 +21,7 @@ type MigratorConfig struct { Progress_handler func(p *MigrationProgress) Concurrency map[int]int Integrity bool + Cancel_writes bool Recent_write_age time.Duration } @@ -38,6 +39,7 @@ func NewMigratorConfig() *MigratorConfig { storage.BlockTypePriority: 16, }, Integrity: false, + Cancel_writes: true, Recent_write_age: time.Minute, } } @@ -48,41 +50,45 @@ func (mc *MigratorConfig) WithBlockSize(bs int) *MigratorConfig { } type MigrationProgress struct { - Total_blocks int // Total blocks - Migrated_blocks int // Number of blocks that have been migrated - Migrated_blocks_perc float64 - Ready_blocks int // Number of blocks which are up to date (clean). May go down as well as up. - Ready_blocks_perc float64 - Active_blocks int // Number of blocks in progress now - Total_Canceled_blocks int // Total blocks that were cancelled - Total_Migrated_blocks int // Total blocks that were migrated + Total_blocks int // Total blocks + Migrated_blocks int // Number of blocks that have been migrated + Migrated_blocks_perc float64 + Ready_blocks int // Number of blocks which are up to date (clean). May go down as well as up. 
+ Ready_blocks_perc float64 + Active_blocks int // Number of blocks in progress now + Total_Canceled_blocks int // Total blocks that were cancelled + Total_Migrated_blocks int // Total blocks that were migrated + Total_Duplicated_blocks int } type Migrator struct { - source_tracker storage.TrackingStorageProvider // Tracks writes so we know which are dirty - dest storage.StorageProvider - source_lock_fn func() - source_unlock_fn func() - error_fn func(block *storage.BlockInfo, err error) - progress_fn func(*MigrationProgress) - progress_lock sync.Mutex - progress_last time.Time - progress_last_status *MigrationProgress - block_size int - num_blocks int - metric_blocks_migrated int64 - metric_blocks_canceled int64 - block_locks []sync.Mutex - moving_blocks *util.Bitfield - migrated_blocks *util.Bitfield - clean_blocks *util.Bitfield - last_written_blocks []time.Time - block_order storage.BlockOrder - ctime time.Time - concurrency map[int]chan bool - wg sync.WaitGroup - integrity *integrity.IntegrityChecker - recent_write_age time.Duration + source_tracker storage.TrackingStorageProvider // Tracks writes so we know which are dirty + dest storage.StorageProvider + source_lock_fn func() + source_unlock_fn func() + error_fn func(block *storage.BlockInfo, err error) + progress_fn func(*MigrationProgress) + progress_lock sync.Mutex + progress_last time.Time + progress_last_status *MigrationProgress + block_size int + num_blocks int + metric_blocks_migrated int64 + metric_blocks_canceled int64 + metric_blocks_duplicates int64 + block_locks []sync.Mutex + moving_blocks *util.Bitfield + migrated_blocks *util.Bitfield + clean_blocks *util.Bitfield + waiting_blocks *util.Bitfield + last_written_blocks []time.Time + block_order storage.BlockOrder + ctime time.Time + concurrency map[int]chan bool + wg sync.WaitGroup + integrity *integrity.IntegrityChecker + cancel_writes bool + recent_write_age time.Duration } func NewMigrator(source storage.TrackingStorageProvider, @@ -92,25 +98,28 @@ func NewMigrator(source storage.TrackingStorageProvider, num_blocks := (int(source.Size()) + config.Block_size - 1) / config.Block_size m := &Migrator{ - dest: dest, - source_tracker: source, - source_lock_fn: config.Locker_handler, - source_unlock_fn: config.Unlocker_handler, - error_fn: config.Error_handler, - progress_fn: config.Progress_handler, - block_size: config.Block_size, - num_blocks: num_blocks, - metric_blocks_migrated: 0, - metric_blocks_canceled: 0, - block_order: block_order, - moving_blocks: util.NewBitfield(num_blocks), - migrated_blocks: util.NewBitfield(num_blocks), - clean_blocks: util.NewBitfield(num_blocks), - block_locks: make([]sync.Mutex, num_blocks), - concurrency: make(map[int]chan bool), - progress_last_status: &MigrationProgress{}, - last_written_blocks: make([]time.Time, num_blocks), - recent_write_age: config.Recent_write_age, + dest: dest, + source_tracker: source, + source_lock_fn: config.Locker_handler, + source_unlock_fn: config.Unlocker_handler, + error_fn: config.Error_handler, + progress_fn: config.Progress_handler, + block_size: config.Block_size, + num_blocks: num_blocks, + metric_blocks_migrated: 0, + metric_blocks_canceled: 0, + metric_blocks_duplicates: 0, + block_order: block_order, + moving_blocks: util.NewBitfield(num_blocks), + migrated_blocks: util.NewBitfield(num_blocks), + clean_blocks: util.NewBitfield(num_blocks), + waiting_blocks: util.NewBitfield(num_blocks), + block_locks: make([]sync.Mutex, num_blocks), + concurrency: make(map[int]chan bool), + progress_last_status: 
&MigrationProgress{}, + last_written_blocks: make([]time.Time, num_blocks), + recent_write_age: config.Recent_write_age, + cancel_writes: config.Cancel_writes, } if m.dest.Size() != m.source_tracker.Size() { @@ -221,10 +230,17 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { // Check if there has been a successful write to the block recently // If there has, then cancel any in progress write - if time.Since(m.last_written_blocks[pos]) < m.recent_write_age { + if m.cancel_writes && time.Since(m.last_written_blocks[pos]) < m.recent_write_age { m.dest.CancelWrites(int64(pos*uint(m.block_size)), int64(m.block_size)) } + // TODO: If there's already one waiting here for this block, we want to do nothing. + if m.waiting_blocks.SetBitIfClear(int(pos)) { + // Someone else is already waiting here. We will quit. + atomic.AddInt64(&m.metric_blocks_duplicates, 1) + return nil + } + cc, ok := m.concurrency[i.Type] if !ok { cc = m.concurrency[storage.BlockTypeAny] @@ -247,6 +263,8 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { cc = m.concurrency[storage.BlockTypeAny] } <-cc + + m.waiting_blocks.ClearBit(int(block_no.Block)) // Allow more writes for this block now. }(i) } @@ -287,14 +305,15 @@ func (m *Migrator) reportProgress(forced bool) { m.progress_last = time.Now() m.progress_last_status = &MigrationProgress{ - Total_blocks: m.num_blocks, - Migrated_blocks: migrated, - Migrated_blocks_perc: perc_mig, - Ready_blocks: completed, - Ready_blocks_perc: perc_complete, - Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)), - Total_Canceled_blocks: int(atomic.LoadInt64(&m.metric_blocks_canceled)), - Total_Migrated_blocks: int(atomic.LoadInt64(&m.metric_blocks_migrated)), + Total_blocks: m.num_blocks, + Migrated_blocks: migrated, + Migrated_blocks_perc: perc_mig, + Ready_blocks: completed, + Ready_blocks_perc: perc_complete, + Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)), + Total_Canceled_blocks: int(atomic.LoadInt64(&m.metric_blocks_canceled)), + Total_Migrated_blocks: int(atomic.LoadInt64(&m.metric_blocks_migrated)), + Total_Duplicated_blocks: int(atomic.LoadInt64(&m.metric_blocks_duplicates)), } // Callback m.progress_fn(m.progress_last_status) From 5ff608a2f4452322700ecbcc9294442ff02bc323 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 2 May 2024 09:09:44 +0100 Subject: [PATCH 15/21] BinLogReplay can now do immediate, or scaled playback --- pkg/storage/modules/binlog_replay.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/storage/modules/binlog_replay.go b/pkg/storage/modules/binlog_replay.go index 7a1ff773..d40b292f 100644 --- a/pkg/storage/modules/binlog_replay.go +++ b/pkg/storage/modules/binlog_replay.go @@ -29,7 +29,11 @@ func NewBinLogReplay(filename string, prov storage.StorageProvider) (*BinLogRepl }, nil } -func (i *BinLogReplay) ExecuteNext() error { +/** + * Execute the next read/write. + * If speed is 0 then it's executed immediately, else we wait. + */ +func (i *BinLogReplay) ExecuteNext(speed float64) error { // Read a header header := make([]byte, 12) _, err := i.fp.Read(header) @@ -45,8 +49,9 @@ func (i *BinLogReplay) ExecuteNext() error { // If we need to, we'll wait here until the next log should be replayed. 
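 	// speed scales the recorded delay: 0 replays as fast as possible, 1 replays
 	// in real time, and larger values stretch the gaps between operations.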
target_dt := time.Duration(binary.LittleEndian.Uint64(header)) replay_dt := time.Since(i.ctime) - if replay_dt < target_dt { - time.Sleep(target_dt - replay_dt) + delay := speed * float64(target_dt-replay_dt) + if delay > 0 { + time.Sleep(time.Duration(delay)) } length := binary.LittleEndian.Uint32(header[8:]) From f5563efdb5433a929af7047ec507325d3bf975d7 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 2 May 2024 10:14:39 +0100 Subject: [PATCH 16/21] Cleaned up sync, and added couple of example binlogs --- cmd/sync.go | 260 ++++++++++++++++-------------------- silo_binlog_32m_hello_world | Bin 0 -> 41170 bytes silo_binlog_32m_mkfs.ext4 | Bin 0 -> 4465543 bytes 3 files changed, 115 insertions(+), 145 deletions(-) create mode 100644 silo_binlog_32m_hello_world create mode 100644 silo_binlog_32m_mkfs.ext4 diff --git a/cmd/sync.go b/cmd/sync.go index 2b667ea1..c0937eab 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -2,9 +2,9 @@ package main import ( "context" - crand "crypto/rand" + "errors" "fmt" - "math/rand" + "io" "os" "os/signal" "sync" @@ -32,6 +32,7 @@ var ( } ) +// Configuration options var sync_conf string var sync_endpoint string var sync_access string @@ -39,24 +40,27 @@ var sync_secret string var sync_bucket string var sync_block_size int var sync_time_limit time.Duration -var sync_random_writes bool -var sync_write_period time.Duration +var sync_replay bool var sync_dirty_block_shift int +var sync_block_max_age time.Duration +var sync_dirty_min_changed int +var sync_dirty_period time.Duration +var sync_dirty_limit int var sync_dummy bool +// Keep track of these for tidy up var sync_exposed []storage.ExposedStorage var sync_storage []*syncStorageInfo -var write_rand_source *rand.Rand - type syncStorageInfo struct { - tracker *dirtytracker.DirtyTrackerRemote - lockable storage.LockableStorageProvider - orderer *blocks.PriorityBlockOrder - num_blocks int - block_size int - name string - destMetrics *modules.Metrics + tracker *dirtytracker.DirtyTrackerRemote + lockable storage.LockableStorageProvider + orderer *blocks.PriorityBlockOrder + num_blocks int + block_size int + name string + dest_metrics *modules.Metrics + replay_log string } func init() { @@ -66,18 +70,22 @@ func init() { cmdSync.Flags().StringVarP(&sync_access, "access", "a", "", "S3 access") cmdSync.Flags().StringVarP(&sync_secret, "secret", "s", "", "S3 secret") cmdSync.Flags().StringVarP(&sync_bucket, "bucket", "b", "", "S3 bucket") - cmdSync.Flags().IntVarP(&sync_block_size, "blocksize", "l", 4*1024*1024, "S3 block size") - cmdSync.Flags().DurationVarP(&sync_time_limit, "timelimit", "t", 10*time.Second, "Sync time limit") - cmdSync.Flags().BoolVarP(&sync_random_writes, "randomwrites", "r", false, "Perform random writes") - cmdSync.Flags().DurationVarP(&sync_write_period, "writeperiod", "w", 100*time.Millisecond, "Random write period") + cmdSync.Flags().IntVarP(&sync_block_size, "blocksize", "l", 1*1024*1024, "S3 block size") + cmdSync.Flags().DurationVarP(&sync_time_limit, "timelimit", "t", 30*time.Second, "Sync time limit") + cmdSync.Flags().BoolVarP(&sync_replay, "replay", "r", false, "Replay existing binlog(s)") cmdSync.Flags().IntVarP(&sync_dirty_block_shift, "dirtyshift", "d", 10, "Dirty tracker block shift") + cmdSync.Flags().DurationVarP(&sync_block_max_age, "dirtymaxage", "", 1*time.Second, "Dirty block max age") + cmdSync.Flags().IntVarP(&sync_dirty_min_changed, "dirtyminchanged", "", 4, "Dirty block min subblock changes") + cmdSync.Flags().DurationVarP(&sync_dirty_period, "dirtyperiod", "", 
100*time.Millisecond, "Dirty block check period") + cmdSync.Flags().IntVarP(&sync_dirty_limit, "dirtylimit", "", 16, "Dirty block limit per period") cmdSync.Flags().BoolVarP(&sync_dummy, "dummy", "y", false, "Dummy destination") } +/** + * Run sync command + * + */ func runSync(ccmd *cobra.Command, args []string) { - - write_rand_source = rand.New(rand.NewSource(1)) - sync_exposed = make([]storage.ExposedStorage, 0) sync_storage = make([]*syncStorageInfo, 0) fmt.Printf("Starting silo s3 sync\n") @@ -90,29 +98,29 @@ func runSync(ccmd *cobra.Command, args []string) { os.Exit(1) }() + // Load the configuration siloConf, err := config.ReadSchema(sync_conf) if err != nil { panic(err) } + // Go through and setup each device in turn for i, s := range siloConf.Device { - fmt.Printf("Setup storage %d [%s] size %s - %d\n", i, s.Name, s.Size, s.ByteSize()) - sinfo, err := setupSyncStorageDevice(s) + sinfo, err := sync_setup_device(s) if err != nil { panic(fmt.Sprintf("Could not setup storage. %v", err)) } sync_storage = append(sync_storage, sinfo) } - // Lets go through each of the things we want to migrate/sync - + // Now lets go through each of the things we want to migrate/sync var wg sync.WaitGroup for i, s := range sync_storage { wg.Add(1) go func(index int, src *syncStorageInfo) { - err := migrateDeviceS3(uint32(index), src.name, src) + err := sync_migrate_s3(uint32(index), src.name, src) if err != nil { fmt.Printf("There was an issue migrating the storage %d %v\n", index, err) } @@ -128,11 +136,19 @@ func runSync(ccmd *cobra.Command, args []string) { * Setup a storage device for sync command * */ -func setupSyncStorageDevice(conf *config.DeviceSchema) (*syncStorageInfo, error) { +func sync_setup_device(conf *config.DeviceSchema) (*syncStorageInfo, error) { block_size := sync_block_size // 1024 * 128 num_blocks := (int(conf.ByteSize()) + block_size - 1) / block_size + replay_log := "" + + // Get this from the conf if we are operating in replay mode. + if sync_replay { + replay_log = conf.Binlog + conf.Binlog = "" + } + source, ex, err := device.NewDevice(conf) if err != nil { return nil, err @@ -149,23 +165,17 @@ func setupSyncStorageDevice(conf *config.DeviceSchema) (*syncStorageInfo, error) sourceMonitor := volatilitymonitor.NewVolatilityMonitor(sourceDirtyLocal, block_size, 10*time.Second) sourceStorage := modules.NewLockable(sourceMonitor) + // Make sure any exposition is wired to go to the right place through the chain. if ex != nil { ex.SetProvider(sourceStorage) } - // Start monitoring blocks. 
- - var primary_orderer storage.BlockOrder - primary_orderer = sourceMonitor - - if serve_any_order { - primary_orderer = blocks.NewAnyBlockOrder(num_blocks, nil) - } - orderer := blocks.NewPriorityBlockOrder(num_blocks, primary_orderer) + // Setup a block order + orderer := blocks.NewPriorityBlockOrder(num_blocks, sourceMonitor) orderer.AddAll() + // Create a destination to migrate to var dest storage.StorageProvider - if sync_dummy { dest = modules.NewNothing(sourceStorage.Size()) } else { @@ -176,47 +186,54 @@ func setupSyncStorageDevice(conf *config.DeviceSchema) (*syncStorageInfo, error) conf.Name, sourceStorage.Size(), sync_block_size) - if err != nil { return nil, err } } + // Return everything we need return &syncStorageInfo{ - tracker: sourceDirtyRemote, - lockable: sourceStorage, - orderer: orderer, - block_size: block_size, - num_blocks: num_blocks, - name: conf.Name, - destMetrics: modules.NewMetrics(dest), + tracker: sourceDirtyRemote, + lockable: sourceStorage, + orderer: orderer, + block_size: block_size, + num_blocks: num_blocks, + name: conf.Name, + dest_metrics: modules.NewMetrics(dest), + replay_log: replay_log, }, nil } +/** + * Shutdown a device + * + */ func sync_shutdown_everything() { // first unlock everything - fmt.Printf("Unlocking devices...\n") + fmt.Printf("Unlocking and closing devices...\n") for _, i := range sync_storage { i.lockable.Unlock() i.tracker.Close() // Show some stats - i.destMetrics.ShowStats(i.name) + i.dest_metrics.ShowStats(i.name) } - fmt.Printf("Shutting down devices cleanly...\n") + fmt.Printf("Shutting down exposed devices cleanly...\n") for _, p := range sync_exposed { device := p.Device() - fmt.Printf("Shutdown nbd device %s\n", device) _ = p.Shutdown() } } -// Migrate a device -func migrateDeviceS3(dev_id uint32, name string, +/** + * Migrate a device to S3 + * + */ +func sync_migrate_s3(dev_id uint32, name string, sinfo *syncStorageInfo) error { - dest_metrics := modules.NewMetrics(sinfo.destMetrics) + dest_metrics := modules.NewMetrics(sinfo.dest_metrics) conf := migrator.NewMigratorConfig().WithBlockSize(sync_block_size) conf.Locker_handler = func() { @@ -226,21 +243,23 @@ func migrateDeviceS3(dev_id uint32, name string, sinfo.lockable.Unlock() } conf.Concurrency = map[int]int{ - storage.BlockTypeAny: 8, + storage.BlockTypeAny: 16, } conf.Integrity = false + conf.Cancel_writes = true conf.Progress_handler = func(p *migrator.MigrationProgress) { - fmt.Printf("[%s] Progress Moved: %d/%d %.2f%% Clean: %d/%d %.2f%% InProgress: %d\n", + fmt.Printf("[%s] Progress Moved: %d/%d %.2f%% Clean: %d/%d %.2f%% InProgress: %d Total Mig: %d Canceled: %d Dupes: %d\n", name, p.Migrated_blocks, p.Total_blocks, p.Migrated_blocks_perc, p.Ready_blocks, p.Total_blocks, p.Ready_blocks_perc, - p.Active_blocks) + p.Active_blocks, p.Total_Migrated_blocks, p.Total_Canceled_blocks, p.Total_Duplicated_blocks) dest_metrics.ShowStats("S3") } conf.Error_handler = func(b *storage.BlockInfo, err error) { fmt.Printf("[%s] Error for block %d error %v\n", name, b.Block, err) } + // Show logging for S3 writes log_dest := modules.NewLogger(dest_metrics, "S3") mig, err := migrator.NewMigrator(sinfo.tracker, log_dest, sinfo.orderer, conf) @@ -251,34 +270,28 @@ func migrateDeviceS3(dev_id uint32, name string, ctx, cancelFn := context.WithCancel(context.TODO()) - // Do random writes to the device for testing - // - if sync_random_writes { + // If we are replaying a log for this device, do it here + if sinfo.replay_log != "" { + fmt.Printf("Replay from binlog %s\n", 
sinfo.replay_log) + // Open up a binlog, and replay it + blr, err := modules.NewBinLogReplay(sinfo.replay_log, sinfo.lockable) + if err != nil { + return err + } + + // Replay the binlog go func() { - dev_size := int(sinfo.tracker.Size()) - t := time.NewTicker(sync_write_period) for { select { case <-ctx.Done(): - return - case <-t.C: - // Do a random write to the device... - sizes := []int{4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024} - size := sizes[write_rand_source.Intn(len(sizes))] - offset := write_rand_source.Intn(dev_size) - // Now align, and shift if needed - if offset+size > dev_size { - offset = dev_size - size - } - offset = 4096 * (offset / 4096) - // Now do a write... - buffer := make([]byte, size) - crand.Read(buffer) - fmt.Printf("-Write- %d %d\n", offset, len(buffer)) - _, err := sinfo.lockable.WriteAt(buffer, int64(offset)) - if err != nil { - panic(err) - } + break + default: + } + err := blr.ExecuteNext(1) + if errors.Is(err, io.EOF) { + break + } else if err != nil { + panic(err) } } }() @@ -286,8 +299,16 @@ func migrateDeviceS3(dev_id uint32, name string, num_blocks := (sinfo.tracker.Size() + uint64(sync_block_size) - 1) / uint64(sync_block_size) - // NB: We only need to do this for existing sources. - /* + is_new := true + + if is_new { + // Since it's a new source, it's all zeros. We don't need to do an initial migration. + for b := 0; b < int(num_blocks); b++ { + mig.SetMigratedBlock(b) + } + + sinfo.tracker.TrackAt(0, int64(sinfo.tracker.Size())) + } else { fmt.Printf("Doing migration...\n") // Now do the initial migration... @@ -295,66 +316,34 @@ func migrateDeviceS3(dev_id uint32, name string, if err != nil { return err } - */ - // Since it's a new source, it's all zeros. We don't need to do an initial migration. - for b := 0; b < int(num_blocks); b++ { - mig.SetMigratedBlock(b) - } - - sinfo.tracker.TrackAt(0, int64(sinfo.tracker.Size())) + fmt.Printf("Waiting...\n") - fmt.Printf("Waiting...\n") - - // Wait for completion. - err = mig.WaitForCompletion() - if err != nil { - return err + // Wait for completion. + err = mig.WaitForCompletion() + if err != nil { + return err + } } - // Enter a loop looking for more dirty blocks to migrate... + // Now enter a loop looking for more dirty blocks to migrate... fmt.Printf("Dirty loop...\n") - // Dirty block selection params go here - max_age := 5000 * time.Millisecond - limit := 4 - min_changed := 8 - // - getter := func() []uint { - return sinfo.tracker.GetDirtyBlocks(max_age, limit, sync_dirty_block_shift, min_changed) + return sinfo.tracker.GetDirtyBlocks(sync_block_max_age, sync_dirty_limit, sync_dirty_block_shift, sync_dirty_min_changed) } ctime := time.Now() - block_histo := make(map[uint]int) - for { if time.Since(ctime) > sync_time_limit { break } - // Show dirty status... - ood := sinfo.tracker.MeasureDirty() - ood_age := sinfo.tracker.MeasureDirtyAge() - fmt.Printf("DIRTY STATUS %dms old, with %d blocks\n", time.Since(ood_age).Milliseconds(), ood) - - blocks := mig.GetLatestDirtyFunc(getter) // + blocks := mig.GetLatestDirtyFunc(getter) if blocks != nil { - fmt.Printf("Dirty blocks %v\n", blocks) - - // Update the histogram for blocks... 
- for _, b := range blocks { - _, ok := block_histo[b] - if ok { - block_histo[b]++ - } else { - block_histo[b] = 1 - } - } - err = mig.MigrateDirty(blocks) if err != nil { return err @@ -362,38 +351,19 @@ func migrateDeviceS3(dev_id uint32, name string, } else { mig.Unlock() } - time.Sleep(100 * time.Millisecond) + time.Sleep(sync_dirty_period) } cancelFn() // Stop the write loop - err = mig.WaitForCompletion() - if err != nil { - return err - } - - // Check the histogram... - counts := make(map[int]int) - for _, count := range block_histo { - _, ok := counts[count] - if ok { - counts[count]++ - } else { - counts[count] = 1 - } - } - - for c, cc := range counts { - fmt.Printf("Write histo %d - %d\n", c, cc) - } - ood := sinfo.tracker.MeasureDirty() ood_age := sinfo.tracker.MeasureDirtyAge() - fmt.Printf("DIRTY STATUS %dms old, with %d blocks\n", time.Since(ood_age).Milliseconds(), ood) - // Check how many dirty blocks were left, and how out of date we are... - //left_blocks := sinfo.tracker.GetDirtyBlocks(0, 1000000, sync_dirty_block_shift, 0) + err = mig.WaitForCompletion() + if err != nil { + return err + } return nil } diff --git a/silo_binlog_32m_hello_world b/silo_binlog_32m_hello_world new file mode 100644 index 0000000000000000000000000000000000000000..e3fe1c96e21e8804ed758cc504a88be1827e5ed2 GIT binary patch literal 41170 zcmeI$p=v^57zW^fHkWnqIMt7_054PL6={r^920t5&UAV7cs0RjXF5FkJx zmcZ$I+sTcL02oNAZfB*pk1PBlyK%gkF`&_Pb$S6>x5+Fc;009C72oNAZ zfB*pku>^)chjVUZ6o_?KZ3qw`K!5-N0t5&UAV7dXQJ}Yc?cV*Si~>a}0RjXF5FkK+ z009C72oN9;OJFr^O^gDuZmJCd0t5&UAV7cs0RjXF5U3^a*gVdS0=1}w2oNAZfB*pk n1PBlyK!89jfy>+G%P0`*rrHo7K!5-N0t5&UAV7csfm#ArZhd)oeGh8s$N009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7e?9DzT7Jv53cvZviS zr&S990t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z%Lsh(<3nR>MfSA2mlq&p*PK=@1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ 
zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72rMIT`s2f6Yen|7&1HrgN`L?X0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfWRDq56(R{iYl_F-8-jM z3jqQI2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0Rqbi z9N&FpY^}(icKq)9{G3)T1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72rMIT-76oCtrgkR zo?B+Pp#%sJAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PII#*thGR_TUUCIegXst5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ 
z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfWW`)-CKxNRU8N4 zbYa6gd)F7DZr%FDW5{Wk_XogDzMJ(L;2hCmYHN$qXtgdMQ%0 z3zR`6iBQr_vdjmOYKVFWTkD(|Ju}XN$;p}4@msOZ*?X_GFTc+-yYt_OLx2DQ0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7csf&ZDnbz9ceuBa_k z4yZSgDe3*wmupmy1Pf7Z-r}z4L+7sp z^xywm2HgMK<3&%5J-w&r>Ziv(E%tpaAJojiXuNeG+ z80`xf@n!x{-+!w0kAL5{puyjN(s!Zpv5%yEwl{tL&;O+H7^D69 zumAj4hrIvQZ&>|5{{z4Ob;gT6d4yj8*X&C9SH{;PZM5?rIkP1K1Wq)8WfyO$MQ+=c z{-%INfB*pk1PBlyK!5-N0t5)0FajsVq*G%*JNBK^dZ+h47+Bo%%ay&;r+l1)_;t^$ z?%bH|1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfWW^KXo_EnF#X?UHk$x}x{0l*V009C72oNAZ zfWWW>(!cjNDO9Sn<9mI%TAdwN(l0#vrlY-j>+N^l-L>-edsg3AuNS-$AV7cs0RjXF z5FkK+009E!2uup$yPLLu{aC8=8|(jHI_mX*IZ0e80RjXF5FkK+009C72oNBU6G-*{ z`+uCfI*s>s z_Q29bn|ik|eChc|PFp`?$=c0v!jSWh+R+9A1PBlyK!5-N0t5&UAV6Tu1TJ5*rFQPN zSo&APwKET2Ncv>}EwT4K892P%0s#U92oNAZfB*pk1PBly&>R9+&D>fG(iR^HRKt$t z&5=q90t5&UAV7cs0RjXF5FkJxE71DsulY`v2;!8(%IyhyVl#5FkK+009C72oNAZ zfIwCt)&Eo5!j0Jp1_%%!K!5-N0t5&UAV7csf#L;H{eRoO7iSh9L;wN=2oNAZfB*pk z1PBlyKp-oS>i-L_JL!k)1Oo&J5FkK+009C72oNAZfI#sAss8W$d;Estg9t!?009C7 z2oNAZfB*pk1PEjWQvLtn)f<*)Cm0|=fB*pk1PBlyK!5-N0tAW|NcI0&-QS*3d=LQ$ z5FkK+009C72oNAZfB=E4K&t=mf8yTV*$D;+5FkK+009C72oNAZfB=Ew1ycRLYR3J~ z6dyzY0t5&UAV7cs0RjXF5FkJxE0F5{i`IR=G&{in0RjXF5FkK+009C72oNApyg;h| zuetA`Gm8%*009C72oNAZfB*pk1PBlykQGSv|0!SW{w_Pg009C72oNAZfB*pk1PBly zP`p5@|Env$dan2&0uUfTfB*pk1PBlyK!5-N0$G7n|KG4{&9dwS0|W>VAV7cs0RjXF z5FkK+K=A^p{{L-j&nd+R5r6;z0t5&UAV7cs0RjXF5XcIo`v0}Ic{{Tc3=kkdfB*pk z1PBlyK!5-N0>ulY`hV-jOP(%1hyVl#5FkK+009C72oNAZfIwCt)&F}=eRgqnf&l^q z2oNAZfB*pk1PBlyK%jVmRR8z<-97c837^!mv`r47T6`)22oM;#KxaHe>^+qbc1{bS zcY6Jh=>}1|Njw}EQtymCws+>vSsB8iLyHeqLfoZqy*5-^PfHWiOB|dRZ!{mogWS>@ z!UIbeZR*{=@TKP;Ic@!nC2KdQDgS+IIqHxT8n$%JT>NM2yhVR*zH7tlZ-0NsvPxxs znAA6(a<%{765^#EZtZa?Wl8THv7a7$BntcDX=V+12b~{BkF?H+mW&3s*0`}}@W#T| k$B#R<0R8^Qwzs`TBCu+~IiHVYfBO<3K!5-N0%Z#P1AM9WrvLx| literal 0 HcmV?d00001 From 52c69e82cc993b7aac055fa347b6bee6f796c6d3 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 3 May 2024 12:24:08 +0100 Subject: [PATCH 17/21] Lint fixes --- cmd/sync.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/sync.go b/cmd/sync.go index c0937eab..74d37f06 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -230,7 +230,7 @@ func sync_shutdown_everything() { * Migrate a device to S3 * */ -func sync_migrate_s3(dev_id uint32, name string, +func sync_migrate_s3(_ uint32, name string, sinfo *syncStorageInfo) error { dest_metrics := modules.NewMetrics(sinfo.dest_metrics) @@ -276,6 +276,7 @@ func sync_migrate_s3(dev_id uint32, name string, // Open up a binlog, and replay it blr, err := modules.NewBinLogReplay(sinfo.replay_log, sinfo.lockable) if err != nil { + cancelFn() return err } @@ -291,6 +292,7 @@ func sync_migrate_s3(dev_id uint32, name string, if errors.Is(err, io.EOF) { break } else if err != nil { + cancelFn() panic(err) } } @@ -314,6 +316,7 @@ func sync_migrate_s3(dev_id uint32, name string, // Now do the initial migration... 
err = mig.Migrate(int(num_blocks)) if err != nil { + cancelFn() return err } @@ -322,6 +325,7 @@ func sync_migrate_s3(dev_id uint32, name string, // Wait for completion. err = mig.WaitForCompletion() if err != nil { + cancelFn() return err } } @@ -346,6 +350,7 @@ func sync_migrate_s3(dev_id uint32, name string, if blocks != nil { err = mig.MigrateDirty(blocks) if err != nil { + cancelFn() return err } } else { From c10fad6ed271fbf664ff3370122dd93e20ff6494 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 3 May 2024 12:25:41 +0100 Subject: [PATCH 18/21] Lint --- pkg/storage/modules/binlog_replay.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/storage/modules/binlog_replay.go b/pkg/storage/modules/binlog_replay.go index d40b292f..0d6354b4 100644 --- a/pkg/storage/modules/binlog_replay.go +++ b/pkg/storage/modules/binlog_replay.go @@ -80,5 +80,4 @@ func (i *BinLogReplay) ExecuteNext(speed float64) error { } else { panic("Unknown packet in binlog") } - return nil } From 32dac2e9785fcccbfa26c5a50f1ee82feba831c3 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 3 May 2024 12:58:03 +0100 Subject: [PATCH 19/21] Changed migrator to default cancel_writes false --- pkg/storage/migrator/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index b35dc2c6..b4496486 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -39,7 +39,7 @@ func NewMigratorConfig() *MigratorConfig { storage.BlockTypePriority: 16, }, Integrity: false, - Cancel_writes: true, + Cancel_writes: false, Recent_write_age: time.Minute, } } From e8129ead280e6651b2b5011a2bb798fde753d260 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 3 May 2024 13:04:18 +0100 Subject: [PATCH 20/21] Added some debug for flaky test --- pkg/storage/migrator/migrator_s3_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/storage/migrator/migrator_s3_test.go b/pkg/storage/migrator/migrator_s3_test.go index 02d20ef3..7d347518 100644 --- a/pkg/storage/migrator/migrator_s3_test.go +++ b/pkg/storage/migrator/migrator_s3_test.go @@ -65,6 +65,9 @@ func TestMigratorToS3(t *testing.T) { conf := NewMigratorConfig().WithBlockSize(blockSize) conf.Locker_handler = sourceStorage.Lock conf.Unlocker_handler = sourceStorage.Unlock + conf.Error_handler = func(b *storage.BlockInfo, err error) { + assert.Fail(t, fmt.Sprintf("Error migrating block %d: %v", b.Block, err)) + } mig, err := NewMigrator(sourceDirtyRemote, destStorage, @@ -95,6 +98,9 @@ func TestMigratorToS3(t *testing.T) { err = mig.WaitForCompletion() assert.NoError(t, err) + assert.Equal(t, int64(0), mig.metric_blocks_canceled) + assert.Equal(t, int64(0), mig.metric_blocks_duplicates) + // This will end with migration completed, and consumer Locked. 
eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize) assert.NoError(t, err) From 8e14c75c75bb0fa50739105e813d917a718d3144 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Fri, 3 May 2024 13:16:14 +0100 Subject: [PATCH 21/21] Default migrator now dedupes writes only during the concurrency wait, and disabled by default --- cmd/sync.go | 1 + pkg/storage/migrator/migrator.go | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 74d37f06..816f48cd 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -247,6 +247,7 @@ func sync_migrate_s3(_ uint32, name string, } conf.Integrity = false conf.Cancel_writes = true + conf.Dedupe_writes = true conf.Progress_handler = func(p *migrator.MigrationProgress) { fmt.Printf("[%s] Progress Moved: %d/%d %.2f%% Clean: %d/%d %.2f%% InProgress: %d Total Mig: %d Canceled: %d Dupes: %d\n", diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index b4496486..bceef62b 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -22,6 +22,7 @@ type MigratorConfig struct { Concurrency map[int]int Integrity bool Cancel_writes bool + Dedupe_writes bool Recent_write_age time.Duration } @@ -40,6 +41,7 @@ func NewMigratorConfig() *MigratorConfig { }, Integrity: false, Cancel_writes: false, + Dedupe_writes: false, Recent_write_age: time.Minute, } } @@ -88,6 +90,7 @@ type Migrator struct { wg sync.WaitGroup integrity *integrity.IntegrityChecker cancel_writes bool + dedupe_writes bool recent_write_age time.Duration } @@ -120,6 +123,7 @@ func NewMigrator(source storage.TrackingStorageProvider, last_written_blocks: make([]time.Time, num_blocks), recent_write_age: config.Recent_write_age, cancel_writes: config.Cancel_writes, + dedupe_writes: config.Dedupe_writes, } if m.dest.Size() != m.source_tracker.Size() { @@ -234,8 +238,10 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { m.dest.CancelWrites(int64(pos*uint(m.block_size)), int64(m.block_size)) } - // TODO: If there's already one waiting here for this block, we want to do nothing. - if m.waiting_blocks.SetBitIfClear(int(pos)) { + one_waiting := m.waiting_blocks.SetBitIfClear(int(pos)) + + // If there's already one waiting here for this block, we want to do nothing. + if m.dedupe_writes && one_waiting { // Someone else is already waiting here. We will quit. atomic.AddInt64(&m.metric_blocks_duplicates, 1) return nil @@ -247,6 +253,8 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { } cc <- true + m.waiting_blocks.ClearBit(int(pos)) // Allow more writes for this to come in now. + m.wg.Add(1) m.clean_blocks.ClearBit(int(pos)) @@ -263,8 +271,6 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { cc = m.concurrency[storage.BlockTypeAny] } <-cc - - m.waiting_blocks.ClearBit(int(block_no.Block)) // Allow more writes for this block now. }(i) }
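For anyone wiring this up outside of cmd/sync.go, here is a minimal sketch of one pass of the dirty loop using the options added in this series. Cancel_writes, Dedupe_writes and Recent_write_age are the real MigratorConfig fields from the patches above; the package name, the syncDirtyOnce helper, the no-op lock handlers and the GetDirtyBlocks arguments (max age, limit, group shift, min changed) are illustrative assumptions rather than anything taken from the patches.

package example

import (
	"time"

	"github.com/loopholelabs/silo/pkg/storage"
	"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
	"github.com/loopholelabs/silo/pkg/storage/migrator"
)

// syncDirtyOnce is a hypothetical helper: one pass of the dirty loop.
func syncDirtyOnce(tracker *dirtytracker.DirtyTrackerRemote, dest storage.StorageProvider, orderer storage.BlockOrder) error {
	conf := migrator.NewMigratorConfig().WithBlockSize(1024 * 1024)
	conf.Cancel_writes = true           // cancel in-flight destination writes for recently written blocks
	conf.Dedupe_writes = true           // skip a dirty write while another is already waiting on concurrency
	conf.Recent_write_age = time.Minute // only writes newer than this trigger a cancel
	conf.Locker_handler = func() {}     // real callers lock the source here (see cmd/sync.go)
	conf.Unlocker_handler = func() {}   // ...and unlock it here

	mig, err := migrator.NewMigrator(tracker, dest, orderer, conf)
	if err != nil {
		return err
	}

	// Ask the tracker for dirty blocks (max age, limit, group shift, min changed sub-blocks).
	blocks := mig.GetLatestDirtyFunc(func() []uint {
		return tracker.GetDirtyBlocks(time.Second, 16, 10, 4)
	})
	if blocks == nil {
		mig.Unlock() // Mirror cmd/sync.go: unlock when nothing was found to migrate.
		return nil
	}

	// MigrateDirty cancels and dedupes as configured before scheduling each block.
	if err := mig.MigrateDirty(blocks); err != nil {
		return err
	}
	return mig.WaitForCompletion()
}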