Skip to content

Commit

Permalink
Jamesmoore/arch 219 silo move s3 grab (#62)
Browse files Browse the repository at this point in the history
* Added metrics for waitingCache

Signed-off-by: Jimmy Moore <[email protected]>

* Updated waitingCache

Signed-off-by: Jimmy Moore <[email protected]>

* Added prom exporter

Signed-off-by: Jimmy Moore <[email protected]>

* Added metrics option to cmd/serve and cmd/connect

Signed-off-by: Jimmy Moore <[email protected]>

* Add metrics to WriteCombinator and to test

Signed-off-by: Jimmy Moore <[email protected]>

* Prom now configurable

Signed-off-by: Jimmy Moore <[email protected]>

* Added metrics for waitingCache

Signed-off-by: Jimmy Moore <[email protected]>

* Simplified fromProtocol and adjusted to allow s3 grab/sync on DirtyList or Complete

Signed-off-by: Jimmy Moore <[email protected]>

* New code working

Signed-off-by: Jimmy Moore <[email protected]>

* Added sync behaviour option to FromProtocol

Signed-off-by: Jimmy Moore <[email protected]>

* Start on heatmap fromProtocol

Signed-off-by: Jimmy Moore <[email protected]>

* fix broken merge

Signed-off-by: Jimmy Moore <[email protected]>

* s3.sync now uses vm

Signed-off-by: Jimmy Moore <[email protected]>

* Latest heatmap + dash

Signed-off-by: Jimmy Moore <[email protected]>

* Fixed s3 assist test

Signed-off-by: Jimmy Moore <[email protected]>

* lint fix

Signed-off-by: Jimmy Moore <[email protected]>

* race in vm fix

Signed-off-by: Jimmy Moore <[email protected]>

* S3Storage can now disable reads/writes

Signed-off-by: Jimmy Moore <[email protected]>

* Updated device to stop sync more quickly, and added progress rate limit to migrator

Signed-off-by: Jimmy Moore <[email protected]>

* Added metric for active writers

Signed-off-by: Jimmy Moore <[email protected]>

* Added metric for active writers

Signed-off-by: Jimmy Moore <[email protected]>

* Lint fix, and fixed flaky test

Signed-off-by: Jimmy Moore <[email protected]>

* Tidy up metrics

Signed-off-by: Jimmy Moore <[email protected]>

---------

Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod authored Dec 5, 2024
1 parent 925dcc4 commit 91f0c96
Show file tree
Hide file tree
Showing 15 changed files with 2,045 additions and 1,443 deletions.
39 changes: 25 additions & 14 deletions pkg/storage/device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/blocks"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
"github.com/loopholelabs/silo/pkg/storage/expose"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/storage/volatilitymonitor"
)

const (
Expand All @@ -35,6 +35,7 @@ const (

var syncConcurrency = map[int]int{storage.BlockTypeAny: 10}
var syncGrabConcurrency = 100
var syncVolatilityExpiry = 10 * time.Minute

type Device struct {
Provider storage.Provider
Expand Down Expand Up @@ -308,17 +309,20 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met

dirtyBlockSize := bs >> ds.Sync.Config.BlockShift

numBlocks := (int(prov.Size()) + bs - 1) / bs
// numBlocks := (int(prov.Size()) + bs - 1) / bs

sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(prov, dirtyBlockSize)
vm := volatilitymonitor.NewVolatilityMonitor(prov, bs, syncVolatilityExpiry)

sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(vm, dirtyBlockSize)
sourceStorage := modules.NewLockable(sourceDirtyLocal)

if met != nil {
met.AddDirtyTracker(fmt.Sprintf("s3sync_%s", ds.Name), sourceDirtyRemote)
}

// Setup a block order
orderer := blocks.NewAnyBlockOrder(numBlocks, nil)
orderer := vm
// orderer := blocks.NewAnyBlockOrder(numBlocks, nil)
orderer.AddAll()

checkPeriod, err := time.ParseDuration(ds.Sync.Config.CheckPeriod)
Expand Down Expand Up @@ -372,6 +376,9 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
if log != nil {
log.Debug().Str("name", ds.Name).Msg("sync.start called")
}
// Make sure we can read/write to S3
s3dest.SetReadWriteEnabled(false, false)

if data != nil {
startConfig := data.(storage.SyncStartConfig)

Expand Down Expand Up @@ -425,25 +432,33 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
return true
}

stopSyncing := func(cancelWrites bool) storage.EventReturnData {
stopSyncing := func(cancelWrites bool, wait bool) {
if log != nil {
log.Debug().Str("name", ds.Name).Msg("sync.stop called")
}
syncLock.Lock()
if !syncRunning {
syncLock.Unlock()
return nil
return
}
cancelfn()

if cancelWrites {
// Stop any new writes coming in
s3dest.SetReadWriteEnabled(true, true)
// Cancel any pending writes
s3dest.CancelWrites(0, int64(s3dest.Size()))
}

// WAIT HERE for the sync to finish
wg.Wait()
// WAIT HERE for the sync to finish?
if wait {
wg.Wait()
}
syncRunning = false
syncLock.Unlock()
}

stopSync := func(_ storage.EventType, _ storage.EventData) storage.EventReturnData {
stopSyncing(true, true)

// Get the list of safe blocks we can use.
blocks := syncer.GetSafeBlockMap()
Expand All @@ -466,10 +481,6 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
return altSources
}

stopSync := func(_ storage.EventType, _ storage.EventData) storage.EventReturnData {
return stopSyncing(false)
}

// If the storage gets a "sync.stop", we should cancel the sync, and return the safe blocks
storage.AddSiloEventNotification(prov, "sync.stop", stopSync)

Expand All @@ -496,7 +507,7 @@ func NewDeviceWithLoggingMetrics(ds *config.DeviceSchema, log types.Logger, met
hooks := modules.NewHooks(prov)
hooks.PostClose = func(err error) error {
// We should stop any sync here, but ask it to cancel any existing writes if possible.
stopSyncing(true)
stopSyncing(true, true)
return err
}
prov = hooks
Expand Down
139 changes: 104 additions & 35 deletions pkg/storage/metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type MetricsConfig struct {
StorageHeatmapBuckets []float64
HeatmapResolution uint64
Namespace string
SubSyncer string
SubMigrator string
Expand Down Expand Up @@ -46,10 +46,7 @@ type MetricsConfig struct {

func DefaultConfig() *MetricsConfig {
return &MetricsConfig{
StorageHeatmapBuckets: []float64{
0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45,
0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1,
},
HeatmapResolution: 64,
Namespace: "silo",
SubSyncer: "syncer",
SubMigrator: "migrator",
Expand Down Expand Up @@ -98,13 +95,14 @@ type Metrics struct {
migratorTotalMigratedBlocks *prometheus.GaugeVec

// protocol
protocolPacketsSent *prometheus.GaugeVec
protocolDataSent *prometheus.GaugeVec
protocolPacketsRecv *prometheus.GaugeVec
protocolDataRecv *prometheus.GaugeVec
protocolWrites *prometheus.GaugeVec
protocolWriteErrors *prometheus.GaugeVec
protocolWaitingForID *prometheus.GaugeVec
protocolActivePacketsSending *prometheus.GaugeVec
protocolPacketsSent *prometheus.GaugeVec
protocolDataSent *prometheus.GaugeVec
protocolPacketsRecv *prometheus.GaugeVec
protocolDataRecv *prometheus.GaugeVec
protocolWrites *prometheus.GaugeVec
protocolWriteErrors *prometheus.GaugeVec
protocolWaitingForID *prometheus.GaugeVec

// s3
s3BlocksR *prometheus.GaugeVec
Expand Down Expand Up @@ -134,20 +132,25 @@ type Metrics struct {
toProtocolRecvDontNeedAt *prometheus.GaugeVec

// fromProtocol
fromProtocolRecvEvents *prometheus.GaugeVec
fromProtocolRecvHashes *prometheus.GaugeVec
fromProtocolRecvDevInfo *prometheus.GaugeVec
fromProtocolRecvAltSources *prometheus.GaugeVec
fromProtocolRecvReadAt *prometheus.GaugeVec
fromProtocolRecvWriteAtHash *prometheus.GaugeVec
fromProtocolRecvWriteAtComp *prometheus.GaugeVec
fromProtocolRecvWriteAt *prometheus.GaugeVec
fromProtocolRecvWriteAtWithMap *prometheus.GaugeVec
fromProtocolRecvRemoveFromMap *prometheus.GaugeVec
fromProtocolRecvRemoveDev *prometheus.GaugeVec
fromProtocolRecvDirtyList *prometheus.GaugeVec
fromProtocolSentNeedAt *prometheus.GaugeVec
fromProtocolSentDontNeedAt *prometheus.GaugeVec
fromProtocolRecvEvents *prometheus.GaugeVec
fromProtocolRecvHashes *prometheus.GaugeVec
fromProtocolRecvDevInfo *prometheus.GaugeVec
fromProtocolRecvAltSources *prometheus.GaugeVec
fromProtocolRecvReadAt *prometheus.GaugeVec
fromProtocolRecvWriteAtHash *prometheus.GaugeVec
fromProtocolRecvWriteAtComp *prometheus.GaugeVec
fromProtocolRecvWriteAt *prometheus.GaugeVec
fromProtocolRecvWriteAtWithMap *prometheus.GaugeVec
fromProtocolRecvRemoveFromMap *prometheus.GaugeVec
fromProtocolRecvRemoveDev *prometheus.GaugeVec
fromProtocolRecvDirtyList *prometheus.GaugeVec
fromProtocolSentNeedAt *prometheus.GaugeVec
fromProtocolSentDontNeedAt *prometheus.GaugeVec
fromProtocolWritesAllowedP2P *prometheus.GaugeVec
fromProtocolWritesBlockedP2P *prometheus.GaugeVec
fromProtocolWritesAllowedAltSources *prometheus.GaugeVec
fromProtocolWritesBlockedAltSources *prometheus.GaugeVec
fromProtocolHeatmap *prometheus.GaugeVec

// dirtyTracker
dirtyTrackerBlockSize *prometheus.GaugeVec
Expand All @@ -159,7 +162,7 @@ type Metrics struct {
volatilityMonitorBlockSize *prometheus.GaugeVec
volatilityMonitorAvailable *prometheus.GaugeVec
volatilityMonitorVolatility *prometheus.GaugeVec
volatilityMonitorHeatmap *prometheus.HistogramVec
volatilityMonitorHeatmap *prometheus.GaugeVec

// metrics
metricsReadOps *prometheus.GaugeVec
Expand Down Expand Up @@ -230,6 +233,8 @@ func New(reg prometheus.Registerer, config *MetricsConfig) *Metrics {
Namespace: config.Namespace, Subsystem: config.SubMigrator, Name: "total_migrated_blocks", Help: "Total migrated blocks"}, []string{"device"}),

// Protocol
protocolActivePacketsSending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubProtocol, Name: "active_packets_sending", Help: "Packets sending"}, []string{"device"}),
protocolPacketsSent: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubProtocol, Name: "packets_sent", Help: "Packets sent"}, []string{"device"}),
protocolDataSent: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -310,6 +315,16 @@ func New(reg prometheus.Registerer, config *MetricsConfig) *Metrics {
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "sent_need_at", Help: "sentNeedAt"}, []string{"device"}),
fromProtocolSentDontNeedAt: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "sent_dont_need_at", Help: "sentDontNeedAt"}, []string{"device"}),
fromProtocolWritesAllowedP2P: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "writes_allowed_p2p", Help: "writesAllowedP2P"}, []string{"device"}),
fromProtocolWritesBlockedP2P: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "writes_blocked_p2p", Help: "writesBlockedP2P"}, []string{"device"}),
fromProtocolWritesAllowedAltSources: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "writes_allowed_alt_sources", Help: "writesAllowedAltSources"}, []string{"device"}),
fromProtocolWritesBlockedAltSources: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "writes_blocked_alt_sources", Help: "writesBlockedAltSources"}, []string{"device"}),
fromProtocolHeatmap: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubFromProtocol, Name: "heatmap", Help: "Heatmap"}, []string{"device", "le"}),

// S3Storage
s3BlocksW: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -342,8 +357,8 @@ func New(reg prometheus.Registerer, config *MetricsConfig) *Metrics {
Namespace: config.Namespace, Subsystem: config.SubVolatilityMonitor, Name: "available", Help: "Blocks available"}, []string{"device"}),
volatilityMonitorVolatility: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubVolatilityMonitor, Name: "volatility", Help: "Volatility"}, []string{"device"}),
volatilityMonitorHeatmap: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: config.Namespace, Subsystem: config.SubVolatilityMonitor, Name: "heatmap", Help: "Heatmap", Buckets: config.StorageHeatmapBuckets}, []string{"device"}),
volatilityMonitorHeatmap: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Namespace, Subsystem: config.SubVolatilityMonitor, Name: "heatmap", Help: "Heatmap"}, []string{"device", "le"}),

// Metrics
metricsReadOps: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -411,7 +426,7 @@ func New(reg prometheus.Registerer, config *MetricsConfig) *Metrics {

reg.MustRegister(met.migratorBlockSize, met.migratorActiveBlocks, met.migratorTotalBlocks, met.migratorMigratedBlocks, met.migratorTotalMigratedBlocks, met.migratorReadyBlocks)

reg.MustRegister(met.protocolPacketsSent, met.protocolDataSent, met.protocolPacketsRecv, met.protocolDataRecv, met.protocolWrites, met.protocolWriteErrors, met.protocolWaitingForID)
reg.MustRegister(met.protocolActivePacketsSending, met.protocolPacketsSent, met.protocolDataSent, met.protocolPacketsRecv, met.protocolDataRecv, met.protocolWrites, met.protocolWriteErrors, met.protocolWaitingForID)

reg.MustRegister(met.s3BlocksR, met.s3BlocksRBytes, met.s3BlocksW, met.s3BlocksWBytes, met.s3ActiveReads, met.s3ActiveWrites)

Expand All @@ -426,7 +441,10 @@ func New(reg prometheus.Registerer, config *MetricsConfig) *Metrics {
met.fromProtocolRecvAltSources, met.fromProtocolRecvReadAt, met.fromProtocolRecvWriteAtHash,
met.fromProtocolRecvWriteAtComp, met.fromProtocolRecvWriteAt, met.fromProtocolRecvWriteAtWithMap,
met.fromProtocolRecvRemoveFromMap, met.fromProtocolRecvRemoveDev, met.fromProtocolRecvDirtyList,
met.fromProtocolSentNeedAt, met.fromProtocolSentDontNeedAt)
met.fromProtocolSentNeedAt, met.fromProtocolSentDontNeedAt,
met.fromProtocolWritesAllowedP2P, met.fromProtocolWritesBlockedP2P,
met.fromProtocolWritesAllowedAltSources, met.fromProtocolWritesBlockedAltSources,
met.fromProtocolHeatmap)

reg.MustRegister(met.dirtyTrackerBlockSize, met.dirtyTrackerDirtyBlocks, met.dirtyTrackerTrackingBlocks, met.dirtyTrackerMaxAgeDirtyMS)

Expand Down Expand Up @@ -539,6 +557,7 @@ func (m *Metrics) RemoveMigrator(name string) {
func (m *Metrics) AddProtocol(name string, proto *protocol.RW) {
m.add(m.config.SubProtocol, name, m.config.TickProtocol, func() {
met := proto.GetMetrics()
m.protocolActivePacketsSending.WithLabelValues(name).Set(float64(met.ActivePacketsSending))
m.protocolPacketsSent.WithLabelValues(name).Set(float64(met.PacketsSent))
m.protocolDataSent.WithLabelValues(name).Set(float64(met.DataSent))
m.protocolPacketsRecv.WithLabelValues(name).Set(float64(met.PacketsRecv))
Expand Down Expand Up @@ -585,6 +604,10 @@ func (m *Metrics) AddFromProtocol(name string, proto *protocol.FromProtocol) {
m.add(m.config.SubFromProtocol, name, m.config.TickFromProtocol, func() {
met := proto.GetMetrics()

if met.DeviceName != "" {
name = met.DeviceName
}

m.fromProtocolRecvEvents.WithLabelValues(name).Set(float64(met.RecvEvents))
m.fromProtocolRecvHashes.WithLabelValues(name).Set(float64(met.RecvHashes))
m.fromProtocolRecvDevInfo.WithLabelValues(name).Set(float64(met.RecvDevInfo))
Expand All @@ -599,6 +622,49 @@ func (m *Metrics) AddFromProtocol(name string, proto *protocol.FromProtocol) {
m.fromProtocolRecvDirtyList.WithLabelValues(name).Set(float64(met.RecvDirtyList))
m.fromProtocolSentNeedAt.WithLabelValues(name).Set(float64(met.SentNeedAt))
m.fromProtocolSentDontNeedAt.WithLabelValues(name).Set(float64(met.SentDontNeedAt))

m.fromProtocolWritesAllowedP2P.WithLabelValues(name).Set(float64(met.WritesAllowedP2P))
m.fromProtocolWritesBlockedP2P.WithLabelValues(name).Set(float64(met.WritesBlockedP2P))
m.fromProtocolWritesAllowedAltSources.WithLabelValues(name).Set(float64(met.WritesAllowedAltSources))
m.fromProtocolWritesBlockedAltSources.WithLabelValues(name).Set(float64(met.WritesBlockedAltSources))

totalHeatmapP2P := make([]uint64, m.config.HeatmapResolution)
for _, block := range met.AvailableP2P {
part := uint64(block) * m.config.HeatmapResolution / met.NumBlocks
totalHeatmapP2P[part]++
}

totalHeatmapAltSources := make([]uint64, m.config.HeatmapResolution)
for _, block := range met.AvailableAltSources {
part := uint64(block) * m.config.HeatmapResolution / met.NumBlocks
totalHeatmapAltSources[part]++
}

totalHeatmapP2PDupe := make([]uint64, m.config.HeatmapResolution)
for _, block := range met.DuplicateP2P {
part := uint64(block) * m.config.HeatmapResolution / met.NumBlocks
totalHeatmapP2PDupe[part]++
}

blocksPerPart := 2 * (met.NumBlocks / m.config.HeatmapResolution)

//
for part, blocks := range totalHeatmapP2P {
m.fromProtocolHeatmap.WithLabelValues(name, fmt.Sprintf("%d", part)).Set(float64(blocks))
}

for part, blocks := range totalHeatmapAltSources {
if blocks > 0 {
m.fromProtocolHeatmap.WithLabelValues(name, fmt.Sprintf("%d", part)).Set(float64(blocksPerPart*2 + blocks))
}
}

for part, blocks := range totalHeatmapP2PDupe {
if blocks > 0 {
m.fromProtocolHeatmap.WithLabelValues(name, fmt.Sprintf("%d", part)).Set(float64(blocksPerPart + blocks))
}
}

})
}

Expand Down Expand Up @@ -644,11 +710,14 @@ func (m *Metrics) AddVolatilityMonitor(name string, vm *volatilitymonitor.Volati
m.volatilityMonitorAvailable.WithLabelValues(name).Set(float64(met.Available))
m.volatilityMonitorVolatility.WithLabelValues(name).Set(float64(met.Volatility))

// TODO: Better way?
totalVolatility := make([]uint64, m.config.HeatmapResolution)
for block, volatility := range met.VolatilityMap {
for v := 0; v < int(volatility); v++ {
m.volatilityMonitorHeatmap.WithLabelValues(name).Observe(float64(block) / float64(met.NumBlocks))
}
part := uint64(block) * m.config.HeatmapResolution / met.NumBlocks
totalVolatility[part] += volatility
}

for part, volatility := range totalVolatility {
m.volatilityMonitorHeatmap.WithLabelValues(name, fmt.Sprintf("%d", part)).Set(float64(volatility))
}
})
}
Expand Down
Loading

0 comments on commit 91f0c96

Please sign in to comment.