diff --git a/pkg/storage/migrator/migrator_s3_assisted_test.go b/pkg/storage/migrator/migrator_s3_assisted_test.go index d67fda56..751c101c 100644 --- a/pkg/storage/migrator/migrator_s3_assisted_test.go +++ b/pkg/storage/migrator/migrator_s3_assisted_test.go @@ -153,9 +153,6 @@ func TestMigratorS3Assisted(t *testing.T) { go func() { _ = destFrom.HandleWriteAt() }() - go func() { - _ = destFrom.HandleWriteAtHash() - }() go func() { _ = destFrom.HandleDevInfo() }() @@ -280,9 +277,6 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) { go func() { _ = destFrom.HandleWriteAt() }() - go func() { - _ = destFrom.HandleWriteAtHash() - }() go func() { _ = destFrom.HandleDevInfo() }() diff --git a/pkg/storage/protocol/from_protocol.go b/pkg/storage/protocol/from_protocol.go index d0248023..228f3232 100644 --- a/pkg/storage/protocol/from_protocol.go +++ b/pkg/storage/protocol/from_protocol.go @@ -311,62 +311,46 @@ func (fp *FromProtocol) HandleWriteAt() error { offset, write_data, err := packets.DecodeWriteAt(data) if err != nil { - return err - } + // It could be a WriteAtHash command... + _, length, _, errWriteAtHash := packets.DecodeWriteAtHash(data) + if errWriteAtHash != nil { + return err + } + + // For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved. + // fp.mark_range_present(int(offset), int(length)) - // Handle in a goroutine - go func(goffset int64, gdata []byte, gid uint32) { - n, err := fp.prov.WriteAt(gdata, goffset) war := &packets.WriteAtResponse{ - Bytes: n, - Error: err, + Error: nil, + Bytes: int(length), } - if err == nil { - fp.mark_range_present(int(goffset), len(gdata)) - } - _, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war)) + _, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war)) if err != nil { - errLock.Lock() - errValue = err - errLock.Unlock() + return err } - }(offset, write_data, id) - } -} - -// Handle any WriteAtHash commands. (We simply acknowledge) -func (fp *FromProtocol) HandleWriteAtHash() error { - err := fp.wait_init_or_cancel() - if err != nil { - return err - } - - for { - id, data, err := fp.protocol.WaitForCommand(fp.dev, packets.COMMAND_WRITE_AT_HASH) - if err != nil { - return err - } - - _, length, _, err := packets.DecodeWriteAtHash(data) - if err != nil { - return err - } - - // For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved. - // fp.mark_range_present(int(offset), int(length)) - - war := &packets.WriteAtResponse{ - Error: nil, - Bytes: int(length), - } - _, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war)) - if err != nil { - return err + } else { + // Handle in a goroutine + go func(goffset int64, gdata []byte, gid uint32) { + n, err := fp.prov.WriteAt(gdata, goffset) + war := &packets.WriteAtResponse{ + Bytes: n, + Error: err, + } + if err == nil { + fp.mark_range_present(int(goffset), len(gdata)) + } + _, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war)) + if err != nil { + errLock.Lock() + errValue = err + errLock.Unlock() + } + }(offset, write_data, id) } } } -// Handle any WriteAt commands, and send to provider +// Handle any WriteAtWithMap commands, and send to provider func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte, idmap map[uint64]uint64) error) error { err := fp.wait_init_or_cancel() if err != nil { diff --git a/pkg/storage/protocol/packets/packet.go b/pkg/storage/protocol/packets/packet.go index 6ca67aff..3dabb32e 100644 --- a/pkg/storage/protocol/packets/packet.go +++ b/pkg/storage/protocol/packets/packet.go @@ -16,8 +16,7 @@ const ( COMMAND_WRITE_AT_WITH_MAP = COMMAND_REQUEST | byte(10) COMMAND_REMOVE_DEV = COMMAND_REQUEST | byte(11) COMMAND_REMOVE_FROM_MAP = COMMAND_REQUEST | byte(12) - COMMAND_WRITE_AT_HASH = COMMAND_REQUEST | byte(13) - COMMAND_ALTERNATE_SOURCES = COMMAND_REQUEST | byte(14) + COMMAND_ALTERNATE_SOURCES = COMMAND_REQUEST | byte(13) ) const ( diff --git a/pkg/storage/protocol/packets/write_at.go b/pkg/storage/protocol/packets/write_at.go index e3df1d55..e8460c0a 100644 --- a/pkg/storage/protocol/packets/write_at.go +++ b/pkg/storage/protocol/packets/write_at.go @@ -6,19 +6,24 @@ import ( "io" ) +const WRITE_AT_DATA = 0 +const WRITE_AT_HASH = 1 + func EncodeWriteAt(offset int64, data []byte) []byte { - buff := make([]byte, 1+8+len(data)) + buff := make([]byte, 2+8+len(data)) buff[0] = COMMAND_WRITE_AT - binary.LittleEndian.PutUint64(buff[1:], uint64(offset)) - copy(buff[9:], data) + buff[1] = WRITE_AT_DATA + binary.LittleEndian.PutUint64(buff[2:], uint64(offset)) + copy(buff[10:], data) return buff } func EncodeWriterWriteAt(offset int64, data []byte) (uint32, func(w io.Writer) error) { - return uint32(9 + len(data)), func(w io.Writer) error { - header := make([]byte, 1+8) + return uint32(10 + len(data)), func(w io.Writer) error { + header := make([]byte, 2+8) header[0] = COMMAND_WRITE_AT - binary.LittleEndian.PutUint64(header[1:], uint64(offset)) + header[1] = WRITE_AT_DATA + binary.LittleEndian.PutUint64(header[2:], uint64(offset)) _, err := w.Write(header) if err != nil { return err @@ -29,11 +34,11 @@ func EncodeWriterWriteAt(offset int64, data []byte) (uint32, func(w io.Writer) e } func DecodeWriteAt(buff []byte) (offset int64, data []byte, err error) { - if buff == nil || len(buff) < 9 || buff[0] != COMMAND_WRITE_AT { + if buff == nil || len(buff) < 10 || buff[0] != COMMAND_WRITE_AT || buff[1] != WRITE_AT_DATA { return 0, nil, errors.New("Invalid packet command") } - off := int64(binary.LittleEndian.Uint64(buff[1:])) - return off, buff[9:], nil + off := int64(binary.LittleEndian.Uint64(buff[2:])) + return off, buff[10:], nil } type WriteAtResponse struct { diff --git a/pkg/storage/protocol/packets/write_at_hash.go b/pkg/storage/protocol/packets/write_at_hash.go index e145e084..9e263ea1 100644 --- a/pkg/storage/protocol/packets/write_at_hash.go +++ b/pkg/storage/protocol/packets/write_at_hash.go @@ -9,23 +9,23 @@ import ( func EncodeWriteAtHash(offset int64, length int64, hash []byte) []byte { var buff bytes.Buffer - buff.WriteByte(COMMAND_WRITE_AT_HASH) + buff.WriteByte(COMMAND_WRITE_AT) + buff.WriteByte(WRITE_AT_HASH) b := make([]byte, 8) binary.LittleEndian.PutUint64(b, uint64(offset)) buff.Write(b) binary.LittleEndian.PutUint64(b, uint64(length)) buff.Write(b) buff.Write(hash) - return buff.Bytes() } func DecodeWriteAtHash(buff []byte) (offset int64, length int64, hash []byte, err error) { - if buff == nil || len(buff) < 17 || buff[0] != COMMAND_WRITE_AT_HASH { + if buff == nil || len(buff) < 18 || buff[0] != COMMAND_WRITE_AT || buff[1] != WRITE_AT_HASH { return 0, 0, nil, errors.New("invalid packet command") } - off := int64(binary.LittleEndian.Uint64(buff[1:])) - l := int64(binary.LittleEndian.Uint64(buff[9:])) + off := int64(binary.LittleEndian.Uint64(buff[2:])) + l := int64(binary.LittleEndian.Uint64(buff[10:])) - return off, l, buff[17:], nil + return off, l, buff[18:], nil }