From 6933f03feb4a35ecd018e1a3cd70d7ae1644eb51 Mon Sep 17 00:00:00 2001 From: hrt/derrandz Date: Wed, 8 Feb 2023 17:30:05 +0100 Subject: [PATCH] feat(node | das | libs/header/sync): add total uptime node metrics + totalSampled das metrics + totalSynced sync metrics (#1638) ## Overview This PR introduces node uptime metrics + das total sampled headers metrics to support calculating the uptime index proposed by mustafa on the monitoring side. This PR introduces a new module named `Telemetry` to support node related telemetry. This module can also host all general telemetry and observability that does not interest specific modules. ## Changes - [x] Introduced uptime metrics for node under `nodebuilder/node/uptime.go` - [x] Introduced persistent uptime metrics using datastore to persist node start time - [x] Testing for uptime metrics persistence using the store - [x] Unit testing for uptime metrics - [x] Integration testing for uptime metrics - [ ] e2e testing for uptime metrics ## Checklist - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [x] Visual proof for any user facing features like CLI or documentation updates - [ ] Linked issues closed with keywords ## Blocked By PR: #1537 --------- Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> --- das/checkpoint.go | 9 +++ das/coordinator.go | 4 ++ das/metrics.go | 60 +++++++++++++--- das/options.go | 3 +- das/worker.go | 40 ++++++++--- go.mod | 4 +- header/metrics.go | 9 ++- libs/header/sync/metrics.go | 43 ++++++++++++ libs/header/sync/sync.go | 5 ++ nodebuilder/node/metrics.go | 53 ++++++++++++++ nodebuilder/node_test.go | 88 ++++++++++++++++++++++++ nodebuilder/settings.go | 10 +-- share/availability/mocks/availability.go | 3 +- share/mocks/getter.go | 3 +- 14 files changed, 306 insertions(+), 28 deletions(-) create mode 100644 libs/header/sync/metrics.go create mode 100644 nodebuilder/node/metrics.go diff --git a/das/checkpoint.go b/das/checkpoint.go index 860c72cb35..04e6cafaf5 100644 --- a/das/checkpoint.go +++ b/das/checkpoint.go @@ -48,3 +48,12 @@ func (c checkpoint) String() string { return str } + +// totalSampled returns the total amount of sampled headers +func (c checkpoint) totalSampled() uint64 { + var totalInProgress uint64 + for _, w := range c.Workers { + totalInProgress += (w.To - w.From) + 1 + } + return c.SampleFrom - totalInProgress - uint64(len(c.Failed)) +} diff --git a/das/coordinator.go b/das/coordinator.go index d8e149ebd3..cf356349d9 100644 --- a/das/coordinator.go +++ b/das/coordinator.go @@ -55,6 +55,10 @@ func newSamplingCoordinator( func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) { sc.state.resumeFromCheckpoint(cp) + + // the amount of sampled headers from the last checkpoint + sc.metrics.recordTotalSampled(cp.totalSampled()) + // resume workers for _, wk := range cp.Workers { sc.runWorker(ctx, sc.state.newJob(wk.From, wk.To)) diff --git a/das/metrics.go b/das/metrics.go index b417cb05c1..f498e02ae7 100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -24,7 +24,9 @@ type metrics struct { sampleTime syncfloat64.Histogram getHeaderTime syncfloat64.Histogram newHead syncint64.Counter - lastSampledTS int64 + + lastSampledTS uint64 + totalSampledInt uint64 } func (d *DASer) InitMetrics() error { @@ -76,6 +78,16 @@ func (d *DASer) InitMetrics() error { return err } + totalSampled, err := meter. + AsyncInt64(). + Gauge( + "das_total_sampled_headers", + instrument.WithDescription("total sampled headers gauge"), + ) + if err != nil { + return err + } + d.sampler.metrics = &metrics{ sampled: sampled, sampleTime: sampleTime, @@ -85,7 +97,11 @@ func (d *DASer) InitMetrics() error { err = meter.RegisterCallback( []instrument.Asynchronous{ - lastSampledTS, busyWorkers, networkHead, sampledChainHead, + lastSampledTS, + busyWorkers, + networkHead, + sampledChainHead, + totalSampled, }, func(ctx context.Context) { stats, err := d.sampler.stats(ctx) @@ -97,9 +113,12 @@ func (d *DASer) InitMetrics() error { networkHead.Observe(ctx, int64(stats.NetworkHead)) sampledChainHead.Observe(ctx, int64(stats.SampledChainHead)) - if ts := atomic.LoadInt64(&d.sampler.metrics.lastSampledTS); ts != 0 { - lastSampledTS.Observe(ctx, ts) + if ts := atomic.LoadUint64(&d.sampler.metrics.lastSampledTS); ts != 0 { + lastSampledTS.Observe(ctx, int64(ts)) } + + totalSampledInt := atomic.LoadUint64(&d.sampler.metrics.totalSampledInt) + totalSampled.Observe(ctx, int64(totalSampledInt)) }, ) @@ -110,19 +129,35 @@ func (d *DASer) InitMetrics() error { return nil } -func (m *metrics) observeSample(ctx context.Context, h *header.ExtendedHeader, sampleTime time.Duration, err error) { +// observeSample records the time it took to sample a header + +// the amount of sampled contiguous headers +func (m *metrics) observeSample( + ctx context.Context, + h *header.ExtendedHeader, + sampleTime time.Duration, + err error, +) { if m == nil { return } m.sampleTime.Record(ctx, sampleTime.Seconds(), attribute.Bool("failed", err != nil), - attribute.Int("header_width", len(h.DAH.RowsRoots))) + attribute.Int("header_width", len(h.DAH.RowsRoots)), + ) + m.sampled.Add(ctx, 1, attribute.Bool("failed", err != nil), - attribute.Int("header_width", len(h.DAH.RowsRoots))) - atomic.StoreInt64(&m.lastSampledTS, time.Now().UTC().Unix()) + attribute.Int("header_width", len(h.DAH.RowsRoots)), + ) + + atomic.StoreUint64(&m.lastSampledTS, uint64(time.Now().UTC().Unix())) + + if err == nil { + atomic.AddUint64(&m.totalSampledInt, 1) + } } +// observeGetHeader records the time it took to get a header from the header store. func (m *metrics) observeGetHeader(ctx context.Context, d time.Duration) { if m == nil { return @@ -130,9 +165,18 @@ func (m *metrics) observeGetHeader(ctx context.Context, d time.Duration) { m.getHeaderTime.Record(ctx, d.Seconds()) } +// observeNewHead records the network head. func (m *metrics) observeNewHead(ctx context.Context) { if m == nil { return } m.newHead.Add(ctx, 1) } + +// recordTotalSampled records the total sampled headers. +func (m *metrics) recordTotalSampled(totalSampled uint64) { + if m == nil { + return + } + atomic.StoreUint64(&m.totalSampledInt, totalSampled) +} diff --git a/das/options.go b/das/options.go index 55be4c9e1f..eca8596de4 100644 --- a/das/options.go +++ b/das/options.go @@ -38,7 +38,8 @@ type Parameters struct { // SampleFrom is the height sampling will start from if no previous checkpoint was saved SampleFrom uint64 - // SampleTimeout is a maximum amount time sampling of single block may take until it will be canceled + // SampleTimeout is a maximum amount time sampling of single block may take until it will be + // canceled SampleTimeout time.Duration } diff --git a/das/worker.go b/das/worker.go index 7eb73ac703..13ab67a05e 100644 --- a/das/worker.go +++ b/das/worker.go @@ -60,8 +60,15 @@ func (w *worker) run( } metrics.observeGetHeader(ctx, time.Since(startGet)) - log.Debugw("got header from header store", "height", h.Height(), "hash", h.Hash(), - "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startGet)) + + log.Debugw( + "got header from header store", + "height", h.Height(), + "hash", h.Hash(), + "square width", len(h.DAH.RowsRoots), + "data root", h.DAH.Hash(), + "finished (s)", time.Since(startGet), + ) startSample := time.Now() err = sample(ctx, h) @@ -72,18 +79,35 @@ func (w *worker) run( w.setResult(curr, err) metrics.observeSample(ctx, h, time.Since(startSample), err) if err != nil { - log.Debugw("failed to sampled header", "height", h.Height(), "hash", h.Hash(), - "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err) + log.Debugw( + "failed to sampled header", + "height", h.Height(), + "hash", h.Hash(), + "square width", len(h.DAH.RowsRoots), + "data root", h.DAH.Hash(), + "err", err, + ) } else { - log.Debugw("sampled header", "height", h.Height(), "hash", h.Hash(), - "square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startSample)) + log.Debugw( + "sampled header", + "height", h.Height(), + "hash", h.Hash(), + "square width", len(h.DAH.RowsRoots), + "data root", h.DAH.Hash(), + "finished (s)", time.Since(startSample), + ) } } if w.state.Curr > w.state.From { jobTime := time.Since(jobStart) - log.Infow("sampled headers", "from", w.state.From, "to", w.state.Curr, - "finished (s)", jobTime.Seconds()) + log.Infow( + "sampled headers", + "from", w.state.From, + "to", w.state.Curr, + "finished (s)", + jobTime.Seconds(), + ) } select { diff --git a/go.mod b/go.mod index e76a9626c3..8e03eeadb8 100644 --- a/go.mod +++ b/go.mod @@ -62,12 +62,14 @@ require ( go.opentelemetry.io/otel/sdk v1.11.2 go.opentelemetry.io/otel/sdk/metric v0.34.0 go.opentelemetry.io/otel/trace v1.11.2 + go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/fx v1.18.2 go.uber.org/multierr v1.9.0 golang.org/x/crypto v0.5.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.6.0 google.golang.org/grpc v1.52.0 + google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 ) require ( @@ -296,7 +298,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect - go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/zap v1.24.0 // indirect @@ -311,7 +312,6 @@ require ( google.golang.org/api v0.102.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect - google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/header/metrics.go b/header/metrics.go index 64ff448c7f..dbf78d1e57 100644 --- a/header/metrics.go +++ b/header/metrics.go @@ -9,12 +9,13 @@ import ( "go.opentelemetry.io/otel/metric/unit" libhead "github.com/celestiaorg/celestia-node/libs/header" + "github.com/celestiaorg/celestia-node/libs/header/sync" ) var meter = global.MeterProvider().Meter("header") -// WithMetrics enables Otel metrics to monitor head. -func WithMetrics(store libhead.Store[*ExtendedHeader]) { +// WithMetrics enables Otel metrics to monitor head and total amount of synced headers. +func WithMetrics(store libhead.Store[*ExtendedHeader], syncer *sync.Syncer[*ExtendedHeader]) error { headC, _ := meter.AsyncInt64().Counter( "head", instrument.WithUnit(unit.Dimensionless), @@ -40,6 +41,8 @@ func WithMetrics(store libhead.Store[*ExtendedHeader]) { }, ) if err != nil { - panic(err) + return err } + + return syncer.InitMetrics() } diff --git a/libs/header/sync/metrics.go b/libs/header/sync/metrics.go new file mode 100644 index 0000000000..99851e9d18 --- /dev/null +++ b/libs/header/sync/metrics.go @@ -0,0 +1,43 @@ +package sync + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" +) + +var meter = global.MeterProvider().Meter("header/sync") + +type metrics struct { + totalSynced int64 +} + +func (s *Syncer[H]) InitMetrics() error { + s.metrics = &metrics{} + + totalSynced, err := meter. + AsyncFloat64(). + Gauge( + "total_synced_headers", + instrument.WithDescription("total synced headers"), + ) + if err != nil { + return err + } + + return meter.RegisterCallback( + []instrument.Asynchronous{ + totalSynced, + }, + func(ctx context.Context) { + totalSynced.Observe(ctx, float64(atomic.LoadInt64(&s.metrics.totalSynced))) + }, + ) +} + +// recordTotalSynced records the total amount of synced headers. +func (m *metrics) recordTotalSynced(totalSynced int) { + atomic.AddInt64(&m.totalSynced, int64(totalSynced)) +} diff --git a/libs/header/sync/sync.go b/libs/header/sync/sync.go index 63be056591..983a841742 100644 --- a/libs/header/sync/sync.go +++ b/libs/header/sync/sync.go @@ -53,6 +53,8 @@ type Syncer[H header.Header] struct { cancel context.CancelFunc Params *Parameters + + metrics *metrics } // NewSyncer creates a new instance of Syncer. @@ -233,6 +235,9 @@ func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) if err != nil && processed == 0 { break } + if s.metrics != nil { + s.metrics.recordTotalSynced(processed) + } } s.stateLk.Lock() diff --git a/nodebuilder/node/metrics.go b/nodebuilder/node/metrics.go new file mode 100644 index 0000000000..625e8425e8 --- /dev/null +++ b/nodebuilder/node/metrics.go @@ -0,0 +1,53 @@ +package node + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" +) + +var meter = global.MeterProvider().Meter("node") + +var ( + timeStarted time.Time + nodeStarted bool +) + +// WithMetrics registers node metrics. +func WithMetrics() error { + nodeStartTS, err := meter. + AsyncFloat64(). + Gauge( + "node_start_ts", + instrument.WithDescription("timestamp when the node was started"), + ) + if err != nil { + return err + } + + totalNodeRunTime, err := meter. + AsyncFloat64(). + Counter( + "node_runtime_counter_in_seconds", + instrument.WithDescription("total time the node has been running"), + ) + if err != nil { + return err + } + + return meter.RegisterCallback( + []instrument.Asynchronous{nodeStartTS, totalNodeRunTime}, + func(ctx context.Context) { + if !nodeStarted { + // Observe node start timestamp + timeStarted = time.Now() + nodeStartTS.Observe(ctx, float64(timeStarted.Unix())) + nodeStarted = true + } + + totalNodeRunTime.Observe(ctx, time.Since(timeStarted).Seconds()) + }, + ) +} diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index 51e0becc4d..3ed16d51ee 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -2,10 +2,18 @@ package nodebuilder import ( "context" + "net/http" + "net/http/httptest" "strconv" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + + collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/protobuf/proto" + + "strings" "github.com/celestiaorg/celestia-node/nodebuilder/node" ) @@ -47,3 +55,83 @@ func TestLifecycle(t *testing.T) { }) } } + +func TestLifecycle_WithMetrics(t *testing.T) { + url, close := StartMockOtelCollectorHTTPServer(t) + defer close() + + otelCollectorURL := strings.ReplaceAll(url, "http://", "") + + var test = []struct { + tp node.Type + coreExpected bool + }{ + {tp: node.Bridge}, + {tp: node.Full}, + {tp: node.Light}, + } + + for i, tt := range test { + t.Run(strconv.Itoa(i), func(t *testing.T) { + node := TestNode( + t, + tt.tp, + WithMetrics( + []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpoint(otelCollectorURL), + otlpmetrichttp.WithInsecure(), + }, + tt.tp, + ), + ) + require.NotNil(t, node) + require.NotNil(t, node.Config) + require.NotNil(t, node.Host) + require.NotNil(t, node.HeaderServ) + require.NotNil(t, node.StateServ) + require.Equal(t, tt.tp, node.Type) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := node.Start(ctx) + require.NoError(t, err) + + // ensure the state service is running + require.False(t, node.StateServ.IsStopped(ctx)) + + err = node.Stop(ctx) + require.NoError(t, err) + + // ensure the state service is stopped + require.True(t, node.StateServ.IsStopped(ctx)) + }) + } +} + +func StartMockOtelCollectorHTTPServer(t *testing.T) (string, func()) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/metrics" && r.Method != http.MethodPost { + t.Errorf("Expected to request [POST] '/fixedvalue', got: [%s] %s", r.Method, r.URL.Path) + } + + if r.Header.Get("Content-Type") != "application/x-protobuf" { + t.Errorf("Expected Content-Type: application/x-protobuf header, got: %s", r.Header.Get("Content-Type")) + } + + response := collectormetricpb.ExportMetricsServiceResponse{} + rawResponse, _ := proto.Marshal(&response) + contentType := "application/x-protobuf" + status := http.StatusOK + + log.Debug("Responding to otlp POST request") + w.Header().Set("Content-Type", contentType) + w.WriteHeader(status) + _, _ = w.Write(rawResponse) + + log.Debug("Responded to otlp POST request") + })) + + server.EnableHTTP2 = true + return server.URL, server.Close +} diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 18001c4bb1..bfa9f99667 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -13,8 +13,9 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.11.0" "go.uber.org/fx" - "github.com/celestiaorg/celestia-node/fraud" - "github.com/celestiaorg/celestia-node/header" + fraudPkg "github.com/celestiaorg/celestia-node/fraud" + headerPkg "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/nodebuilder/das" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" @@ -38,9 +39,10 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti baseComponents := fx.Options( fx.Supply(metricOpts), fx.Invoke(initializeMetrics), - fx.Invoke(header.WithMetrics), + fx.Invoke(headerPkg.WithMetrics), fx.Invoke(state.WithMetrics), - fx.Invoke(fraud.WithMetrics), + fx.Invoke(fraudPkg.WithMetrics), + fx.Invoke(node.WithMetrics), ) var opts fx.Option diff --git a/share/availability/mocks/availability.go b/share/availability/mocks/availability.go index 51c2acc391..030348f4e4 100644 --- a/share/availability/mocks/availability.go +++ b/share/availability/mocks/availability.go @@ -8,9 +8,10 @@ import ( context "context" reflect "reflect" - da "github.com/celestiaorg/celestia-app/pkg/da" gomock "github.com/golang/mock/gomock" peer "github.com/libp2p/go-libp2p/core/peer" + + da "github.com/celestiaorg/celestia-app/pkg/da" ) // MockAvailability is a mock of Availability interface. diff --git a/share/mocks/getter.go b/share/mocks/getter.go index 1c73c9170d..12c36cb015 100644 --- a/share/mocks/getter.go +++ b/share/mocks/getter.go @@ -8,11 +8,12 @@ import ( context "context" reflect "reflect" + gomock "github.com/golang/mock/gomock" + da "github.com/celestiaorg/celestia-app/pkg/da" share "github.com/celestiaorg/celestia-node/share" namespace "github.com/celestiaorg/nmt/namespace" rsmt2d "github.com/celestiaorg/rsmt2d" - gomock "github.com/golang/mock/gomock" ) // MockGetter is a mock of Getter interface.