Add influx push endpoint to mimir (#10153)
* Oleg's base commits from #1971

Signed-off-by: alexgreenbank <[email protected]>

* move top level influx files

Signed-off-by: alexgreenbank <[email protected]>

* latest wip

Signed-off-by: alexgreenbank <[email protected]>

* still WIP but better, still need to move to parserFunc() style

Signed-off-by: alexgreenbank <[email protected]>

* it builds!

Signed-off-by: alexgreenbank <[email protected]>

* tweaks and add span logging

Signed-off-by: alexgreenbank <[email protected]>

* more todo

Signed-off-by: alexgreenbank <[email protected]>

* further tweaks

Signed-off-by: alexgreenbank <[email protected]>

* some fixes to tests

Signed-off-by: alexgreenbank <[email protected]>

* rejigged error handling, tests passing

Signed-off-by: alexgreenbank <[email protected]>

* add vendored influxdb code

Signed-off-by: alexgreenbank <[email protected]>

* lint

Signed-off-by: alexgreenbank <[email protected]>

* go mod sum vendor/modules.txt

Signed-off-by: alexgreenbank <[email protected]>

* add a metric, add tenant info, other tweaks

Signed-off-by: alexgreenbank <[email protected]>

* various rework, still WIP

Signed-off-by: alexgreenbank <[email protected]>

* propagate bytesRead down to caller and log and histogram

Signed-off-by: alexgreenbank <[email protected]>

* remove comment now dealt with

Signed-off-by: alexgreenbank <[email protected]>

* add defaults in error handling

Signed-off-by: alexgreenbank <[email protected]>

* Add note to docs about experimental Influx flag

Signed-off-by: alexgreenbank <[email protected]>

* Note influx endpoint as experimental too

Signed-off-by: alexgreenbank <[email protected]>

* test for specific errors received

Signed-off-by: alexgreenbank <[email protected]>

* bolster parser tests

Signed-off-by: alexgreenbank <[email protected]>

* Use literal chars rather than ascii codes

Signed-off-by: alexgreenbank <[email protected]>

* remove unnecessary cast to int()

Signed-off-by: alexgreenbank <[email protected]>

* use mimirpb.PreallocTimeseries in influx parser

Signed-off-by: alexgreenbank <[email protected]>

* remove unnecessary tryUnwrap()

Signed-off-by: alexgreenbank <[email protected]>

* Work on byteslice rather than chars

Signed-off-by: alexgreenbank <[email protected]>

* yoloString for label value as push code does not keep references to strings from LabelAdapter

Signed-off-by: alexgreenbank <[email protected]>

* update go.sum

Signed-off-by: alexgreenbank <[email protected]>

* gah go.sum

Signed-off-by: alexgreenbank <[email protected]>

* oops, missed removal of parameter to InfluxHandler()

Signed-off-by: alexgreenbank <[email protected]>

* wrong metrics incremented

Signed-off-by: alexgreenbank <[email protected]>

* lint

Signed-off-by: alexgreenbank <[email protected]>

* lint

Signed-off-by: alexgreenbank <[email protected]>

* go mod tidy && go mod vendor

Signed-off-by: alexgreenbank <[email protected]>

* go.sum conflict

Signed-off-by: alexgreenbank <[email protected]>

* make doc

Signed-off-by: alexgreenbank <[email protected]>

* make influx config hidden/experimental

Signed-off-by: alexgreenbank <[email protected]>

* fix byteslice handling in replaceInvalidChars()

Signed-off-by: alexgreenbank <[email protected]>

* remove unnecessary TODOs

Signed-off-by: alexgreenbank <[email protected]>

* influx: happy path e2e test

Signed-off-by: alexgreenbank <[email protected]>

* lint

Signed-off-by: alexgreenbank <[email protected]>

* consolidate logging

Signed-off-by: alexgreenbank <[email protected]>

* CHANGELOG

Signed-off-by: alexgreenbank <[email protected]>

* about-versioning.md

Signed-off-by: alexgreenbank <[email protected]>

* Update pkg/distributor/influxpush/parser.go

Co-authored-by: Oleg Zaytsev <[email protected]>

* Update pkg/distributor/influxpush/parser.go

Co-authored-by: Oleg Zaytsev <[email protected]>

* Update pkg/distributor/influxpush/parser.go

Co-authored-by: Oleg Zaytsev <[email protected]>

* fix parsing string replacing code

Signed-off-by: alexgreenbank <[email protected]>

* fix nits

Signed-off-by: alexgreenbank <[email protected]>

---------

Signed-off-by: alexgreenbank <[email protected]>
Co-authored-by: Oleg Zaytsev <[email protected]>
alexgreenbank and colega authored Jan 17, 2025
1 parent bd6e14b commit 1836ff1
Showing 27 changed files with 4,343 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375 #10403
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_reader_last_produced_offset_requests_total`, `cortex_ingest_storage_reader_last_produced_offset_failures_total`, `cortex_ingest_storage_reader_last_produced_offset_request_duration_seconds`, `cortex_ingest_storage_reader_partition_start_offset_requests_total`, `cortex_ingest_storage_reader_partition_start_offset_failures_total`, `cortex_ingest_storage_reader_partition_start_offset_request_duration_seconds` metrics. #10462
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by `cortex_discarded_samples_total` metrics with the reason `sample_duplicate_timestamp`. #10145 #10430
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -87,6 +87,10 @@ The following features are currently experimental:
- Cache rule group contents.
- `-ruler-storage.cache.rule-group-enabled`
- Distributor
- Influx ingestion
- `/api/v1/influx/push` endpoint
- `-distributor.influx-endpoint-enabled`
- `-distributor.max-influx-request-size`
- Metrics relabeling
- `-distributor.metric-relabeling-enabled`
- Using status code 529 instead of 429 upon rate limit exhaustion.
1 change: 1 addition & 0 deletions go.mod
@@ -25,6 +25,7 @@ require (
github.com/grafana/dskit v0.0.0-20250107142522-441a90acd4e5
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/influxdata/influxdb/v2 v2.7.11
github.com/json-iterator/go v1.1.12
github.com/minio/minio-go/v7 v7.0.83
github.com/mitchellh/go-wordwrap v1.0.1
6 changes: 4 additions & 2 deletions go.sum
@@ -1394,6 +1394,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb/v2 v2.7.11 h1:qs9qr5hsuFrlTiBtr5lBrALbQ2dHAanf21fBLlLpKww=
github.com/influxdata/influxdb/v2 v2.7.11/go.mod h1:zNOyzQy6WbfvGi1CK1aJ2W8khOq9+Gdsj8yLj8bHHqg=
github.com/ionos-cloud/sdk-go/v6 v6.3.0 h1:/lTieTH9Mo/CWm3cTlFLnK10jgxjUGkAqRffGqvPteY=
github.com/ionos-cloud/sdk-go/v6 v6.3.0/go.mod h1:SXrO9OGyWjd2rZhAhEpdYN6VUAODzzqRdqA9BCviQtI=
github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc=
@@ -1553,8 +1555,8 @@ github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/image-spec v1.1.0-rc2 h1:2zx/Stx4Wc5pIPDvIxHXvXtQFW/7XWJGmnM7r3wg034=
github.com/opencontainers/image-spec v1.1.0-rc2/go.mod h1:3OVijpioIKYWTqjiG0zfF6wvoJ4fAXGbjdZuI2NgsRQ=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A=
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU=
39 changes: 39 additions & 0 deletions integration/e2emimir/client.go
@@ -7,6 +7,7 @@ package e2emimir

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
@@ -36,6 +37,7 @@ import (
yaml "gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/alertmanager"
mimirapi "github.com/grafana/mimir/pkg/api"
"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/frontend/querymiddleware"
@@ -187,6 +189,43 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushInflux pushes the input timeseries to the remote endpoint in Influx format.
func (c *Client) PushInflux(timeseries []prompb.TimeSeries) (*http.Response, error) {
// Create write request.
data := distributor.TimeseriesToInfluxRequest(timeseries)

// Compress it.
var buf bytes.Buffer

gzipData := gzip.NewWriter(&buf)
if _, err := gzipData.Write([]byte(data)); err != nil {
return nil, err
}
if err := gzipData.Close(); err != nil {
return nil, err
}

// Create HTTP request
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s%s", c.distributorAddress, mimirapi.InfluxPushEndpoint), &buf)
if err != nil {
return nil, err
}
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

func (c *Client) PushRW2(writeRequest *promRW2.Request) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(writeRequest)
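For reference, the request shape the new endpoint expects can be sketched outside the e2e framework. The sketch below is illustrative, not part of the commit: the host, tenant ID, and line-protocol payload are made up, while the `/api/v1/influx/push` path and the `Content-Encoding`/`X-Scope-OrgID` headers mirror what `PushInflux` above sends.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

// buildInfluxPush builds (but does not send) a gzip-compressed POST request
// carrying Influx line protocol, mirroring the e2e client's PushInflux.
func buildInfluxPush(host, orgID, lineProtocol string) (*http.Request, error) {
	// Compress the payload, as the endpoint accepts gzip-encoded bodies.
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	if _, err := gz.Write([]byte(lineProtocol)); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/influx/push", host), &buf)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("X-Scope-OrgID", orgID)
	return req, nil
}

func main() {
	// "measurement,tag=v field=value timestamp" is standard line protocol;
	// this particular series is a made-up example.
	req, err := buildInfluxPush("localhost:9009", "user-1",
		"series_f1,foo=bar value=1.5 1736900000000000000")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("Content-Encoding"))
	// → POST /api/v1/influx/push gzip
}
```

Sending the request (`http.Client.Do`) against a Mimir started with `-distributor.influx-endpoint-enabled=true` should return 204 on success, per the e2e test below.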
100 changes: 100 additions & 0 deletions integration/influx_ingestion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// SPDX-License-Identifier: AGPL-3.0-only
//go:build requires_docker

package integration

import (
"math"
"testing"
"time"

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
)

func TestInfluxIngestion(t *testing.T) {
t.Helper()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, blocksBucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Start Mimir components.
require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile))

// Start Mimir in single binary mode, reading the config from file and overwriting
// the backend config to make it work with Minio.
flags := mergeFlags(
DefaultSingleBinaryFlags(),
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-distributor.influx-endpoint-enabled": "true",
},
)

mimir := e2emimir.NewSingleBinary("mimir-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095))
require.NoError(t, s.StartAndWaitReady(mimir))

c, err := e2emimir.NewClient(mimir.HTTPEndpoint(), mimir.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Mimir.
now := time.Now()

series, expectedVector, expectedMatrix := generateFloatSeries("series_f1", now, prompb.Label{Name: "foo", Value: "bar"})
// Fix up the expectation as Influx values seem to be rounded to millionths
for _, s := range expectedVector {
s.Metric[model.LabelName("__mimir_source__")] = model.LabelValue("influx")
s.Value = model.SampleValue(math.Round(float64(s.Value)*1000000) / 1000000.0)
}
// Fix up the expectation as Influx values seem to be rounded to millionths
for im, s := range expectedMatrix {
for iv, v := range s.Values {
expectedMatrix[im].Values[iv].Value = model.SampleValue(math.Round(float64(v.Value)*1000000) / 1000000.0)
}
}

res, err := c.PushInflux(series)
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)

// Check metric to track Influx requests
require.NoError(t, mimir.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_distributor_influx_requests_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))

// Query the series.
result, err := c.Query("series_f1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))

labelValues, err := c.LabelValues("foo", v1.MinTime, v1.MaxTime, nil)
require.NoError(t, err)
require.Equal(t, model.LabelValues{"bar"}, labelValues)

labelNames, err := c.LabelNames(v1.MinTime, v1.MaxTime, nil)
require.NoError(t, err)
require.Equal(t, []string{"__mimir_source__", "__name__", "foo"}, labelNames)

rangeResult, err := c.QueryRange("series_f1", now.Add(-15*time.Minute), now, 15*time.Second)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, rangeResult.Type())
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))

// No metadata to query, but we do the query anyway.
_, err = c.GetPrometheusMetadata()
require.NoError(t, err)
}
9 changes: 9 additions & 0 deletions pkg/api/api.go
@@ -257,6 +257,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL

const PrometheusPushEndpoint = "/api/v1/push"
const OTLPPushEndpoint = "/otlp/v1/metrics"
const InfluxPushEndpoint = "/api/v1/influx/push"

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
@@ -266,6 +267,14 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader,
a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger,
), true, false, "POST")

if pushConfig.EnableInfluxEndpoint {
// The Influx Push endpoint is experimental.
a.RegisterRoute(InfluxPushEndpoint, distributor.InfluxHandler(
pushConfig.MaxInfluxRequestSize, d.RequestBufferPool, a.sourceIPs, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger,
), true, false, "POST")
}

a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig,
pushConfig.RetryConfig, pushConfig.EnableStartTimeQuietZero, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
38 changes: 37 additions & 1 deletion pkg/distributor/distributor.go
@@ -87,7 +87,8 @@ const (
// metaLabelTenantID is the name of the metric_relabel_configs label with tenant ID.
metaLabelTenantID = model.MetaLabelPrefix + "tenant_id"

maxOTLPRequestSizeFlag = "distributor.max-otlp-request-size"
maxOTLPRequestSizeFlag = "distributor.max-otlp-request-size"
maxInfluxRequestSizeFlag = "distributor.max-influx-request-size"

instanceIngestionRateTickInterval = time.Second

@@ -209,6 +210,7 @@ type Config struct {

MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
MaxOTLPRequestSize int `yaml:"max_otlp_request_size" category:"experimental"`
MaxInfluxRequestSize int `yaml:"max_influx_request_size" category:"experimental" doc:"hidden"`
MaxRequestPoolBufferSize int `yaml:"max_request_pool_buffer_size" category:"experimental"`
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`

@@ -251,6 +253,9 @@ type Config struct {
// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`

// Influx endpoint disabled by default
EnableInfluxEndpoint bool `yaml:"influx_endpoint_enabled" category:"experimental" doc:"hidden"`

// Change the implementation of OTel startTime from a real zero to a special NaN value.
EnableStartTimeQuietZero bool `yaml:"start_time_quiet_zero" category:"advanced" doc:"hidden"`
}
@@ -267,9 +272,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.")
f.IntVar(&cfg.MaxInfluxRequestSize, maxInfluxRequestSizeFlag, 100<<20, "Maximum Influx request size in bytes that the distributors accept. Requests exceeding this limit are rejected.")
f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.EnableInfluxEndpoint, "distributor.influx-endpoint-enabled", false, "Enable Influx endpoint.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
f.BoolVar(&cfg.EnableStartTimeQuietZero, "distributor.otel-start-time-quiet-zero", false, "Change the implementation of OTel startTime from a real zero to a special NaN value.")

@@ -295,12 +302,27 @@ const (
)

type PushMetrics struct {
// Influx metrics.
influxRequestCounter *prometheus.CounterVec
influxUncompressedBodySize *prometheus.HistogramVec
// OTLP metrics.
otlpRequestCounter *prometheus.CounterVec
uncompressedBodySize *prometheus.HistogramVec
}

func newPushMetrics(reg prometheus.Registerer) *PushMetrics {
return &PushMetrics{
influxRequestCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_influx_requests_total",
Help: "The total number of Influx requests that have come in to the distributor.",
}, []string{"user"}),
influxUncompressedBodySize: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_influx_uncompressed_request_body_size_bytes",
Help: "Size of uncompressed request body in bytes.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user"}),
otlpRequestCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_otlp_requests_total",
Help: "The total number of OTLP requests that have come in to the distributor.",
@@ -315,6 +337,18 @@ func newPushMetrics(reg prometheus.Registerer) *PushMetrics {
}
}

func (m *PushMetrics) IncInfluxRequest(user string) {
if m != nil {
m.influxRequestCounter.WithLabelValues(user).Inc()
}
}

func (m *PushMetrics) ObserveInfluxUncompressedBodySize(user string, size float64) {
if m != nil {
m.influxUncompressedBodySize.WithLabelValues(user).Observe(size)
}
}

func (m *PushMetrics) IncOTLPRequest(user string) {
if m != nil {
m.otlpRequestCounter.WithLabelValues(user).Inc()
@@ -328,6 +362,8 @@ func (m *PushMetrics) ObserveUncompressedBodySize(user string, size float64) {
}

func (m *PushMetrics) deleteUserMetrics(user string) {
m.influxRequestCounter.DeleteLabelValues(user)
m.influxUncompressedBodySize.DeleteLabelValues(user)
m.otlpRequestCounter.DeleteLabelValues(user)
m.uncompressedBodySize.DeleteLabelValues(user)
}
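Note the nil-receiver guards in `IncInfluxRequest` and `ObserveInfluxUncompressedBodySize` above: callers can hold a nil `*PushMetrics` when instrumentation is disabled and still call the methods safely. A minimal standalone sketch of the pattern, using a made-up `metrics` type rather than the real Prometheus counters:

```go
package main

import "fmt"

// metrics mirrors the nil-guard pattern used by PushMetrics: every method
// checks the receiver, so a nil *metrics is a safe no-op sink.
type metrics struct {
	influxRequests map[string]int
}

func (m *metrics) IncInfluxRequest(user string) {
	if m != nil { // nil receiver: instrumentation disabled, do nothing
		m.influxRequests[user]++
	}
}

func main() {
	var disabled *metrics
	disabled.IncInfluxRequest("user-1") // safe no-op, no nil-pointer panic

	enabled := &metrics{influxRequests: map[string]int{}}
	enabled.IncInfluxRequest("user-1")
	fmt.Println(enabled.influxRequests["user-1"]) // → 1
}
```

This works in Go because a method with a pointer receiver can be invoked on a nil pointer; the guard just has to run before any field access.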