From b812677266850268b4b00dd689a3baedb13a87b2 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri <56quarters@users.noreply.github.com> Date: Thu, 16 Jan 2025 09:12:29 -0500 Subject: [PATCH] Rewrite subqueries that have the same range and resolution (#10445) * Rewrite subqueries that have the same range and resolution Fix queries that have the same range and resolution since this will only return a single point when using Prometheus 3 selectors, left open and right closed (compared to Prometheus 2 left closed and right closed). * Changelog Signed-off-by: Nick Pillitteri --------- Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 +++ cmd/mimir/help-all.txt.tmpl | 2 + .../configuration-parameters/index.md | 6 ++ .../astmapper/prom2_range_compat.go | 44 +++++++++ .../astmapper/prom2_range_compat_test.go | 64 +++++++++++++ pkg/frontend/querymiddleware/limits.go | 3 + pkg/frontend/querymiddleware/limits_test.go | 9 ++ .../querymiddleware/prom2_range_compat.go | 79 ++++++++++++++++ .../prom2_range_compat_test.go | 93 +++++++++++++++++++ pkg/frontend/querymiddleware/roundtrip.go | 8 +- .../querymiddleware/roundtrip_test.go | 1 + pkg/util/validation/limits.go | 6 ++ 13 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go create mode 100644 pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go create mode 100644 pkg/frontend/querymiddleware/prom2_range_compat.go create mode 100644 pkg/frontend/querymiddleware/prom2_range_compat_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b45e39a133..a64668720bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * [BUGFIX] PromQL: Fix functions with histograms https://github.com/prometheus/prometheus/pull/15711 #10400 * [BUGFIX] MQE: Fix functions with histograms #10400 * [BUGFIX] Distributor: return HTTP status 415 Unsupported Media Type instead of 200 Success for Remote Write 2.0 until we support it. #10423 +* [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. #10445 ### Mixin diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index bdf5cd5536f..ca4965964c9 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -4338,6 +4338,17 @@ "fieldType": "string", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "prom2_range_compat", + "required": false, + "desc": "Rewrite queries using the same range selector and resolution [X:X] which do not work in Prometheus 3 to a nearly identical form that does work with Prometheus 3 semantics", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "query-frontend.prom2-range-compat", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "cardinality_analysis_enabled", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 17caad393e3..add1270f858 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2289,6 +2289,8 @@ Usage of ./cmd/mimir/mimir: Maximum time to wait for the query-frontend to become ready before rejecting requests received before the frontend was ready. 0 to disable (i.e. fail immediately if a request is received while the frontend is still starting up) (default 2s) -query-frontend.parallelize-shardable-queries True to enable query sharding. + -query-frontend.prom2-range-compat + [experimental] Rewrite queries using the same range selector and resolution [X:X] which do not work in Prometheus 3 to a nearly identical form that does work with Prometheus 3 semantics -query-frontend.prune-queries [experimental] True to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries. -query-frontend.querier-forget-delay duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index b65a35a7889..989aab6b4f8 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3559,6 +3559,12 @@ The `limits` block configures default and per-tenant limits imposed by component # CLI flag: -query-frontend.enabled-promql-experimental-functions [enabled_promql_experimental_functions: | default = ""] +# (experimental) Rewrite queries using the same range selector and resolution +# [X:X] which do not work in Prometheus 3 to a nearly identical form that does +# work with Prometheus 3 semantics +# CLI flag: -query-frontend.prom2-range-compat +[prom2_range_compat: | default = false] + # Enables endpoints used for cardinality analysis. # CLI flag: -querier.cardinality-analysis-enabled [cardinality_analysis_enabled: | default = false] diff --git a/pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go b/pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go new file mode 100644 index 00000000000..b9c50bc124e --- /dev/null +++ b/pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package astmapper + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/promql/parser" +) + +// NewProm2RangeCompat creates a new ASTMapper which modifies the range of subqueries +// with identical ranges and steps (which used to returns results in Prometheus 2 since +// range selectors were left closed right closed) to be compatible with Prometheus 3 +// range selectors which are left open right closed. +func NewProm2RangeCompat(ctx context.Context) ASTMapper { + compat := &prom2RangeCompat{ctx: ctx} + return NewASTExprMapper(compat) +} + +type prom2RangeCompat struct { + ctx context.Context +} + +func (c prom2RangeCompat) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) { + if err := c.ctx.Err(); err != nil { + return nil, false, err + } + + e, ok := expr.(*parser.SubqueryExpr) + if !ok { + return expr, false, nil + } + + // Due to range selectors being left open right closed in Prometheus 3, subqueries with identical + // range and step will only select a single datapoint which breaks functions that need multiple + // points (rate, increase). Adjust the range here slightly to ensure that multiple data points + // are returned to match the Prometheus 2 behavior. + if e.Range == e.Step { + e.Range = e.Range + time.Millisecond + } + + return e, false, nil +} diff --git a/pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go b/pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go new file mode 100644 index 00000000000..6c8f5c9e7dd --- /dev/null +++ b/pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package astmapper + +import ( + "context" + "testing" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" +) + +func TestProm2RangeCompat_Cancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + query, _ := parser.ParseExpr(`up{foo="bar"}`) + mapper := NewProm2RangeCompat(ctx) + _, err := mapper.Map(query) + + require.ErrorIs(t, err, context.Canceled) +} + +func TestProm2RangeCompat_Queries(t *testing.T) { + type testCase struct { + query string + expectedQuery string + } + + testCases := []testCase{ + { + query: `sum(rate(some_series{job="foo"}[1m]))`, + expectedQuery: `sum(rate(some_series{job="foo"}[1m]))`, + }, + { + query: `sum(rate(some_series{job="foo"}[1m:1m]))`, + expectedQuery: `sum(rate(some_series{job="foo"}[1m1ms:1m]))`, + }, + { + query: `sum(rate(some_series{job="foo"}[1h]))`, + expectedQuery: `sum(rate(some_series{job="foo"}[1h]))`, + }, + { + query: `sum(rate(some_series{job="foo"}[1h:1h]))`, + expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h]))`, + }, + { + query: `sum(rate(some_series{job="foo"}[1h:1h])) / sum(rate(other_series{job="foo"}[1m:1m]))`, + expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h])) / sum(rate(other_series{job="foo"}[1m1ms:1m]))`, + }, + } + + for _, tc := range testCases { + t.Run(tc.query, func(t *testing.T) { + query, err := parser.ParseExpr(tc.query) + require.NoError(t, err) + + mapper := NewProm2RangeCompat(context.Background()) + mapped, err := mapper.Map(query) + require.NoError(t, err) + require.Equal(t, tc.expectedQuery, mapped.String()) + }) + } +} diff --git a/pkg/frontend/querymiddleware/limits.go b/pkg/frontend/querymiddleware/limits.go index 364497af69d..7e76fdc32e9 100644 --- a/pkg/frontend/querymiddleware/limits.go +++ b/pkg/frontend/querymiddleware/limits.go @@ -99,6 +99,9 @@ type Limits interface { // EnabledPromQLExperimentalFunctions returns the names of PromQL experimental functions allowed for the tenant. EnabledPromQLExperimentalFunctions(userID string) []string + // Prom2RangeCompat returns if Prometheus 2/3 range compatibility fixes are enabled for the tenant. + Prom2RangeCompat(userID string) bool + // BlockedQueries returns the blocked queries. BlockedQueries(userID string) []*validation.BlockedQuery diff --git a/pkg/frontend/querymiddleware/limits_test.go b/pkg/frontend/querymiddleware/limits_test.go index 8d5551b679f..ac2b3615765 100644 --- a/pkg/frontend/querymiddleware/limits_test.go +++ b/pkg/frontend/querymiddleware/limits_test.go @@ -573,6 +573,10 @@ func (m multiTenantMockLimits) EnabledPromQLExperimentalFunctions(userID string) return m.byTenant[userID].enabledPromQLExperimentalFunctions } +func (m multiTenantMockLimits) Prom2RangeCompat(userID string) bool { + return m.byTenant[userID].prom2RangeCompat +} + func (m multiTenantMockLimits) BlockedQueries(userID string) []*validation.BlockedQuery { return m.byTenant[userID].blockedQueries } @@ -620,6 +624,7 @@ type mockLimits struct { resultsCacheTTLForErrors time.Duration resultsCacheForUnalignedQueryEnabled bool enabledPromQLExperimentalFunctions []string + prom2RangeCompat bool blockedQueries []*validation.BlockedQuery alignQueriesWithStep bool queryIngestersWithin time.Duration @@ -712,6 +717,10 @@ func (m mockLimits) EnabledPromQLExperimentalFunctions(string) []string { return m.enabledPromQLExperimentalFunctions } +func (m mockLimits) Prom2RangeCompat(string) bool { + return m.prom2RangeCompat +} + func (m mockLimits) CreationGracePeriod(string) time.Duration { return m.creationGracePeriod } diff --git a/pkg/frontend/querymiddleware/prom2_range_compat.go b/pkg/frontend/querymiddleware/prom2_range_compat.go new file mode 100644 index 00000000000..9a53ccabc4c --- /dev/null +++ b/pkg/frontend/querymiddleware/prom2_range_compat.go @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/dskit/tenant" + "github.com/prometheus/prometheus/promql/parser" + + apierror "github.com/grafana/mimir/pkg/api/error" + "github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper" + "github.com/grafana/mimir/pkg/util/spanlogger" + "github.com/grafana/mimir/pkg/util/validation" +) + +// newProm2RangeCompatMiddleware creates a new query middleware that adjusts range and resolution +// selectors in subqueries in some cases for compatibility with Prometheus 3. +func newProm2RangeCompatMiddleware(limits Limits, logger log.Logger) MetricsQueryMiddleware { + return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { + return &prom2RangeCompatHandler{ + next: next, + limits: limits, + logger: logger, + } + }) +} + +type prom2RangeCompatHandler struct { + next MetricsQueryHandler + limits Limits + logger log.Logger +} + +func (c *prom2RangeCompatHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) { + spanLog := spanlogger.FromContext(ctx, c.logger) + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return c.next.Do(ctx, r) + } + + if !validation.AllTrueBooleansPerTenant(tenantIDs, c.limits.Prom2RangeCompat) { + return c.next.Do(ctx, r) + } + + origQuery := r.GetQuery() + rewritten, err := c.rewrite(ctx, origQuery) + if err != nil { + return c.next.Do(ctx, r) + } + + rewrittenQuery := rewritten.String() + if origQuery != rewrittenQuery { + spanLog.DebugLog( + "msg", "modified subquery for compatibility with Prometheus 3 range selectors", + "original", origQuery, + "rewritten", rewrittenQuery, + ) + r, _ = r.WithExpr(rewritten) + } + + return c.next.Do(ctx, r) +} + +func (c *prom2RangeCompatHandler) rewrite(ctx context.Context, query string) (parser.Expr, error) { + expr, err := parser.ParseExpr(query) + if err != nil { + return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) + } + + mapper := astmapper.NewProm2RangeCompat(ctx) + rewritten, err := mapper.Map(expr) + if err != nil { + return nil, err + } + + return rewritten, nil +} diff --git a/pkg/frontend/querymiddleware/prom2_range_compat_test.go b/pkg/frontend/querymiddleware/prom2_range_compat_test.go new file mode 100644 index 00000000000..745fdd0f981 --- /dev/null +++ b/pkg/frontend/querymiddleware/prom2_range_compat_test.go @@ -0,0 +1,93 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestProm2RangeCompat_Do(t *testing.T) { + newRangeRequest := func(q string) *PrometheusRangeQueryRequest { + return &PrometheusRangeQueryRequest{ + queryExpr: parseQuery(t, q), + start: 100, + end: 200, + step: 10, + } + } + + runHandler := func(ctx context.Context, inner MetricsQueryHandler, limits Limits, req MetricsQueryRequest) (Response, error) { + middleware := newProm2RangeCompatMiddleware(limits, log.NewNopLogger()) + handler := middleware.Wrap(inner) + return handler.Do(ctx, req) + } + + t.Run("no user set", func(t *testing.T) { + ctx := context.Background() + req := newRangeRequest("sum(rate(some_series[1m:1m]))") + + innerRes := newEmptyPrometheusResponse() + inner := &mockHandler{} + inner.On("Do", mock.Anything, req).Return(innerRes, nil) + + limits := &mockLimits{prom2RangeCompat: true} + res, err := runHandler(ctx, inner, limits, req) + + require.NoError(t, err) + require.Equal(t, innerRes, res) + }) + + t.Run("disabled by config", func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "1234") + req := newRangeRequest("sum(rate(some_series[1m:1m]))") + + innerRes := newEmptyPrometheusResponse() + inner := &mockHandler{} + inner.On("Do", mock.Anything, req).Return(innerRes, nil) + + limits := &mockLimits{prom2RangeCompat: false} + res, err := runHandler(ctx, inner, limits, req) + + require.NoError(t, err) + require.Equal(t, innerRes, res) + }) + + t.Run("no resolution", func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "1234") + req := newRangeRequest("sum(rate(some_series[1m]))") + + innerRes := newEmptyPrometheusResponse() + inner := &mockHandler{} + inner.On("Do", mock.Anything, req).Return(innerRes, nil) + + limits := &mockLimits{prom2RangeCompat: true} + res, err := runHandler(ctx, inner, limits, req) + + require.NoError(t, err) + require.Equal(t, innerRes, res) + }) + + t.Run("query rewritten", func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "1234") + orig := newRangeRequest("sum(rate(some_series[1m:1m]))") + rewritten := newRangeRequest("sum(rate(some_series[1m1ms:1m]))") + + innerRes := newEmptyPrometheusResponse() + inner := &mockHandler{} + inner.On("Do", mock.Anything, mock.MatchedBy(func(req MetricsQueryRequest) bool { + return req.GetQuery() == rewritten.GetQuery() + })).Return(innerRes, nil) + + limits := &mockLimits{prom2RangeCompat: true} + res, err := runHandler(ctx, inner, limits, orig) + + require.NoError(t, err) + require.Equal(t, innerRes, res) + }) +} diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 05da4ffe986..3ac38c4e573 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -352,18 +352,22 @@ func newQueryMiddlewares( metrics := newInstrumentMiddlewareMetrics(registerer) queryBlockerMiddleware := newQueryBlockerMiddleware(limits, log, registerer) queryStatsMiddleware := newQueryStatsMiddleware(registerer, engine) + prom2CompatMiddleware := newProm2RangeCompatMiddleware(limits, log) remoteReadMiddleware = append(remoteReadMiddleware, // Track query range statistics. Added first before any subsequent middleware modifies the request. queryStatsMiddleware, newLimitsMiddleware(limits, log), - queryBlockerMiddleware) + queryBlockerMiddleware, + ) queryRangeMiddleware = append(queryRangeMiddleware, // Track query range statistics. Added first before any subsequent middleware modifies the request. queryStatsMiddleware, newLimitsMiddleware(limits, log), queryBlockerMiddleware, + newInstrumentMiddleware("prom2_compat", metrics), + prom2CompatMiddleware, newInstrumentMiddleware("step_align", metrics), newStepAlignMiddleware(limits, log, registerer), ) @@ -399,6 +403,8 @@ func newQueryMiddlewares( newLimitsMiddleware(limits, log), newSplitInstantQueryByIntervalMiddleware(limits, log, engine, registerer), queryBlockerMiddleware, + newInstrumentMiddleware("prom2_compat", metrics), + prom2CompatMiddleware, ) // Inject the extra middlewares provided by the user before the query pruning and query sharding middleware. diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index 0e829478ae7..32f9260ab89 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -551,6 +551,7 @@ func TestMiddlewaresConsistency(t *testing.T) { "stepAlignMiddleware", // Not applicable because remote read requests don't take step in account when running in Mimir. "pruneMiddleware", // No query pruning support. "experimentalFunctionsMiddleware", // No blocking for PromQL experimental functions as it is executed remotely. + "prom2RangeCompatHandler", // No rewriting Prometheus 2 subqueries to Prometheus 3 }, }, } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 039929f045a..df9d12979d2 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -180,6 +180,7 @@ type Limits struct { BlockedQueries []*BlockedQuery `yaml:"blocked_queries,omitempty" json:"blocked_queries,omitempty" doc:"nocli|description=List of queries to block." category:"experimental"` AlignQueriesWithStep bool `yaml:"align_queries_with_step" json:"align_queries_with_step"` EnabledPromQLExperimentalFunctions flagext.StringSliceCSV `yaml:"enabled_promql_experimental_functions" json:"enabled_promql_experimental_functions" category:"experimental"` + Prom2RangeCompat bool `yaml:"prom2_range_compat" json:"prom2_range_compat" category:"experimental"` // Cardinality CardinalityAnalysisEnabled bool `yaml:"cardinality_analysis_enabled" json:"cardinality_analysis_enabled"` @@ -371,6 +372,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueryExpressionSizeBytes, MaxQueryExpressionSizeBytesFlag, 0, "Max size of the raw query, in bytes. This limit is enforced by the query-frontend for instant, range and remote read queries. 0 to not apply a limit to the size of the query.") f.BoolVar(&l.AlignQueriesWithStep, alignQueriesWithStepFlag, false, "Mutate incoming queries to align their start and end with their step to improve result caching.") f.Var(&l.EnabledPromQLExperimentalFunctions, "query-frontend.enabled-promql-experimental-functions", "Enable certain experimental PromQL functions, which are subject to being changed or removed at any time, on a per-tenant basis. Defaults to empty which means all experimental functions are disabled. Set to 'all' to enable all experimental functions.") + f.BoolVar(&l.Prom2RangeCompat, "query-frontend.prom2-range-compat", false, "Rewrite queries using the same range selector and resolution [X:X] which do not work in Prometheus 3 to a nearly identical form that does work with Prometheus 3 semantics") // Store-gateway. f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The tenant's shard size, used when store-gateway sharding is enabled. Value of 0 disables shuffle sharding for the tenant, that is all tenant blocks are sharded across all store-gateway replicas.") @@ -1103,6 +1105,10 @@ func (o *Overrides) EnabledPromQLExperimentalFunctions(userID string) []string { return o.getOverridesForUser(userID).EnabledPromQLExperimentalFunctions } +func (o *Overrides) Prom2RangeCompat(userID string) bool { + return o.getOverridesForUser(userID).Prom2RangeCompat +} + func (o *Overrides) OTelMetricSuffixesEnabled(tenantID string) bool { return o.getOverridesForUser(tenantID).OTelMetricSuffixesEnabled }