Skip to content

Commit

Permalink
migration chunk size now sent from src->dst
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmyaxod committed Feb 29, 2024
1 parent 531e444 commit 693a597
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 27 deletions.
10 changes: 4 additions & 6 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,21 @@ func runConnect(ccmd *cobra.Command, args []string) {

// Handle a new incoming device
func handleIncomingDevice(pro protocol.Protocol, dev uint32) {
// Size of migration blocks
// TODO: Configurable
block_size := 1024 * 64

var destStorage storage.StorageProvider
var destWaitingLocal *modules.WaitingCacheLocal
var destWaitingRemote *modules.WaitingCacheRemote
var dest *modules.FromProtocol

destWaitingRemoteFactory := func(di *protocol.DevInfo) storage.StorageProvider {
fmt.Printf("Received DevInfo name=%s size=%d\n", di.Name, di.Size)
fmt.Printf("= %d = Received DevInfo name=%s size=%d blocksize=%d\n", dev, di.Name, di.Size, di.BlockSize)

cr := func(s int) storage.StorageProvider {
return sources.NewMemoryStorage(s)
}
// Setup some sharded memory storage (for concurrent write speed)
destStorage = modules.NewShardedStorage(int(di.Size), int(di.Size/1024), cr)
destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, block_size)

destWaitingLocal, destWaitingRemote = modules.NewWaitingCache(destStorage, int(di.BlockSize))

// Connect the waitingCache to the FromProtocol
destWaitingLocal.NeedAt = func(offset int64, length int32) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func runServe(ccmd *cobra.Command, args []string) {
pro := protocol.NewProtocolRW(context.TODO(), c, c, nil)
dest := modules.NewToProtocol(uint64(serve_size), 777, pro)

dest.SendDevInfo("data")
dest.SendDevInfo("data", uint32(block_size))

go pro.Handle()

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestMigratorSimplePipe(t *testing.T) {
go destFrom.HandleWriteAt()
go destFrom.HandleDevInfo()

destination.SendDevInfo("test")
destination.SendDevInfo("test", uint32(blockSize))

conf := NewMigratorConfig().WithBlockSize(blockSize)
conf.LockerHandler = sourceStorage.Lock
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/modules/to_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ func (i *ToProtocol) SendEvent(e protocol.EventType) error {
return protocol.DecodeEventResponse(r)
}

func (i *ToProtocol) SendDevInfo(name string) error {
func (i *ToProtocol) SendDevInfo(name string, block_size uint32) error {
di := &protocol.DevInfo{
Size: i.size,
Name: name,
Size: i.size,
BlockSize: block_size,
Name: name,
}
b := protocol.EncodeDevInfo(di)
_, err := i.protocol.SendPacket(i.dev, protocol.ID_PICK_ANY, b)
Expand Down
24 changes: 14 additions & 10 deletions pkg/storage/protocol/dev_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,32 @@ import (
)

type DevInfo struct {
Size uint64
Name string
Size uint64
BlockSize uint32
Name string
}

func EncodeDevInfo(di *DevInfo) []byte {
buff := make([]byte, 1+8+2+len(di.Name))
buff := make([]byte, 1+8+4+2+len(di.Name))
buff[0] = COMMAND_DEV_INFO
binary.LittleEndian.PutUint64(buff[1:], di.Size)
binary.LittleEndian.PutUint16(buff[9:], uint16(len(di.Name)))
copy(buff[11:], []byte(di.Name))
binary.LittleEndian.PutUint32(buff[9:], di.BlockSize)
binary.LittleEndian.PutUint16(buff[13:], uint16(len(di.Name)))
copy(buff[15:], []byte(di.Name))
return buff
}

func DecodeDevInfo(buff []byte) (*DevInfo, error) {
if buff == nil || len(buff) < 11 || buff[0] != COMMAND_DEV_INFO {
if buff == nil || len(buff) < 15 || buff[0] != COMMAND_DEV_INFO {
return nil, errors.New("Invalid packet")
}
size := binary.LittleEndian.Uint64(buff[1:])
l := binary.LittleEndian.Uint16(buff[9:])
if int(l)+11 > len(buff) {
blocksize := binary.LittleEndian.Uint32(buff[9:])

l := binary.LittleEndian.Uint16(buff[13:])
if int(l)+15 > len(buff) {
return nil, errors.New("Invalid packet")
}
name := string(buff[11 : 11+l])
return &DevInfo{Size: size, Name: name}, nil
name := string(buff[15 : 15+l])
return &DevInfo{Size: size, BlockSize: blocksize, Name: name}, nil
}
3 changes: 2 additions & 1 deletion pkg/storage/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func TestDirtyList(t *testing.T) {
}

func TestDevInfo(t *testing.T) {
b := EncodeDevInfo(&DevInfo{Size: 12345, Name: "hello"})
b := EncodeDevInfo(&DevInfo{Size: 12345, BlockSize: 55, Name: "hello"})

di, err := DecodeDevInfo(b)
assert.NoError(t, err)
assert.Equal(t, uint64(12345), di.Size)
assert.Equal(t, uint32(55), di.BlockSize)
assert.Equal(t, "hello", di.Name)

// Make sure we can't decode silly things
Expand Down
10 changes: 5 additions & 5 deletions testing/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestProtocolWriteAt(t *testing.T) {
go destFromProtocol.HandleWriteAt()

// Send devInfo
sourceToProtocol.SendDevInfo("test")
sourceToProtocol.SendDevInfo("test", 4096)

buff := make([]byte, 4096)
rand.Read(buff)
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestProtocolReadAt(t *testing.T) {
go destFromProtocol.HandleReadAt()
go destFromProtocol.HandleWriteAt()

sourceToProtocol.SendDevInfo("test")
sourceToProtocol.SendDevInfo("test", 4096)

// Now check it was written to the source
buff2 := make([]byte, 4096)
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestProtocolRWWriteAt(t *testing.T) {
go destFromProtocol.HandleReadAt()
go destFromProtocol.HandleWriteAt()

sourceToProtocol.SendDevInfo("test")
sourceToProtocol.SendDevInfo("test", 4096)

// Should know the dev now...
assert.Equal(t, uint32(1), <-destDev)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestProtocolRWReadAt(t *testing.T) {
go destFromProtocol.HandleReadAt()
go destFromProtocol.HandleWriteAt()

sourceToProtocol.SendDevInfo("test")
sourceToProtocol.SendDevInfo("test", 4096)

// Now check it was written to the source
buff2 := make([]byte, 4096)
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestProtocolEvents(t *testing.T) {
go destFromProtocol.HandleSend(context.TODO())

// Send devInfo
sourceToProtocol.SendDevInfo("test")
sourceToProtocol.SendDevInfo("test", 4096)

// Send some events and make sure they happen at the other end...

Expand Down

0 comments on commit 693a597

Please sign in to comment.