Skip to content

Commit

Permalink
Fixed up toProtocol compressed write flag
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod committed Nov 25, 2024
1 parent fd33b4f commit f84513d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/migrator/migrator_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func BenchmarkMigrationPipe(mb *testing.B) {
destination := protocol.NewToProtocol(sourceDirtyRemote.Size(), 17, prSource)

if testconf.compress {
destination.CompressedWrites = true
destination.SetCompression(true)
}

err = destination.SendDevInfo("test", uint32(blockSize), "")
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/protocol_transport_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func BenchmarkWriteAt(mb *testing.B) {
func BenchmarkWriteAtComp(mb *testing.B) {
sourceToProtocol := setup(1)

sourceToProtocol.CompressedWrites = true
sourceToProtocol.SetCompression(true)

// Do some writes
buff := make([]byte, 256*1024)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/protocol/protocol_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestProtocolWriteAtComp(t *testing.T) {

sourceToProtocol := NewToProtocol(uint64(size), 1, pr)

sourceToProtocol.CompressedWrites = true
sourceToProtocol.SetCompression(true)

storeFactory := func(di *packets.DevInfo) storage.Provider {
store = sources.NewMemoryStorage(int(di.Size))
Expand Down
16 changes: 10 additions & 6 deletions pkg/storage/protocol/to_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"sync/atomic"

"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
Expand All @@ -14,16 +15,15 @@ type ToProtocol struct {
size uint64
dev uint32
protocol Protocol
CompressedWrites bool
compressedWrites atomic.Bool
alternateSources []packets.AlternateSource
}

func NewToProtocol(size uint64, deviceID uint32, p Protocol) *ToProtocol {
return &ToProtocol{
size: size,
dev: deviceID,
protocol: p,
CompressedWrites: false,
size: size,
dev: deviceID,
protocol: p,
}
}

Expand All @@ -39,6 +39,10 @@ func (i *ToProtocol) SendSiloEvent(eventType storage.EventType, eventData storag
return nil
}

func (i *ToProtocol) SetCompression(compressed bool) {
i.compressedWrites.Store(compressed)
}

func (i *ToProtocol) SendEvent(e *packets.Event) error {
b := packets.EncodeEvent(e)
id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, UrgencyUrgent)
Expand Down Expand Up @@ -150,7 +154,7 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) {
}

if !dontSendData {
if i.CompressedWrites {
if i.compressedWrites.Load() {
data := packets.EncodeWriteAtComp(offset, buffer)
id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal)
} else {
Expand Down

0 comments on commit f84513d

Please sign in to comment.