Skip to content

Commit

Permalink
Record Metrics for Reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Jan 3, 2025
1 parent de9b376 commit b92b0de
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 6 deletions.
80 changes: 80 additions & 0 deletions internal/reminder/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package metrics

Check failure on line 1 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

package-comments: should have a package comment (revive)

import (
"context"

Check failure on line 4 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
"go.opentelemetry.io/otel/metric"
)

// Default bucket boundaries in seconds for the send delay histogram
var sendDelayBuckets = []float64{
0, // immediate
10, // 10 seconds
20, // 20 seconds
40, // 40 seconds
80, // 1m 20s
160, // 2m 40s
320, // 5m 20s
640, // 10m 40s
1280, // 21m 20s
}

type Metrics struct {

Check failure on line 21 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type Metrics should have comment or be unexported (revive)
// Time between when a reminder became eligible and when it was sent
SendDelay metric.Float64Histogram

// Current number of reminders in the batch
BatchSize metric.Int64Gauge

// Average batch size (updated on each batch)
AvgBatchSize metric.Float64Gauge

// For tracking average calculation
// TODO: consider persisting this to avoid reset on restart (maybe)
totalBatches int64
totalReminders int64
}

func NewMetrics(meter metric.Meter) (*Metrics, error) {

Check failure on line 37 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported function NewMetrics should have comment or be unexported (revive)
sendDelay, err := meter.Float64Histogram(
"reminder_send_delay",
metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(sendDelayBuckets...),
)
if err != nil {
return nil, err
}

batchSize, err := meter.Int64Gauge(
"reminder_batch_size",
metric.WithDescription("Current number of reminders in the batch"),
)
if err != nil {
return nil, err
}

avgBatchSize, err := meter.Float64Gauge(
"reminder_avg_batch_size",
metric.WithDescription("Average number of reminders per batch"),
)
if err != nil {
return nil, err
}

return &Metrics{
SendDelay: sendDelay,
BatchSize: batchSize,
AvgBatchSize: avgBatchSize,
}, nil
}

func (m *Metrics) RecordBatch(ctx context.Context, size int64) {

Check failure on line 71 in internal/reminder/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported method Metrics.RecordBatch should have comment or be unexported (revive)
// Update current batch size
m.BatchSize.Record(ctx, size)

// Update running average
m.totalBatches++
m.totalReminders += size
avgSize := float64(m.totalReminders) / float64(m.totalBatches)
m.AvgBatchSize.Record(ctx, avgSize)
}
80 changes: 80 additions & 0 deletions internal/reminder/metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package reminder

import (
"context"
"fmt"

Check failure on line 5 in internal/reminder/metrics_server.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"net/http"

Check failure on line 13 in internal/reminder/metrics_server.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

File is not properly formatted (gci)
"time"
)

const (
metricsPath = "/metrics"
readHeaderTimeout = 2 * time.Second
)

func (r *reminder) startMetricServer(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

prometheusExporter, err := prometheus.New(
prometheus.WithNamespace("reminder"),
)
if err != nil {
return fmt.Errorf("failed to create Prometheus exporter: %w", err)
}

res := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("reminder"),
// TODO: Make this auto-generated
semconv.ServiceVersion("v0.1.0"),
)

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(prometheusExporter),
sdkmetric.WithResource(res),
)

otel.SetMeterProvider(mp)

mux := http.NewServeMux()
mux.Handle(metricsPath, promhttp.Handler())

server := &http.Server{
Addr: fmt.Sprintf("%s:%d", r.cfg.MetricsConfig.Host, r.cfg.MetricsConfig.Port),
Handler: mux,
ReadHeaderTimeout: readHeaderTimeout,
}

logger.Info().Msgf("starting metrics server on %s", server.Addr)

errCh := make(chan error)
go func() {
errCh <- server.ListenAndServe()
}()

select {
case err := <-errCh:
return err
case <-ctx.Done():
case <-r.stop:
}

// shutdown the metrics server when either the context is done or when reminder is stopped
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownRelease()

logger.Info().Msg("shutting down metrics server")

if err := mp.Shutdown(shutdownCtx); err != nil {
logger.Err(err).Msg("error shutting down metrics provider")
}

return server.Shutdown(shutdownCtx)
}
39 changes: 33 additions & 6 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"errors"
"fmt"
"github.com/mindersec/minder/internal/reminder/metrics"
"go.opentelemetry.io/otel"
"sync"
"time"

Expand Down Expand Up @@ -44,6 +46,8 @@ type reminder struct {

eventPublisher message.Publisher
eventDBCloser common.DriverCloser

metrics *metrics.Metrics
}

// NewReminder creates a new reminder instance
Expand Down Expand Up @@ -83,6 +87,20 @@ func (r *reminder) Start(ctx context.Context) error {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
}

if r.cfg.MetricsConfig.Enabled {
go func() {
if err := r.startMetricServer(ctx); err != nil {
logger.Error().Err(err).Msg("error starting metrics server")
}
}()

var err error
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
if err != nil {
return err
}
}

r.ticker = time.NewTicker(interval)
defer r.Stop()

Expand Down Expand Up @@ -143,6 +161,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

if r.metrics != nil {
r.metrics.RecordBatch(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
Expand All @@ -151,14 +173,19 @@ func (r *reminder) sendReminders(ctx context.Context) error {
repoIds := make([]uuid.UUID, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
if r.metrics != nil {
// sendDelay = Now() - ReminderLastSent - MinElapsed
reminderLastSent := repo.ReminderLastSent
if reminderLastSent.Valid {
r.metrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds())

Check failure on line 180 in internal/reminder/reminder.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
} else {
// TODO: Should the send delay be zero if the reminder has never been sent?
r.metrics.SendDelay.Record(ctx, 0)
//remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds())
}
}
}

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution

err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
return fmt.Errorf("reminders published but error updating last sent time: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig MetricsConfig `mapstructure:"metrics"`
}

// Validate validates the configuration
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/reminder/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package reminder

type MetricsConfig struct {

Check failure on line 3 in pkg/config/reminder/metrics.go

View workflow job for this annotation

GitHub Actions / lint / Run golangci-lint

exported: exported type MetricsConfig should have comment or be unexported (revive)
Enabled bool `mapstructure:"enabled" default:"true"`
Host string `mapstructure:"host" default:"127.0.0.1"`
Port int `mapstructure:"port" default:"8080"`
}

0 comments on commit b92b0de

Please sign in to comment.