diff --git a/internal/reminder/metrics/metrics.go b/internal/reminder/metrics/metrics.go new file mode 100644 index 0000000000..7f8d8130ea --- /dev/null +++ b/internal/reminder/metrics/metrics.go @@ -0,0 +1,80 @@ +package metrics + +import ( + "context" + "go.opentelemetry.io/otel/metric" +) + +// Default bucket boundaries in seconds for the send delay histogram +var sendDelayBuckets = []float64{ + 0, // immediate + 10, // 10 seconds + 20, // 20 seconds + 40, // 40 seconds + 80, // 1m 20s + 160, // 2m 40s + 320, // 5m 20s + 640, // 10m 40s + 1280, // 21m 20s +} + +type Metrics struct { + // Time between when a reminder became eligible and when it was sent + SendDelay metric.Float64Histogram + + // Current number of reminders in the batch + BatchSize metric.Int64Gauge + + // Average batch size (updated on each batch) + AvgBatchSize metric.Float64Gauge + + // For tracking average calculation + // TODO: consider persisting this to avoid reset on restart (maybe) + totalBatches int64 + totalReminders int64 +} + +func NewMetrics(meter metric.Meter) (*Metrics, error) { + sendDelay, err := meter.Float64Histogram( + "reminder_send_delay", + metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(sendDelayBuckets...), + ) + if err != nil { + return nil, err + } + + batchSize, err := meter.Int64Gauge( + "reminder_batch_size", + metric.WithDescription("Current number of reminders in the batch"), + ) + if err != nil { + return nil, err + } + + avgBatchSize, err := meter.Float64Gauge( + "reminder_avg_batch_size", + metric.WithDescription("Average number of reminders per batch"), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + SendDelay: sendDelay, + BatchSize: batchSize, + AvgBatchSize: avgBatchSize, + }, nil +} + +func (m *Metrics) RecordBatch(ctx context.Context, size int64) { + // Update current batch size + m.BatchSize.Record(ctx, size) + + // Update running average + m.totalBatches++ + m.totalReminders += size + avgSize := float64(m.totalReminders) / float64(m.totalBatches) + m.AvgBatchSize.Record(ctx, avgSize) +} diff --git a/internal/reminder/metrics_server.go b/internal/reminder/metrics_server.go new file mode 100644 index 0000000000..2fbe122b7f --- /dev/null +++ b/internal/reminder/metrics_server.go @@ -0,0 +1,80 @@ +package reminder + +import ( + "context" + "fmt" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "net/http" + "time" +) + +const ( + metricsPath = "/metrics" + readHeaderTimeout = 2 * time.Second +) + +func (r *reminder) startMetricServer(ctx context.Context) error { + logger := zerolog.Ctx(ctx) + + prometheusExporter, err := prometheus.New( + prometheus.WithNamespace("reminder"), + ) + if err != nil { + return fmt.Errorf("failed to create Prometheus exporter: %w", err) + } + + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("reminder"), + // TODO: Make this auto-generated + semconv.ServiceVersion("v0.1.0"), + ) + + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(prometheusExporter), + sdkmetric.WithResource(res), + ) + + otel.SetMeterProvider(mp) + + mux := http.NewServeMux() + mux.Handle(metricsPath, promhttp.Handler()) + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", r.cfg.MetricsConfig.Host, r.cfg.MetricsConfig.Port), + Handler: mux, + ReadHeaderTimeout: readHeaderTimeout, + } + + logger.Info().Msgf("starting metrics server on %s", server.Addr) + + errCh := make(chan error) + go func() { + errCh <- server.ListenAndServe() + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + case <-r.stop: + } + + // shutdown the metrics server when either the context is done or when reminder is stopped + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownRelease() + + logger.Info().Msg("shutting down metrics server") + + if err := mp.Shutdown(shutdownCtx); err != nil { + logger.Err(err).Msg("error shutting down metrics provider") + } + + return server.Shutdown(shutdownCtx) +} diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 7ec9a61974..ab5d05599f 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -8,6 +8,8 @@ import ( "context" "errors" "fmt" + "github.com/mindersec/minder/internal/reminder/metrics" + "go.opentelemetry.io/otel" "sync" "time" @@ -44,6 +46,8 @@ type reminder struct { eventPublisher message.Publisher eventDBCloser common.DriverCloser + + metrics *metrics.Metrics } // NewReminder creates a new reminder instance @@ -83,6 +87,20 @@ func (r *reminder) Start(ctx context.Context) error { return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval) } + if r.cfg.MetricsConfig.Enabled { + go func() { + if err := r.startMetricServer(ctx); err != nil { + logger.Error().Err(err).Msg("error starting metrics server") + } + }() + + var err error + r.metrics, err = metrics.NewMetrics(otel.Meter("reminder")) + if err != nil { + return err + } + } + r.ticker = time.NewTicker(interval) defer r.Stop() @@ -143,6 +161,10 @@ func (r *reminder) sendReminders(ctx context.Context) error { return fmt.Errorf("error creating reminder messages: %w", err) } + if r.metrics != nil { + r.metrics.RecordBatch(ctx, int64(len(repos))) + } + err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) if err != nil { return fmt.Errorf("error publishing messages: %w", err) @@ -151,14 +173,19 @@ func (r *reminder) sendReminders(ctx context.Context) error { repoIds := make([]uuid.UUID, len(repos)) for _, repo := range repos { repoIds = append(repoIds, repo.ID) + if r.metrics != nil { + // sendDelay = Now() - ReminderLastSent - MinElapsed + reminderLastSent := repo.ReminderLastSent + if reminderLastSent.Valid { + r.metrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds()) + } else { + // TODO: Should the send delay be zero if the reminder has never been sent? + r.metrics.SendDelay.Record(ctx, 0) + //remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds()) + } + } } - // TODO: Collect Metrics - // Potential metrics: - // - Gauge: Number of reminders in the current batch - // - UpDownCounter: Average reminders sent per batch - // - Histogram: reminder_last_sent time distribution - err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds) if err != nil { return fmt.Errorf("reminders published but error updating last sent time: %w", err) diff --git a/pkg/config/reminder/config.go b/pkg/config/reminder/config.go index c53b5c1129..2e4f398545 100644 --- a/pkg/config/reminder/config.go +++ b/pkg/config/reminder/config.go @@ -19,6 +19,7 @@ type Config struct { RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` EventConfig EventConfig `mapstructure:"events"` LoggingConfig LoggingConfig `mapstructure:"logging"` + MetricsConfig MetricsConfig `mapstructure:"metrics"` } // Validate validates the configuration diff --git a/pkg/config/reminder/metrics.go b/pkg/config/reminder/metrics.go new file mode 100644 index 0000000000..1596f5b3cd --- /dev/null +++ b/pkg/config/reminder/metrics.go @@ -0,0 +1,7 @@ +package reminder + +type MetricsConfig struct { + Enabled bool `mapstructure:"enabled" default:"true"` + Host string `mapstructure:"host" default:"127.0.0.1"` + Port int `mapstructure:"port" default:"8080"` +}