Skip to content

Commit

Permalink
perf(approx_topk): Reduce memory usage of HyperLogLog in approx_topk. (
Browse files Browse the repository at this point in the history
…#15559)

The count min sketch data structure backing the new approx_topk aggregation uses HyperLogLog (HLL) to track the actual cardinality of the aggregated vector.

We were using the sparse version of the HLL. However, that resulted in memory and allocation overhead.
  • Loading branch information
jeschkies authored Jan 9, 2025
1 parent 5f476a3 commit bef2043
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
7 changes: 5 additions & 2 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package logql
import (
"container/heap"
"fmt"
"slices"
"strings"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
}

func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
v.buffer = metric.Bytes(v.buffer)

v.F.Add(v.buffer, value)

// Add our metric if we haven't seen it

// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
// an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the
// same issue in its deduping logic.
id := xxhash.Sum64(v.buffer)

// Add our metric if we haven't seen it
if _, ok := v.observed[id]; !ok {
heap.Push(v, metric)
v.observed[id] = struct{}{}
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package log

import (
"fmt"
"slices"
"sort"
"strings"
"sync"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {

// Get all labels at once and sort them
b.buf = b.UnsortedLabels(b.buf)
sort.Sort(b.buf)
// sort.Sort(b.buf)
slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
hash := b.hasher.Hash(b.buf)

if cached, ok := b.resultCache[hash]; ok {
Expand Down
14 changes: 13 additions & 1 deletion pkg/logql/log/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool {
p.checkCount++
return key == p.label || p.extractAll
}

func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool {
return prefix == p.label || p.extractAll
}

func (p *fakeParseHints) NoLabels() bool {
return false
}

func (p *fakeParseHints) RecordExtracted(_ string) {
p.count++
}

func (p *fakeParseHints) AllRequiredExtracted() bool {
return !p.extractAll && p.count == 1
}

func (p *fakeParseHints) Reset() {
p.checkCount = 0
p.count = 0
}

func (p *fakeParseHints) PreserveError() bool {
return false
}

func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool {
return p.keepGoing
}
Expand Down Expand Up @@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
line := []byte(tt.line)
b.Run("no labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})

b.Run("labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil)

for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})

b.Run("inline stages", func(b *testing.B) {
b.ReportAllocs()
stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)}
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages)
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})
})
Expand Down Expand Up @@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) {
},
}
for _, tt := range tests {

t.Run(tt.name, func(t *testing.T) {
_, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false)

Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
Depth: d,
Width: w,
Counters: make2dslice(w, d),
HyperLogLog: hyperloglog.New16(),
HyperLogLog: hyperloglog.New16NoSparse(),
}, nil
}

Expand Down

0 comments on commit bef2043

Please sign in to comment.