Skip to content

Commit

Permalink
Rolled WriteAtHash into WriteAt to simplify integration
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod committed Oct 25, 2024
1 parent 6d89c1f commit f862546
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 70 deletions.
6 changes: 0 additions & 6 deletions pkg/storage/migrator/migrator_s3_assisted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ func TestMigratorS3Assisted(t *testing.T) {
go func() {
_ = destFrom.HandleWriteAt()
}()
go func() {
_ = destFrom.HandleWriteAtHash()
}()
go func() {
_ = destFrom.HandleDevInfo()
}()
Expand Down Expand Up @@ -280,9 +277,6 @@ func TestMigratorS3AssistedChangeSource(t *testing.T) {
go func() {
_ = destFrom.HandleWriteAt()
}()
go func() {
_ = destFrom.HandleWriteAtHash()
}()
go func() {
_ = destFrom.HandleDevInfo()
}()
Expand Down
78 changes: 31 additions & 47 deletions pkg/storage/protocol/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,62 +311,46 @@ func (fp *FromProtocol) HandleWriteAt() error {

offset, write_data, err := packets.DecodeWriteAt(data)
if err != nil {
return err
}
// It could be a WriteAtHash command...
_, length, _, errWriteAtHash := packets.DecodeWriteAtHash(data)
if errWriteAtHash != nil {
return err
}

// For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved.
// fp.mark_range_present(int(offset), int(length))

// Handle in a goroutine
go func(goffset int64, gdata []byte, gid uint32) {
n, err := fp.prov.WriteAt(gdata, goffset)
war := &packets.WriteAtResponse{
Bytes: n,
Error: err,
Error: nil,
Bytes: int(length),
}
if err == nil {
fp.mark_range_present(int(goffset), len(gdata))
}
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war))
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war))
if err != nil {
errLock.Lock()
errValue = err
errLock.Unlock()
return err
}
}(offset, write_data, id)
}
}

// Handle any WriteAtHash commands. (We simply acknowledge)
func (fp *FromProtocol) HandleWriteAtHash() error {
err := fp.wait_init_or_cancel()
if err != nil {
return err
}

for {
id, data, err := fp.protocol.WaitForCommand(fp.dev, packets.COMMAND_WRITE_AT_HASH)
if err != nil {
return err
}

_, length, _, err := packets.DecodeWriteAtHash(data)
if err != nil {
return err
}

// For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved.
// fp.mark_range_present(int(offset), int(length))

war := &packets.WriteAtResponse{
Error: nil,
Bytes: int(length),
}
_, err = fp.protocol.SendPacket(fp.dev, id, packets.EncodeWriteAtResponse(war))
if err != nil {
return err
} else {
// Handle in a goroutine
go func(goffset int64, gdata []byte, gid uint32) {
n, err := fp.prov.WriteAt(gdata, goffset)
war := &packets.WriteAtResponse{
Bytes: n,
Error: err,
}
if err == nil {
fp.mark_range_present(int(goffset), len(gdata))
}
_, err = fp.protocol.SendPacket(fp.dev, gid, packets.EncodeWriteAtResponse(war))
if err != nil {
errLock.Lock()
errValue = err
errLock.Unlock()
}
}(offset, write_data, id)
}
}
}

// Handle any WriteAt commands, and send to provider
// Handle any WriteAtWithMap commands, and send to provider
func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte, idmap map[uint64]uint64) error) error {
err := fp.wait_init_or_cancel()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/protocol/packets/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ const (
COMMAND_WRITE_AT_WITH_MAP = COMMAND_REQUEST | byte(10)
COMMAND_REMOVE_DEV = COMMAND_REQUEST | byte(11)
COMMAND_REMOVE_FROM_MAP = COMMAND_REQUEST | byte(12)
COMMAND_WRITE_AT_HASH = COMMAND_REQUEST | byte(13)
COMMAND_ALTERNATE_SOURCES = COMMAND_REQUEST | byte(14)
COMMAND_ALTERNATE_SOURCES = COMMAND_REQUEST | byte(13)
)

const (
Expand Down
23 changes: 14 additions & 9 deletions pkg/storage/protocol/packets/write_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,24 @@ import (
"io"
)

const WRITE_AT_DATA = 0
const WRITE_AT_HASH = 1

func EncodeWriteAt(offset int64, data []byte) []byte {
buff := make([]byte, 1+8+len(data))
buff := make([]byte, 2+8+len(data))
buff[0] = COMMAND_WRITE_AT
binary.LittleEndian.PutUint64(buff[1:], uint64(offset))
copy(buff[9:], data)
buff[1] = WRITE_AT_DATA
binary.LittleEndian.PutUint64(buff[2:], uint64(offset))
copy(buff[10:], data)
return buff
}

func EncodeWriterWriteAt(offset int64, data []byte) (uint32, func(w io.Writer) error) {
return uint32(9 + len(data)), func(w io.Writer) error {
header := make([]byte, 1+8)
return uint32(10 + len(data)), func(w io.Writer) error {
header := make([]byte, 2+8)
header[0] = COMMAND_WRITE_AT
binary.LittleEndian.PutUint64(header[1:], uint64(offset))
header[1] = WRITE_AT_DATA
binary.LittleEndian.PutUint64(header[2:], uint64(offset))
_, err := w.Write(header)
if err != nil {
return err
Expand All @@ -29,11 +34,11 @@ func EncodeWriterWriteAt(offset int64, data []byte) (uint32, func(w io.Writer) e
}

func DecodeWriteAt(buff []byte) (offset int64, data []byte, err error) {
if buff == nil || len(buff) < 9 || buff[0] != COMMAND_WRITE_AT {
if buff == nil || len(buff) < 10 || buff[0] != COMMAND_WRITE_AT || buff[1] != WRITE_AT_DATA {
return 0, nil, errors.New("Invalid packet command")
}
off := int64(binary.LittleEndian.Uint64(buff[1:]))
return off, buff[9:], nil
off := int64(binary.LittleEndian.Uint64(buff[2:]))
return off, buff[10:], nil
}

type WriteAtResponse struct {
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/protocol/packets/write_at_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import (
func EncodeWriteAtHash(offset int64, length int64, hash []byte) []byte {
var buff bytes.Buffer

buff.WriteByte(COMMAND_WRITE_AT_HASH)
buff.WriteByte(COMMAND_WRITE_AT)
buff.WriteByte(WRITE_AT_HASH)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(offset))
buff.Write(b)
binary.LittleEndian.PutUint64(b, uint64(length))
buff.Write(b)
buff.Write(hash)

return buff.Bytes()
}

func DecodeWriteAtHash(buff []byte) (offset int64, length int64, hash []byte, err error) {
if buff == nil || len(buff) < 17 || buff[0] != COMMAND_WRITE_AT_HASH {
if buff == nil || len(buff) < 18 || buff[0] != COMMAND_WRITE_AT || buff[1] != WRITE_AT_HASH {
return 0, 0, nil, errors.New("invalid packet command")
}
off := int64(binary.LittleEndian.Uint64(buff[1:]))
l := int64(binary.LittleEndian.Uint64(buff[9:]))
off := int64(binary.LittleEndian.Uint64(buff[2:]))
l := int64(binary.LittleEndian.Uint64(buff[10:]))

return off, l, buff[17:], nil
return off, l, buff[18:], nil
}

0 comments on commit f862546

Please sign in to comment.