Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arch-212-silo-metrics #55

Merged
merged 29 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d1d7f1e
Added metrics for waitingCache
jimmyaxod Nov 19, 2024
89471eb
Updated waitingCache
jimmyaxod Nov 19, 2024
db26027
Updated volatilityMonitor
jimmyaxod Nov 19, 2024
f112140
Renamed Snapshot to GetMetrics in metrics
jimmyaxod Nov 19, 2024
efa5d45
Metrics in dirtyTracker
jimmyaxod Nov 19, 2024
b4ed65d
Added metrics to nbd expose
jimmyaxod Nov 19, 2024
1e88bbe
Metrics into protocol
jimmyaxod Nov 19, 2024
ad6ae9a
Device tweaks
jimmyaxod Nov 19, 2024
4fc5966
Migrator/sync metrics updates
jimmyaxod Nov 19, 2024
5c9a2ef
Metrics ifc and device updated
jimmyaxod Nov 19, 2024
cd2dd9c
Added prom exporter
jimmyaxod Nov 19, 2024
ab57ecb
go mod
jimmyaxod Nov 19, 2024
b1195fa
Added metrics option to cmd/serve and cmd/connect
jimmyaxod Nov 20, 2024
6252540
S3Sync now cancels pending writes. Added metrics to S3Storage for cur…
jimmyaxod Nov 20, 2024
f1f992f
Reduced s3 sync concurrency default
jimmyaxod Nov 20, 2024
6d1e9bd
lint fix
jimmyaxod Nov 20, 2024
706bc1d
removed vmRunning not needed here
jimmyaxod Nov 20, 2024
3f11ee0
Renamed couple of metrics
jimmyaxod Nov 20, 2024
0c1aa73
Stopgap fix for minio init tests
jimmyaxod Nov 20, 2024
c0c503e
comment minio
jimmyaxod Nov 21, 2024
c9afc56
Add metrics to WriteCombinator and to test
jimmyaxod Nov 21, 2024
aec981d
volatilityMonitor heatmaps
jimmyaxod Nov 22, 2024
f10b404
Added throttle on s3 grab concurrency
jimmyaxod Nov 26, 2024
da8e788
go.sum update
jimmyaxod Nov 26, 2024
00aead2
Fix compression flag
jimmyaxod Nov 26, 2024
1ab524f
Added Shutdown() to metrics for graceful shutdown
jimmyaxod Nov 26, 2024
2115699
Prom now configurable
jimmyaxod Nov 26, 2024
3bc5da7
WriteCombinator now has stats on blocks
jimmyaxod Nov 26, 2024
ceb6ed5
Rename lint
jimmyaxod Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,30 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"

"github.com/loopholelabs/logging"
"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/expose"
"github.com/loopholelabs/silo/pkg/storage/integrity"
"github.com/loopholelabs/silo/pkg/storage/metrics"
siloprom "github.com/loopholelabs/silo/pkg/storage/metrics/prometheus"
"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"

"github.com/fatih/color"
Expand Down Expand Up @@ -49,6 +57,10 @@ var connectMountDev bool

var connectProgress bool

var connectDebug bool

var connectMetrics string

// List of ExposedStorage so they can be cleaned up on exit.
var dstExposed []storage.ExposedStorage

Expand All @@ -63,13 +75,47 @@ func init() {
cmdConnect.Flags().BoolVarP(&connectExposeDev, "expose", "e", false, "Expose as an nbd devices")
cmdConnect.Flags().BoolVarP(&connectMountDev, "mount", "m", false, "Mount the nbd devices")
cmdConnect.Flags().BoolVarP(&connectProgress, "progress", "p", false, "Show progress")
cmdConnect.Flags().BoolVarP(&connectDebug, "debug", "d", false, "Debug logging (trace)")
cmdConnect.Flags().StringVarP(&connectMetrics, "metrics", "M", "", "Prom metrics address")
}

/**
* Connect to a silo source and stream whatever devices are available.
*
*/
func runConnect(_ *cobra.Command, _ []string) {
var log types.RootLogger
var reg *prometheus.Registry
var siloMetrics metrics.SiloMetrics

if connectDebug {
log = logging.New(logging.Zerolog, "silo.connect", os.Stderr)
log.SetLevel(types.TraceLevel)
}

if connectMetrics != "" {
reg = prometheus.NewRegistry()
siloMetrics = siloprom.New(reg, siloprom.DefaultConfig())

// Add the default go metrics
reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
// Pass custom registry
Registry: reg,
},
))

go http.ListenAndServe(connectMetrics, nil)
}

if connectProgress {
dstProgress = mpb.New(
mpb.WithOutput(color.Output),
Expand Down Expand Up @@ -107,6 +153,10 @@ func runConnect(_ *cobra.Command, _ []string) {

protoCtx, protoCancelfn := context.WithCancel(context.TODO())

handleIncomingDevice := func(ctx context.Context, pro protocol.Protocol, dev uint32) {
handleIncomingDeviceWithLogging(ctx, pro, dev, log, siloMetrics)
}

pro := protocol.NewRW(protoCtx, []io.Reader{con}, []io.Writer{con}, handleIncomingDevice)

// Let the protocol do its thing.
Expand All @@ -121,12 +171,26 @@ func runConnect(_ *cobra.Command, _ []string) {
protoCancelfn()
}()

if siloMetrics != nil {
siloMetrics.AddProtocol("protocol", pro)
}

dstWG.Wait() // Wait until the migrations have completed...

if connectProgress {
dstProgress.Wait()
}

if log != nil {
metrics := pro.GetMetrics()
log.Debug().
Uint64("PacketsSent", metrics.PacketsSent).
Uint64("DataSent", metrics.DataSent).
Uint64("PacketsRecv", metrics.PacketsRecv).
Uint64("DataRecv", metrics.DataRecv).
Msg("protocol metrics")
}

fmt.Printf("\nMigrations completed. Please ctrl-c if you want to shut down, or wait an hour :)\n")

// We should pause here, to allow the user to do things with the devices
Expand All @@ -139,7 +203,7 @@ func runConnect(_ *cobra.Command, _ []string) {
}

// Handle a new incoming device. This is called when a packet is received for a device we haven't heard about before.
func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32) {
func handleIncomingDeviceWithLogging(ctx context.Context, pro protocol.Protocol, dev uint32, log types.RootLogger, met metrics.SiloMetrics) {
var destStorage storage.Provider
var destWaitingLocal *waitingcache.Local
var destWaitingRemote *waitingcache.Remote
Expand All @@ -151,6 +215,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
var bar *mpb.Bar

var blockSize uint
var deviceName string

var statusString = " "
var statusVerify = " "
Expand All @@ -174,6 +239,7 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
}

blockSize = uint(di.BlockSize)
deviceName = di.Name

statusFn := func(_ decor.Statistics) string {
return statusString + statusVerify
Expand Down Expand Up @@ -267,6 +333,10 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32

dest = protocol.NewFromProtocol(ctx, dev, storageFactory, pro)

if met != nil {
met.AddFromProtocol(deviceName, dest)
}

var handlerWG sync.WaitGroup

handlerWG.Add(1)
Expand Down Expand Up @@ -304,6 +374,42 @@ func handleIncomingDevice(ctx context.Context, pro protocol.Protocol, dev uint32
// Check we have all data...
case packets.EventCompleted:

if log != nil {
m := destWaitingLocal.GetMetrics()
log.Debug().
Uint64("WaitForBlock", m.WaitForBlock).
Uint64("WaitForBlockHadRemote", m.WaitForBlockHadRemote).
Uint64("WaitForBlockHadLocal", m.WaitForBlockHadLocal).
Uint64("WaitForBlockTimeMS", uint64(m.WaitForBlockTime.Milliseconds())).
Uint64("WaitForBlockLock", m.WaitForBlockLock).
Uint64("WaitForBlockLockDone", m.WaitForBlockLockDone).
Uint64("MarkAvailableLocalBlock", m.MarkAvailableLocalBlock).
Uint64("MarkAvailableRemoteBlock", m.MarkAvailableRemoteBlock).
Uint64("AvailableLocal", m.AvailableLocal).
Uint64("AvailableRemote", m.AvailableRemote).
Str("name", deviceName).
Msg("waitingCacheMetrics")

fromMetrics := dest.GetMetrics()
log.Debug().
Uint64("RecvEvents", fromMetrics.RecvEvents).
Uint64("RecvHashes", fromMetrics.RecvHashes).
Uint64("RecvDevInfo", fromMetrics.RecvDevInfo).
Uint64("RecvAltSources", fromMetrics.RecvAltSources).
Uint64("RecvReadAt", fromMetrics.RecvReadAt).
Uint64("RecvWriteAtHash", fromMetrics.RecvWriteAtHash).
Uint64("RecvWriteAtComp", fromMetrics.RecvWriteAtComp).
Uint64("RecvWriteAt", fromMetrics.RecvWriteAt).
Uint64("RecvWriteAtWithMap", fromMetrics.RecvWriteAtWithMap).
Uint64("RecvRemoveFromMap", fromMetrics.RecvRemoveFromMap).
Uint64("RecvRemoveDev", fromMetrics.RecvRemoveDev).
Uint64("RecvDirtyList", fromMetrics.RecvDirtyList).
Uint64("SentNeedAt", fromMetrics.SentNeedAt).
Uint64("SentDontNeedAt", fromMetrics.SentDontNeedAt).
Str("name", deviceName).
Msg("fromProtocolMetrics")
}

// We completed the migration, but we should wait for handlers to finish before we ok things...
// fmt.Printf("Completed, now wait for handlers...\n")
go func() {
Expand Down
Loading
Loading