Skip to content

Commit

Permalink
Implement matchers cache on Ingester (#6477)
Browse files Browse the repository at this point in the history
* Implement matcher cache

Signed-off-by: alanprot <[email protected]>

* changelog

Signed-off-by: alanprot <[email protected]>

* lint

Signed-off-by: alanprot <[email protected]>

* fix evict metric

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot authored Jan 6, 2025
1 parent 12412e6 commit c2c4827
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester: Add support for caching query matchers via `-ingester.matchers-cache-max-items`. #6477
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing federated queries and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3181,6 +3181,10 @@ instance_limits:
# change by changing this option.
# CLI flag: -ingester.disable-chunk-trimming
[disable_chunk_trimming: <boolean> | default = false]
# Maximum number of entries in the matchers cache. 0 to disable.
# CLI flag: -ingester.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
```

### `ingester_client_config`
Expand Down
5 changes: 3 additions & 2 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
"-blocks-storage.expanded_postings_cache.head.enabled": "true",
"-blocks-storage.expanded_postings_cache.block.enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
"-ring.store": "consul",
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
"-ingester.matchers-cache-max-items": "10000",
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
Expand Down
9 changes: 5 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
Expand Down Expand Up @@ -3374,7 +3375,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
return nil, errFail
}

_, _, matchers, err := client.FromQueryRequest(req)
_, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req)
if err != nil {
return nil, err
}
Expand All @@ -3400,7 +3401,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
return nil, errFail
}

_, _, matchers, err := client.FromQueryRequest(req)
_, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3459,7 +3460,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c
return nil, errFail
}

_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3491,7 +3492,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
return nil, errFail
}

_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req)
if err != nil {
return nil, err
}
Expand Down
57 changes: 31 additions & 26 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
storecache "github.com/thanos-io/thanos/pkg/store/cache"

"github.com/cortexproject/cortex/pkg/cortexpb"
)
Expand All @@ -26,8 +27,8 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ
}

// FromQueryRequest unpacks a QueryRequest proto.
func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
matchers, err := FromLabelMatchers(req.Matchers)
func FromQueryRequest(cache storecache.MatchersCache, req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
matchers, err := FromLabelMatchers(cache, req.Matchers)
if err != nil {
return 0, 0, nil, err
}
Expand Down Expand Up @@ -55,10 +56,10 @@ func ToExemplarQueryRequest(from, to model.Time, matchers ...[]*labels.Matcher)
}

// FromExemplarQueryRequest unpacks a ExemplarQueryRequest proto.
func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) {
func FromExemplarQueryRequest(cache storecache.MatchersCache, req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) {
var result [][]*labels.Matcher
for _, m := range req.Matchers {
matchers, err := FromLabelMatchers(m.Matchers)
matchers, err := FromLabelMatchers(cache, m.Matchers)
if err != nil {
return 0, 0, nil, err
}
Expand Down Expand Up @@ -175,10 +176,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
}

// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
func FromMetricsForLabelMatchersRequest(cache storecache.MatchersCache, req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
matchers, err := FromLabelMatchers(matchers.Matchers)
matchers, err := FromLabelMatchers(cache, matchers.Matchers)
if err != nil {
return 0, 0, 0, nil, err
}
Expand Down Expand Up @@ -206,12 +207,12 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
func FromLabelValuesRequest(cache storecache.MatchersCache, req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers)
if err != nil {
return "", 0, 0, 0, nil, err
}
Expand All @@ -236,12 +237,12 @@ func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matc
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
func FromLabelNamesRequest(cache storecache.MatchersCache, req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers)
if err != nil {
return 0, 0, 0, nil, err
}
Expand Down Expand Up @@ -275,27 +276,31 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
return result, nil
}

func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
func FromLabelMatchers(cache storecache.MatchersCache, matchers []*LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
var mtype labels.MatchType
switch matcher.Type {
case EQUAL:
mtype = labels.MatchEqual
case NOT_EQUAL:
mtype = labels.MatchNotEqual
case REGEX_MATCH:
mtype = labels.MatchRegexp
case REGEX_NO_MATCH:
mtype = labels.MatchNotRegexp
default:
return nil, fmt.Errorf("invalid matcher type")
}
matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value)
m, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) {
var mtype labels.MatchType
switch matcher.Type {
case EQUAL:
mtype = labels.MatchEqual
case NOT_EQUAL:
mtype = labels.MatchNotEqual
case REGEX_MATCH:
mtype = labels.MatchRegexp
case REGEX_NO_MATCH:
mtype = labels.MatchNotRegexp
default:
return nil, fmt.Errorf("invalid matcher type")
}
return labels.NewMatcher(mtype, matcher.GetName(), matcher.GetValue())
})

if err != nil {
return nil, err
}
result = append(result, matcher)

result = append(result, m)
}
return result, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func TestQueryRequest(t *testing.T) {
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestQueryRequest(t *testing.T) {
t.Fatal(err)
}

haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req)
haveFrom, haveTo, haveMatchers, err := FromQueryRequest(storecache.NewNoopMatcherCache(), req)
if err != nil {
t.Fatal(err)
}
Expand Down
27 changes: 22 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/shipper"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
Expand Down Expand Up @@ -145,6 +146,9 @@ type Config struct {
// When disabled, the result may contain samples outside the queried time range but Select() performances
// may be improved.
DisableChunkTrimming bool `yaml:"disable_chunk_trimming"`

// Maximum number of entries in the matchers cache. 0 to disable.
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -173,6 +177,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")

f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -243,6 +248,7 @@ type Ingester struct {
inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker

matchersCache storecache.MatchersCache
expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory
}

Expand Down Expand Up @@ -708,7 +714,18 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache),
matchersCache: storecache.NewNoopMatcherCache(),
}

if cfg.MatchersCacheMaxItems > 0 {
r := prometheus.NewRegistry()
registerer.MustRegister(newMatchCacheMetrics(r, logger))
i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r))
if err != nil {
return nil, err
}
}

i.metrics = newIngesterMetrics(registerer,
false,
cfg.ActiveSeriesMetricsEnabled,
Expand Down Expand Up @@ -1474,7 +1491,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

from, through, matchers, err := client.FromExemplarQueryRequest(req)
from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1564,7 +1581,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
return nil, cleanup, err
}

labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req)
labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(i.matchersCache, req)
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -1654,7 +1671,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}

startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req)
startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(i.matchersCache, req)
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -1768,7 +1785,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
}

// Parse the request
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req)
if err != nil {
return cleanup, err
}
Expand Down Expand Up @@ -1982,7 +1999,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

from, through, matchers, err := client.FromQueryRequest(req)
from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req)
if err != nil {
return err
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,64 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e
return set, nil
}

// TestMatcherCache starts an ingester with a matchers cache capped at 50
// entries, issues QueryStream requests for 51 distinct equality matchers
// (10 calls each), and then compares the ingester_matchers_cache_* metrics
// gathered from the test registry against the expected request, hit, item
// and eviction counts.
func TestMatcherCache(t *testing.T) {
limits := defaultLimitsTestConfig()
userID := "1"
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits})
registry := prometheus.NewRegistry()

dir := t.TempDir()
chunksDir := filepath.Join(dir, "chunks")
blocksDir := filepath.Join(dir, "blocks")
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
cfg := defaultIngesterTestConfig(t)
// Use a small cache so that exceeding its capacity only takes a few queries.
cfg.MatchersCacheMaxItems = 50
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, blocksDir, registry, true)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

// Wait until the ingester's lifecycler reports the ACTIVE state.
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})
ctx := user.InjectOrgID(context.Background(), userID)
// Use one more distinct matcher than the cache capacity so that exactly
// one entry is expected to be evicted.
numberOfDifferentMatchers := cfg.MatchersCacheMaxItems + 1
callPerMatcher := 10
// Outer loop: one distinct matcher value per j. Inner loop: repeat the same
// query callPerMatcher times.
for j := 0; j < numberOfDifferentMatchers; j++ {
for i := 0; i < callPerMatcher; i++ {
s := &mockQueryStreamServer{ctx: ctx}
err = ing.QueryStream(&client.QueryRequest{
StartTimestampMs: math.MinInt64,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: fmt.Sprintf("%d", j)}},
}, s)
require.NoError(t, err)
}
}

// Expected values, in format-argument order:
//   hits     = total calls minus one per distinct matcher
//   items    = the configured cache capacity
//   requests = total calls (callPerMatcher * numberOfDifferentMatchers)
// NOTE(review): the expected ingester_matchers_cache_max_items value is 0
// even though the cache is configured with 50 entries — confirm this is the
// intended behaviour of the underlying cache metrics.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(fmt.Sprintf(`
# HELP ingester_matchers_cache_evicted_total Total number of items evicted from the cache
# TYPE ingester_matchers_cache_evicted_total counter
ingester_matchers_cache_evicted_total 1
# HELP ingester_matchers_cache_hits_total Total number of cache hits for series matchers
# TYPE ingester_matchers_cache_hits_total counter
ingester_matchers_cache_hits_total %v
# HELP ingester_matchers_cache_items Total number of cached items
# TYPE ingester_matchers_cache_items gauge
ingester_matchers_cache_items %v
# HELP ingester_matchers_cache_max_items Maximum number of items that can be cached
# TYPE ingester_matchers_cache_max_items gauge
ingester_matchers_cache_max_items 0
# HELP ingester_matchers_cache_requests_total Total number of cache requests for series matchers
# TYPE ingester_matchers_cache_requests_total counter
ingester_matchers_cache_requests_total %v
`, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, cfg.MatchersCacheMaxItems, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total"))
}

func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
userID := "1"
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.LifecyclerConfig.FinalSleep = 0
cfg.ActiveSeriesMetricsEnabled = true
cfg.LabelsStringInterningEnabled = true
cfg.MatchersCacheMaxItems = 1024
return cfg
}

Expand Down
Loading

0 comments on commit c2c4827

Please sign in to comment.