From bef20431cbbf302e584c4eea2eb423537bcf86e7 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 9 Jan 2025 11:27:23 +0100 Subject: [PATCH] perf(approx_topk): Reduce memory usage of HyperLogLog in approx_topk. (#15559) The count min sketch data structure backing the new approx_topk aggregation uses HyperLogLog (HLL) to track the actual cardinality of the aggregated vector. We were using the sparse version of the HLL. However, that resulted in memory and allocation overhead. --- pkg/logql/count_min_sketch.go | 7 +++++-- pkg/logql/log/labels.go | 5 ++++- pkg/logql/log/parser_test.go | 14 +++++++++++++- pkg/logql/sketch/cms.go | 2 +- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/logql/count_min_sketch.go b/pkg/logql/count_min_sketch.go index e24e089ad307b..927ff6ff772e7 100644 --- a/pkg/logql/count_min_sketch.go +++ b/pkg/logql/count_min_sketch.go @@ -3,6 +3,8 @@ package logql import ( "container/heap" "fmt" + "slices" + "strings" "github.com/axiomhq/hyperloglog" "github.com/cespare/xxhash/v2" @@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou } func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) { + slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) v.buffer = metric.Bytes(v.buffer) v.F.Add(v.buffer, value) - // Add our metric if we haven't seen it - // TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's // an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the // same issue in its deduping logic. id := xxhash.Sum64(v.buffer) + + // Add our metric if we haven't seen it if _, ok := v.observed[id]; !ok { heap.Push(v, metric) v.observed[id] = struct{}{} diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index c5ef408cc21ed..9e494f99237d4 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -2,7 +2,9 @@ package log import ( "fmt" + "slices" "sort" + "strings" "sync" "github.com/prometheus/prometheus/model/labels" @@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Get all labels at once and sort them b.buf = b.UnsortedLabels(b.buf) - sort.Sort(b.buf) + // sort.Sort(b.buf) + slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) hash := b.hasher.Hash(b.buf) if cached, ok := b.resultCache[hash]; ok { diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 5ac3a87503634..af332c8cb54c7 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool { p.checkCount++ return key == p.label || p.extractAll } + func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool { return prefix == p.label || p.extractAll } + func (p *fakeParseHints) NoLabels() bool { return false } + func (p *fakeParseHints) RecordExtracted(_ string) { p.count++ } + func (p *fakeParseHints) AllRequiredExtracted() bool { return !p.extractAll && p.count == 1 } + func (p *fakeParseHints) Reset() { p.checkCount = 0 p.count = 0 } + func (p *fakeParseHints) PreserveError() bool { return false } + func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool { return p.keepGoing } @@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) { b.Run(tt.name, func(b *testing.B) { line := []byte(tt.line) b.Run("no labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("inline stages", func(b *testing.B) { + b.ReportAllocs() stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)} builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) }) @@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { _, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false) diff --git a/pkg/logql/sketch/cms.go b/pkg/logql/sketch/cms.go index b510ee8504ea0..67f72be976c19 100644 --- a/pkg/logql/sketch/cms.go +++ b/pkg/logql/sketch/cms.go @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) { Depth: d, Width: w, Counters: make2dslice(w, d), - HyperLogLog: hyperloglog.New16(), + HyperLogLog: hyperloglog.New16NoSparse(), }, nil }