
Commit

Distributor: don't return errors when discarding samples with duplicated timestamps (#10430)

* Distributor: don't return errors when discarding samples with duplicated timestamps

Signed-off-by: Yuri Nikolic <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Yuri Nikolic <[email protected]>

* Update CHANGELOG.md

Co-authored-by: Taylor C <[email protected]>

* Remove logging for the time being

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
Co-authored-by: Taylor C <[email protected]>
duricanikolic and tacole02 authored Jan 14, 2025
1 parent 2250282 commit 78e0d3a
Showing 4 changed files with 5 additions and 41 deletions.
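
For context, here is a minimal, self-contained Go sketch of the behavior this commit lands: float and histogram samples with duplicated timestamps are still dropped from each timeseries and counted in cortex_discarded_samples_total with the reason sample_duplicate_timestamp, but the write path no longer returns an error for them. Everything below (the sample type, the dedupeByTimestamp helper, the counter's label set, and keeping the first occurrence of a timestamp) is illustrative only, not Mimir's actual implementation.

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    // Illustrative stand-in for Mimir's cortex_discarded_samples_total; the real
    // metric and its label set live in the distributor's validation code.
    var discardedSamples = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "cortex_discarded_samples_total",
            Help: "The total number of samples that were discarded.",
        },
        []string{"reason", "user", "group"},
    )

    type sample struct {
        TimestampMs int64
        Value       float64
    }

    // dedupeByTimestamp (hypothetical helper) keeps the first sample seen for each
    // timestamp and reports how many duplicates were dropped.
    func dedupeByTimestamp(in []sample) (out []sample, dropped int) {
        seen := make(map[int64]struct{}, len(in))
        for _, s := range in {
            if _, ok := seen[s.TimestampMs]; ok {
                continue
            }
            seen[s.TimestampMs] = struct{}{}
            out = append(out, s)
        }
        return out, len(in) - len(out)
    }

    func main() {
        series := []sample{{TimestampMs: 10, Value: 1}, {TimestampMs: 10, Value: 2}, {TimestampMs: 20, Value: 3}}

        deduped, dropped := dedupeByTimestamp(series)
        if dropped > 0 {
            // The duplicates are still tracked by the counter...
            discardedSamples.WithLabelValues("sample_duplicate_timestamp", "user", "test-group").Add(float64(dropped))
        }
        // ...but, as of this commit, no error is returned to the caller for them.
        fmt.Printf("kept=%v dropped=%d\n", deduped, dropped) // kept=[{10 1} {20 3}] dropped=1
    }
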
CHANGELOG.md: 2 changes (1 addition & 1 deletion)
@@ -10,7 +10,7 @@
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375 #10403
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by `cortex_discarded_samples_total` metrics with the reason `sample_duplicate_timestamp`. #10145 #10430
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency #10189
* [ENHANCEMENT] Distributor: Add native histogram support for `electedReplicaPropagationTime` metric in ha_tracker. #10264
* [ENHANCEMENT] Ingester: More efficient CPU/memory utilization-based read request limiting. #10325
pkg/distributor/distributor.go: 4 changes (0 additions & 4 deletions)
@@ -53,7 +53,6 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/extract"
"github.com/grafana/mimir/pkg/util/globalerror"
mimir_limiter "github.com/grafana/mimir/pkg/util/limiter"
util_math "github.com/grafana/mimir/pkg/util/math"
@@ -895,11 +894,8 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
d.validateExemplars(ts, userID, minExemplarTS, maxExemplarTS)

deduplicatedSamplesAndHistograms := totalSamplesAndHistograms - len(ts.Samples) - len(ts.Histograms)

if deduplicatedSamplesAndHistograms > 0 {
d.sampleValidationMetrics.duplicateTimestamp.WithLabelValues(userID, group).Add(float64(deduplicatedSamplesAndHistograms))
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ts.Labels)
return false, fmt.Errorf(duplicateTimestampMsgFormat, deduplicatedSamplesAndHistograms, unsafeMetricName)
}

return false, nil
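
To make the net effect of this hunk easier to read without diff markers: after the change, the tail of validateSeries only records the dropped duplicates and no longer builds an error. Roughly, as comments on the surviving lines (simplified, not the verbatim Mimir source):

    // Simplified shape of the code that remains after this hunk:
    deduplicatedSamplesAndHistograms := totalSamplesAndHistograms - len(ts.Samples) - len(ts.Histograms)
    if deduplicatedSamplesAndHistograms > 0 {
        // The per-tenant counter for duplicated timestamps still grows...
        d.sampleValidationMetrics.duplicateTimestamp.WithLabelValues(userID, group).Add(float64(deduplicatedSamplesAndHistograms))
    }
    // ...but no error is constructed from duplicateTimestampMsgFormat anymore, so the
    // series is accepted and forwarded; callers of validateSeries see a nil error.
    return false, nil
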
pkg/distributor/distributor_test.go: 39 changes (4 additions & 35 deletions)
@@ -1472,7 +1472,6 @@ func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
testCases := map[string]struct {
req *mimirpb.WriteRequest
expectedSamples []mimirpb.PreallocTimeseries
expectedErrors []error
expectedMetrics string
}{
"do not deduplicate if there are no duplicated timestamps": {
@@ -1494,10 +1493,6 @@
makeTimeseries(labels, append(makeSamples(10, 1), makeSamples(20, 2)...), nil, nil),
makeTimeseries(labels, nil, append(makeHistograms(30, generateTestHistogram(0)), makeHistograms(40, generateTestHistogram(1))...), nil),
},
expectedErrors: []error{
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"),
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"),
},
expectedMetrics: `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
@@ -1515,11 +1510,6 @@
makeTimeseries(labels, makeSamples(10, 3), makeHistograms(20, generateTestHistogram(1)), nil),
makeTimeseries(labels, makeSamples(10, 4), append(makeHistograms(20, generateTestHistogram(3)), makeHistograms(30, generateTestHistogram(4))...), nil),
},
expectedErrors: []error{
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"),
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"),
nil,
},
expectedMetrics: `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
@@ -1541,19 +1531,10 @@
require.Len(t, regs, 1)

now := mtime.Now()
for i, ts := range tc.req.Timeseries {
for _, ts := range tc.req.Timeseries {
shouldRemove, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0)
require.False(t, shouldRemove)
if len(tc.expectedErrors) == 0 {
require.NoError(t, err)
} else {
if tc.expectedErrors[i] == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, tc.expectedErrors[i], err)
}
}
require.NoError(t, err)
}

assert.Equal(t, tc.expectedSamples, tc.req.Timeseries)
@@ -1574,8 +1555,7 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
timestamp := now.UnixMilli()

testCases := map[string]struct {
setup func(int) [][]mimirpb.PreallocTimeseries
expectedErrors bool
setup func(int) [][]mimirpb.PreallocTimeseries
}{
"one timeseries with one sample": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1588,7 +1568,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with one histogram": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1601,7 +1580,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with one sample and one histogram": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1614,7 +1592,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two samples": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1627,7 +1604,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two histograms": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1640,7 +1616,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two samples and two histograms": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1653,7 +1628,6 @@
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with 80_000 samples with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1674,7 +1648,6 @@
}
return timeseries
},
expectedErrors: true,
},
"one timeseries with 80_000 histograms with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1694,7 +1667,6 @@
}
return timeseries
},
expectedErrors: true,
},
"one timeseries with 80_000 samples and 80_000 histograms with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
@@ -1717,7 +1689,6 @@
}
return timeseries
},
expectedErrors: true,
},
}

@@ -1739,10 +1710,8 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, ts := range timeseries[n] {
_, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0)
if !tc.expectedErrors && err != nil {
if err != nil {
b.Fatal(err)
} else if tc.expectedErrors && err == nil {
b.Fatal("an error was expected")
}
}
}
pkg/distributor/validate.go: 1 change (0 additions & 1 deletion)
@@ -99,7 +99,6 @@ var (
"received a sample whose timestamp is too far in the past, timestamp: %d series: '%.200s'",
validation.PastGracePeriodFlag,
)
duplicateTimestampMsgFormat = globalerror.SampleDuplicateTimestamp.Message("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s'")
exemplarEmptyLabelsMsgFormat = globalerror.ExemplarLabelsMissing.Message(
"received an exemplar with no valid labels, timestamp: %d series: %s labels: %s",
)
