From 693a597c5b8aa981578d8788b3b482d104dd67e0 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 29 Feb 2024 13:18:51 +0000 Subject: [PATCH] migration chunk size now sent from src->dst --- cmd/connect.go | 10 ++++------ cmd/serve.go | 2 +- pkg/storage/migrator/migrator_test.go | 2 +- pkg/storage/modules/to_protocol.go | 7 ++++--- pkg/storage/protocol/dev_info.go | 24 ++++++++++++++---------- pkg/storage/protocol/protocol_test.go | 3 ++- testing/protocol_test.go | 10 +++++----- 7 files changed, 31 insertions(+), 27 deletions(-) diff --git a/cmd/connect.go b/cmd/connect.go index 85340a03..77ebd021 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -70,23 +70,21 @@ func runConnect(ccmd *cobra.Command, args []string) { // Handle a new incoming device func handleIncomingDevice(pro protocol.Protocol, dev uint32) { - // Size of migration blocks - // TODO: Configurable - block_size := 1024 * 64 - var destStorage storage.StorageProvider var destWaitingLocal *modules.WaitingCacheLocal var destWaitingRemote *modules.WaitingCacheRemote var dest *modules.FromProtocol destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider { - fmt.Printf("Received DevInfo name=%s size=%d\n", di.Name, di.Size) + fmt.Printf("= %d = Received DevInfo name=%s size=%d blocksize=%d\n", dev, di.Name, di.Size, di.BlockSize) + cr := func(s int) storage.StorageProvider { return sources.NewMemoryStorage(s) } // Setup some sharded memory storage (for concurrent write speed) destStorage = modules.NewShardedStorage(int(di.Size), int(di.Size/1024), cr) - destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size) + + destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, int(di.BlockSize)) // Connect the waitingCache to the FromProtocol destWaitingLocal.NeedAt = func(offset int64, length int32) { diff --git a/cmd/serve.go b/cmd/serve.go index 5369014f..f9a16c5d 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -140,7 +140,7 @@ func runServe(ccmd *cobra.Command, args []string) { pro := protocol.NewProtocolRW(context.TODO(), c, c, nil) dest := modules.NewToProtocol(uint64(serve_size), 777, pro) - dest.SendDevInfo("data") + dest.SendDevInfo("data", uint32(block_size)) go pro.Handle() diff --git a/pkg/storage/migrator/migrator_test.go b/pkg/storage/migrator/migrator_test.go index 85efb4d7..99a550b0 100644 --- a/pkg/storage/migrator/migrator_test.go +++ b/pkg/storage/migrator/migrator_test.go @@ -125,7 +125,7 @@ func TestMigratorSimplePipe(t *testing.T) { go destFrom.HandleWriteAt() go destFrom.HandleDevInfo() - destination.SendDevInfo("test") + destination.SendDevInfo("test", uint32(blockSize)) conf := NewMigratorConfig().WithBlockSize(blockSize) conf.LockerHandler = sourceStorage.Lock diff --git a/pkg/storage/modules/to_protocol.go b/pkg/storage/modules/to_protocol.go index 8ffc294e..9775fd38 100644 --- a/pkg/storage/modules/to_protocol.go +++ b/pkg/storage/modules/to_protocol.go @@ -37,10 +37,11 @@ func (i *ToProtocol) SendEvent(e protocol.EventType) error { return protocol.DecodeEventResponse(r) } -func (i *ToProtocol) SendDevInfo(name string) error { +func (i *ToProtocol) SendDevInfo(name string, block_size uint32) error { di := &protocol.DevInfo{ - Size: i.size, - Name: name, + Size: i.size, + BlockSize: block_size, + Name: name, } b := protocol.EncodeDevInfo(di) _, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b) diff --git a/pkg/storage/protocol/dev_info.go b/pkg/storage/protocol/dev_info.go index 7ca5e2db..b4ac1778 100644 --- a/pkg/storage/protocol/dev_info.go +++ b/pkg/storage/protocol/dev_info.go @@ -6,28 +6,32 @@ import ( ) type DevInfo struct { - Size uint64 - Name string + Size uint64 + BlockSize uint32 + Name string } func EncodeDevInfo(di *DevInfo) []byte { - buff := make([]byte, 1+8+2+len(di.Name)) + buff := make([]byte, 1+8+4+2+len(di.Name)) buff[0] = COMMAND_DEV_INFO binary.LittleEndian.PutUint64(buff[1:], di.Size) - binary.LittleEndian.PutUint16(buff[9:], uint16(len(di.Name))) - copy(buff[11:], []byte(di.Name)) + binary.LittleEndian.PutUint32(buff[9:], di.BlockSize) + binary.LittleEndian.PutUint16(buff[13:], uint16(len(di.Name))) + copy(buff[15:], []byte(di.Name)) return buff } func DecodeDevInfo(buff []byte) (*DevInfo, error) { - if buff == nil || len(buff) < 11 || buff[0] != COMMAND_DEV_INFO { + if buff == nil || len(buff) < 15 || buff[0] != COMMAND_DEV_INFO { return nil, errors.New("Invalid packet") } size := binary.LittleEndian.Uint64(buff[1:]) - l := binary.LittleEndian.Uint16(buff[9:]) - if int(l)+11 > len(buff) { + blocksize := binary.LittleEndian.Uint32(buff[9:]) + + l := binary.LittleEndian.Uint16(buff[13:]) + if int(l)+15 > len(buff) { return nil, errors.New("Invalid packet") } - name := string(buff[11 : 11+l]) - return &DevInfo{Size: size, Name: name}, nil + name := string(buff[15 : 15+l]) + return &DevInfo{Size: size, BlockSize: blocksize, Name: name}, nil } diff --git a/pkg/storage/protocol/protocol_test.go b/pkg/storage/protocol/protocol_test.go index e55e433d..e951c6a7 100644 --- a/pkg/storage/protocol/protocol_test.go +++ b/pkg/storage/protocol/protocol_test.go @@ -173,11 +173,12 @@ func TestDirtyList(t *testing.T) { } func TestDevInfo(t *testing.T) { - b := EncodeDevInfo(&DevInfo{Size: 12345, Name: "hello"}) + b := EncodeDevInfo(&DevInfo{Size: 12345, BlockSize: 55, Name: "hello"}) di, err := DecodeDevInfo(b) assert.NoError(t, err) assert.Equal(t, uint64(12345), di.Size) + assert.Equal(t, uint32(55), di.BlockSize) assert.Equal(t, "hello", di.Name) // Make sure we can't decode silly things diff --git a/testing/protocol_test.go b/testing/protocol_test.go index 728969ab..42bde400 100644 --- a/testing/protocol_test.go +++ b/testing/protocol_test.go @@ -40,7 +40,7 @@ func TestProtocolWriteAt(t *testing.T) { go destFromProtocol.HandleWriteAt() // Send devInfo - sourceToProtocol.SendDevInfo("test") + sourceToProtocol.SendDevInfo("test", 4096) buff := make([]byte, 4096) rand.Read(buff) @@ -92,7 +92,7 @@ func TestProtocolReadAt(t *testing.T) { go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() - sourceToProtocol.SendDevInfo("test") + sourceToProtocol.SendDevInfo("test", 4096) // Now check it was written to the source buff2 := make([]byte, 4096) @@ -142,7 +142,7 @@ func TestProtocolRWWriteAt(t *testing.T) { go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() - sourceToProtocol.SendDevInfo("test") + sourceToProtocol.SendDevInfo("test", 4096) // Should know the dev now... assert.Equal(t, uint32(1), <-destDev) @@ -206,7 +206,7 @@ func TestProtocolRWReadAt(t *testing.T) { go destFromProtocol.HandleReadAt() go destFromProtocol.HandleWriteAt() - sourceToProtocol.SendDevInfo("test") + sourceToProtocol.SendDevInfo("test", 4096) // Now check it was written to the source buff2 := make([]byte, 4096) @@ -242,7 +242,7 @@ func TestProtocolEvents(t *testing.T) { go destFromProtocol.HandleSend(context.TODO()) // Send devInfo - sourceToProtocol.SendDevInfo("test") + sourceToProtocol.SendDevInfo("test", 4096) // Send some events and make sure they happen at the other end...