Skip to content

Commit

Permalink
Metrics into protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod committed Nov 19, 2024
1 parent 2c5e720 commit 0f521be
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 46 deletions.
84 changes: 84 additions & 0 deletions pkg/storage/protocol/from_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"sync"
"sync/atomic"

"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
Expand All @@ -24,6 +25,38 @@ type FromProtocol struct {

alternateSourcesLock sync.Mutex
alternateSources []packets.AlternateSource

metricRecvEvents uint64
metricRecvHashes uint64
metricRecvDevInfo uint64
metricRecvAltSources uint64
metricRecvReadAt uint64
metricRecvWriteAtHash uint64
metricRecvWriteAtComp uint64
metricRecvWriteAt uint64
metricRecvWriteAtWithMap uint64
metricRecvRemoveFromMap uint64
metricRecvRemoveDev uint64
metricRecvDirtyList uint64
metricSentNeedAt uint64
metricSentDontNeedAt uint64
}

type FromProtocolMetrics struct {
RecvEvents uint64
RecvHashes uint64
RecvDevInfo uint64
RecvAltSources uint64
RecvReadAt uint64
RecvWriteAtHash uint64
RecvWriteAtComp uint64
RecvWriteAt uint64
RecvWriteAtWithMap uint64
RecvRemoveFromMap uint64
RecvRemoveDev uint64
RecvDirtyList uint64
SentNeedAt uint64
SentDontNeedAt uint64
}

func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets.DevInfo) storage.Provider, protocol Protocol) *FromProtocol {
Expand All @@ -38,6 +71,25 @@ func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets.
return fp
}

func (fp *FromProtocol) GetMetrics() *FromProtocolMetrics {
return &FromProtocolMetrics{
RecvEvents: atomic.LoadUint64(&fp.metricRecvEvents),
RecvHashes: atomic.LoadUint64(&fp.metricRecvHashes),
RecvDevInfo: atomic.LoadUint64(&fp.metricRecvDevInfo),
RecvAltSources: atomic.LoadUint64(&fp.metricRecvAltSources),
RecvReadAt: atomic.LoadUint64(&fp.metricRecvReadAt),
RecvWriteAtHash: atomic.LoadUint64(&fp.metricRecvWriteAtHash),
RecvWriteAtComp: atomic.LoadUint64(&fp.metricRecvWriteAtComp),
RecvWriteAt: atomic.LoadUint64(&fp.metricRecvWriteAt),
RecvWriteAtWithMap: atomic.LoadUint64(&fp.metricRecvWriteAtWithMap),
RecvRemoveFromMap: atomic.LoadUint64(&fp.metricRecvRemoveFromMap),
RecvRemoveDev: atomic.LoadUint64(&fp.metricRecvRemoveDev),
RecvDirtyList: atomic.LoadUint64(&fp.metricRecvDirtyList),
SentNeedAt: atomic.LoadUint64(&fp.metricSentNeedAt),
SentDontNeedAt: atomic.LoadUint64(&fp.metricSentDontNeedAt),
}
}

func (fp *FromProtocol) GetAlternateSources() []packets.AlternateSource {
fp.alternateSourcesLock.Lock()
defer fp.alternateSourcesLock.Unlock()
Expand Down Expand Up @@ -161,6 +213,8 @@ func (fp *FromProtocol) HandleEvent(cb func(*packets.Event)) error {
return err
}

atomic.AddUint64(&fp.metricRecvEvents, 1)

if ev.Type == packets.EventCompleted {
// Deal with the sync here, and WAIT if needed.
storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{
Expand Down Expand Up @@ -196,6 +250,8 @@ func (fp *FromProtocol) HandleHashes(cb func(map[uint][sha256.Size]byte)) error
return err
}

atomic.AddUint64(&fp.metricRecvHashes, 1)

// Relay the hashes, wait and then respond
cb(hashes)

Expand All @@ -217,6 +273,8 @@ func (fp *FromProtocol) HandleDevInfo() error {
return err
}

atomic.AddUint64(&fp.metricRecvDevInfo, 1)

// Create storage
fp.prov = fp.providerFactory(di)
numBlocks := (int(fp.prov.Size()) + fp.presentBlockSize - 1) / fp.presentBlockSize
Expand All @@ -236,6 +294,8 @@ func (fp *FromProtocol) HandleDevInfo() error {
return
}

atomic.AddUint64(&fp.metricRecvAltSources, 1)

// For now just set it. It only gets sent ONCE at the start of a migration at the moment.
fp.alternateSourcesLock.Lock()
fp.alternateSources = altSources
Expand Down Expand Up @@ -274,6 +334,8 @@ func (fp *FromProtocol) HandleReadAt() error {
return err
}

atomic.AddUint64(&fp.metricRecvReadAt, 1)

// Handle them in goroutines
go func(goffset int64, glength int32, gid uint32) {
buff := make([]byte, glength)
Expand Down Expand Up @@ -324,6 +386,8 @@ func (fp *FromProtocol) HandleWriteAt() error {
return err
}

atomic.AddUint64(&fp.metricRecvWriteAtHash, 1)

// For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved.
// fp.mark_range_present(int(offset), int(length))

Expand All @@ -342,8 +406,14 @@ func (fp *FromProtocol) HandleWriteAt() error {

if len(data) > 1 && data[1] == packets.WriteAtCompRLE {
offset, writeData, err = packets.DecodeWriteAtComp(data)
if err == nil {
atomic.AddUint64(&fp.metricRecvWriteAtComp, 1)
}
} else {
offset, writeData, err = packets.DecodeWriteAt(data)
if err == nil {
atomic.AddUint64(&fp.metricRecvWriteAt, 1)
}
}
if err != nil {
return err
Expand Down Expand Up @@ -388,6 +458,8 @@ func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte,
return err
}

atomic.AddUint64(&fp.metricRecvWriteAtWithMap, 1)

err = cb(offset, writeData, idMap)
if err == nil {
fp.markRangePresent(int(offset), len(writeData))
Expand Down Expand Up @@ -421,6 +493,8 @@ func (fp *FromProtocol) HandleRemoveFromMap(cb func(ids []uint64)) error {
return err
}

atomic.AddUint64(&fp.metricRecvRemoveFromMap, 1)

cb(ids)
/*
// TODO: Should probably do this
Expand Down Expand Up @@ -448,6 +522,8 @@ func (fp *FromProtocol) HandleRemoveDev(cb func()) error {
return err
}

atomic.AddUint64(&fp.metricRecvRemoveDev, 1)

cb()
return nil
}
Expand All @@ -469,6 +545,8 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error {
return err
}

atomic.AddUint64(&fp.metricRecvDirtyList, 1)

// Mark these as non-present (useful for debugging issues)
for _, b := range blocks {
offset := int(b) * blockSize
Expand All @@ -488,11 +566,17 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error {
func (fp *FromProtocol) NeedAt(offset int64, length int32) error {
b := packets.EncodeNeedAt(offset, length)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent)
if err != nil {
atomic.AddUint64(&fp.metricSentNeedAt, 1)
}
return err
}

func (fp *FromProtocol) DontNeedAt(offset int64, length int32) error {
b := packets.EncodeDontNeedAt(offset, length)
_, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent)
if err != nil {
atomic.AddUint64(&fp.metricSentDontNeedAt, 1)
}
return err
}
Loading

0 comments on commit 0f521be

Please sign in to comment.