Skip to content

Commit

Permalink
Rewrite subqueries that have the same range and resolution (#10445)
Browse files Browse the repository at this point in the history
* Rewrite subqueries that have the same range and resolution

Fix queries that have the same range and resolution since this will
only return a single point when using Prometheus 3 selectors, left
open and right closed (compared to Prometheus 2 left closed and right
closed).

* Changelog

Signed-off-by: Nick Pillitteri <[email protected]>

---------

Signed-off-by: Nick Pillitteri <[email protected]>
(cherry picked from commit b812677)
  • Loading branch information
56quarters committed Jan 17, 2025
1 parent d541fff commit 17afe02
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375
* [BUGFIX] Query-frontend: Add flag `-query-frontend.prom2-range-compat` and corresponding YAML to rewrite queries with ranges that worked in Prometheus 2 but are invalid in Prometheus 3. #10445

### Mixin

Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4327,6 +4327,17 @@
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "prom2_range_compat",
"required": false,
"desc": "Rewrite queries using the same range selector and resolution [X:X] which do not work in Prometheus 3 to a nearly identical form that does work with Prometheus 3 semantics",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-frontend.prom2-range-compat",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cardinality_analysis_enabled",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2287,6 +2287,8 @@ Usage of ./cmd/mimir/mimir:
Maximum time to wait for the query-frontend to become ready before rejecting requests received before the frontend was ready. 0 to disable (i.e. fail immediately if a request is received while the frontend is still starting up) (default 2s)
-query-frontend.parallelize-shardable-queries
True to enable query sharding.
-query-frontend.prom2-range-compat
[experimental] Rewrite queries using the same range selector and resolution [X:X] which do not work in Prometheus 3 to a nearly identical form that does work with Prometheus 3 semantics
-query-frontend.prune-queries
[experimental] True to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries.
-query-frontend.querier-forget-delay duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3552,6 +3552,12 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -query-frontend.enabled-promql-experimental-functions
[enabled_promql_experimental_functions: <string> | default = ""]

# (experimental) Rewrite queries using the same range selector and resolution
# [X:X] which do not work in Prometheus 3 to a nearly identical form that does
# work with Prometheus 3 semantics
# CLI flag: -query-frontend.prom2-range-compat
[prom2_range_compat: <boolean> | default = false]

# Enables endpoints used for cardinality analysis.
# CLI flag: -querier.cardinality-analysis-enabled
[cardinality_analysis_enabled: <boolean> | default = false]
Expand Down
44 changes: 44 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"time"

"github.com/prometheus/prometheus/promql/parser"
)

// NewProm2RangeCompat creates a new ASTMapper which modifies the range of subqueries
// with identical ranges and steps (which used to returns results in Prometheus 2 since
// range selectors were left closed right closed) to be compatible with Prometheus 3
// range selectors which are left open right closed.
func NewProm2RangeCompat(ctx context.Context) ASTMapper {
compat := &prom2RangeCompat{ctx: ctx}
return NewASTExprMapper(compat)
}

type prom2RangeCompat struct {
ctx context.Context
}

func (c prom2RangeCompat) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
if err := c.ctx.Err(); err != nil {
return nil, false, err
}

e, ok := expr.(*parser.SubqueryExpr)
if !ok {
return expr, false, nil
}

// Due to range selectors being left open right closed in Prometheus 3, subqueries with identical
// range and step will only select a single datapoint which breaks functions that need multiple
// points (rate, increase). Adjust the range here slightly to ensure that multiple data points
// are returned to match the Prometheus 2 behavior.
if e.Range == e.Step {
e.Range = e.Range + time.Millisecond
}

return e, false, nil
}
64 changes: 64 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"testing"

"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
)

func TestProm2RangeCompat_Cancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

query, _ := parser.ParseExpr(`up{foo="bar"}`)
mapper := NewProm2RangeCompat(ctx)
_, err := mapper.Map(query)

require.ErrorIs(t, err, context.Canceled)
}

func TestProm2RangeCompat_Queries(t *testing.T) {
type testCase struct {
query string
expectedQuery string
}

testCases := []testCase{
{
query: `sum(rate(some_series{job="foo"}[1m]))`,
expectedQuery: `sum(rate(some_series{job="foo"}[1m]))`,
},
{
query: `sum(rate(some_series{job="foo"}[1m:1m]))`,
expectedQuery: `sum(rate(some_series{job="foo"}[1m1ms:1m]))`,
},
{
query: `sum(rate(some_series{job="foo"}[1h]))`,
expectedQuery: `sum(rate(some_series{job="foo"}[1h]))`,
},
{
query: `sum(rate(some_series{job="foo"}[1h:1h]))`,
expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h]))`,
},
{
query: `sum(rate(some_series{job="foo"}[1h:1h])) / sum(rate(other_series{job="foo"}[1m:1m]))`,
expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h])) / sum(rate(other_series{job="foo"}[1m1ms:1m]))`,
},
}

for _, tc := range testCases {
t.Run(tc.query, func(t *testing.T) {
query, err := parser.ParseExpr(tc.query)
require.NoError(t, err)

mapper := NewProm2RangeCompat(context.Background())
mapped, err := mapper.Map(query)
require.NoError(t, err)
require.Equal(t, tc.expectedQuery, mapped.String())
})
}
}
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Limits interface {
// EnabledPromQLExperimentalFunctions returns the names of PromQL experimental functions allowed for the tenant.
EnabledPromQLExperimentalFunctions(userID string) []string

// Prom2RangeCompat returns if Prometheus 2/3 range compatibility fixes are enabled for the tenant.
Prom2RangeCompat(userID string) bool

// BlockedQueries returns the blocked queries.
BlockedQueries(userID string) []*validation.BlockedQuery

Expand Down
9 changes: 9 additions & 0 deletions pkg/frontend/querymiddleware/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ func (m multiTenantMockLimits) EnabledPromQLExperimentalFunctions(userID string)
return m.byTenant[userID].enabledPromQLExperimentalFunctions
}

func (m multiTenantMockLimits) Prom2RangeCompat(userID string) bool {
return m.byTenant[userID].prom2RangeCompat
}

func (m multiTenantMockLimits) BlockedQueries(userID string) []*validation.BlockedQuery {
return m.byTenant[userID].blockedQueries
}
Expand Down Expand Up @@ -620,6 +624,7 @@ type mockLimits struct {
resultsCacheTTLForErrors time.Duration
resultsCacheForUnalignedQueryEnabled bool
enabledPromQLExperimentalFunctions []string
prom2RangeCompat bool
blockedQueries []*validation.BlockedQuery
alignQueriesWithStep bool
queryIngestersWithin time.Duration
Expand Down Expand Up @@ -712,6 +717,10 @@ func (m mockLimits) EnabledPromQLExperimentalFunctions(string) []string {
return m.enabledPromQLExperimentalFunctions
}

func (m mockLimits) Prom2RangeCompat(string) bool {
return m.prom2RangeCompat
}

func (m mockLimits) CreationGracePeriod(string) time.Duration {
return m.creationGracePeriod
}
Expand Down
79 changes: 79 additions & 0 deletions pkg/frontend/querymiddleware/prom2_range_compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/promql/parser"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)

// newProm2RangeCompatMiddleware creates a new query middleware that adjusts range and resolution
// selectors in subqueries in some cases for compatibility with Prometheus 3.
func newProm2RangeCompatMiddleware(limits Limits, logger log.Logger) MetricsQueryMiddleware {
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
return &prom2RangeCompatHandler{
next: next,
limits: limits,
logger: logger,
}
})
}

type prom2RangeCompatHandler struct {
next MetricsQueryHandler
limits Limits
logger log.Logger
}

func (c *prom2RangeCompatHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
spanLog := spanlogger.FromContext(ctx, c.logger)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return c.next.Do(ctx, r)
}

if !validation.AllTrueBooleansPerTenant(tenantIDs, c.limits.Prom2RangeCompat) {
return c.next.Do(ctx, r)
}

origQuery := r.GetQuery()
rewritten, err := c.rewrite(ctx, origQuery)
if err != nil {
return c.next.Do(ctx, r)
}

rewrittenQuery := rewritten.String()
if origQuery != rewrittenQuery {
spanLog.DebugLog(
"msg", "modified subquery for compatibility with Prometheus 3 range selectors",
"original", origQuery,
"rewritten", rewrittenQuery,
)
r, _ = r.WithExpr(rewritten)
}

return c.next.Do(ctx, r)
}

func (c *prom2RangeCompatHandler) rewrite(ctx context.Context, query string) (parser.Expr, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error())
}

mapper := astmapper.NewProm2RangeCompat(ctx)
rewritten, err := mapper.Map(expr)
if err != nil {
return nil, err
}

return rewritten, nil
}
93 changes: 93 additions & 0 deletions pkg/frontend/querymiddleware/prom2_range_compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestProm2RangeCompat_Do(t *testing.T) {
newRangeRequest := func(q string) *PrometheusRangeQueryRequest {
return &PrometheusRangeQueryRequest{
queryExpr: parseQuery(t, q),
start: 100,
end: 200,
step: 10,
}
}

runHandler := func(ctx context.Context, inner MetricsQueryHandler, limits Limits, req MetricsQueryRequest) (Response, error) {
middleware := newProm2RangeCompatMiddleware(limits, log.NewNopLogger())
handler := middleware.Wrap(inner)
return handler.Do(ctx, req)
}

t.Run("no user set", func(t *testing.T) {
ctx := context.Background()
req := newRangeRequest("sum(rate(some_series[1m:1m]))")

innerRes := newEmptyPrometheusResponse()
inner := &mockHandler{}
inner.On("Do", mock.Anything, req).Return(innerRes, nil)

limits := &mockLimits{prom2RangeCompat: true}
res, err := runHandler(ctx, inner, limits, req)

require.NoError(t, err)
require.Equal(t, innerRes, res)
})

t.Run("disabled by config", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1234")
req := newRangeRequest("sum(rate(some_series[1m:1m]))")

innerRes := newEmptyPrometheusResponse()
inner := &mockHandler{}
inner.On("Do", mock.Anything, req).Return(innerRes, nil)

limits := &mockLimits{prom2RangeCompat: false}
res, err := runHandler(ctx, inner, limits, req)

require.NoError(t, err)
require.Equal(t, innerRes, res)
})

t.Run("no resolution", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1234")
req := newRangeRequest("sum(rate(some_series[1m]))")

innerRes := newEmptyPrometheusResponse()
inner := &mockHandler{}
inner.On("Do", mock.Anything, req).Return(innerRes, nil)

limits := &mockLimits{prom2RangeCompat: true}
res, err := runHandler(ctx, inner, limits, req)

require.NoError(t, err)
require.Equal(t, innerRes, res)
})

t.Run("query rewritten", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1234")
orig := newRangeRequest("sum(rate(some_series[1m:1m]))")
rewritten := newRangeRequest("sum(rate(some_series[1m1ms:1m]))")

innerRes := newEmptyPrometheusResponse()
inner := &mockHandler{}
inner.On("Do", mock.Anything, mock.MatchedBy(func(req MetricsQueryRequest) bool {
return req.GetQuery() == rewritten.GetQuery()
})).Return(innerRes, nil)

limits := &mockLimits{prom2RangeCompat: true}
res, err := runHandler(ctx, inner, limits, orig)

require.NoError(t, err)
require.Equal(t, innerRes, res)
})
}
Loading

0 comments on commit 17afe02

Please sign in to comment.