-
Notifications
You must be signed in to change notification settings - Fork 545
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite subqueries that have the same range and resolution (#10445)
* Rewrite subqueries that have the same range and resolution Fix queries that have the same range and resolution since this will only return a single point when using Prometheus 3 selectors, left open and right closed (compared to Prometheus 2 left closed and right closed). * Changelog Signed-off-by: Nick Pillitteri <[email protected]> --------- Signed-off-by: Nick Pillitteri <[email protected]>
- Loading branch information
1 parent
40e2e5d
commit b812677
Showing
13 changed files
with
326 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
44 changes: 44 additions & 0 deletions
44
pkg/frontend/querymiddleware/astmapper/prom2_range_compat.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package astmapper | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/promql/parser" | ||
) | ||
|
||
// NewProm2RangeCompat creates a new ASTMapper which modifies the range of subqueries | ||
// with identical ranges and steps (which used to returns results in Prometheus 2 since | ||
// range selectors were left closed right closed) to be compatible with Prometheus 3 | ||
// range selectors which are left open right closed. | ||
func NewProm2RangeCompat(ctx context.Context) ASTMapper { | ||
compat := &prom2RangeCompat{ctx: ctx} | ||
return NewASTExprMapper(compat) | ||
} | ||
|
||
type prom2RangeCompat struct { | ||
ctx context.Context | ||
} | ||
|
||
func (c prom2RangeCompat) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) { | ||
if err := c.ctx.Err(); err != nil { | ||
return nil, false, err | ||
} | ||
|
||
e, ok := expr.(*parser.SubqueryExpr) | ||
if !ok { | ||
return expr, false, nil | ||
} | ||
|
||
// Due to range selectors being left open right closed in Prometheus 3, subqueries with identical | ||
// range and step will only select a single datapoint which breaks functions that need multiple | ||
// points (rate, increase). Adjust the range here slightly to ensure that multiple data points | ||
// are returned to match the Prometheus 2 behavior. | ||
if e.Range == e.Step { | ||
e.Range = e.Range + time.Millisecond | ||
} | ||
|
||
return e, false, nil | ||
} |
64 changes: 64 additions & 0 deletions
64
pkg/frontend/querymiddleware/astmapper/prom2_range_compat_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package astmapper | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/prometheus/prometheus/promql/parser" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestProm2RangeCompat_Cancellation(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
cancel() | ||
|
||
query, _ := parser.ParseExpr(`up{foo="bar"}`) | ||
mapper := NewProm2RangeCompat(ctx) | ||
_, err := mapper.Map(query) | ||
|
||
require.ErrorIs(t, err, context.Canceled) | ||
} | ||
|
||
func TestProm2RangeCompat_Queries(t *testing.T) { | ||
type testCase struct { | ||
query string | ||
expectedQuery string | ||
} | ||
|
||
testCases := []testCase{ | ||
{ | ||
query: `sum(rate(some_series{job="foo"}[1m]))`, | ||
expectedQuery: `sum(rate(some_series{job="foo"}[1m]))`, | ||
}, | ||
{ | ||
query: `sum(rate(some_series{job="foo"}[1m:1m]))`, | ||
expectedQuery: `sum(rate(some_series{job="foo"}[1m1ms:1m]))`, | ||
}, | ||
{ | ||
query: `sum(rate(some_series{job="foo"}[1h]))`, | ||
expectedQuery: `sum(rate(some_series{job="foo"}[1h]))`, | ||
}, | ||
{ | ||
query: `sum(rate(some_series{job="foo"}[1h:1h]))`, | ||
expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h]))`, | ||
}, | ||
{ | ||
query: `sum(rate(some_series{job="foo"}[1h:1h])) / sum(rate(other_series{job="foo"}[1m:1m]))`, | ||
expectedQuery: `sum(rate(some_series{job="foo"}[1h1ms:1h])) / sum(rate(other_series{job="foo"}[1m1ms:1m]))`, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.query, func(t *testing.T) { | ||
query, err := parser.ParseExpr(tc.query) | ||
require.NoError(t, err) | ||
|
||
mapper := NewProm2RangeCompat(context.Background()) | ||
mapped, err := mapper.Map(query) | ||
require.NoError(t, err) | ||
require.Equal(t, tc.expectedQuery, mapped.String()) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package querymiddleware | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/grafana/dskit/tenant" | ||
"github.com/prometheus/prometheus/promql/parser" | ||
|
||
apierror "github.com/grafana/mimir/pkg/api/error" | ||
"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper" | ||
"github.com/grafana/mimir/pkg/util/spanlogger" | ||
"github.com/grafana/mimir/pkg/util/validation" | ||
) | ||
|
||
// newProm2RangeCompatMiddleware creates a new query middleware that adjusts range and resolution | ||
// selectors in subqueries in some cases for compatibility with Prometheus 3. | ||
func newProm2RangeCompatMiddleware(limits Limits, logger log.Logger) MetricsQueryMiddleware { | ||
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { | ||
return &prom2RangeCompatHandler{ | ||
next: next, | ||
limits: limits, | ||
logger: logger, | ||
} | ||
}) | ||
} | ||
|
||
type prom2RangeCompatHandler struct { | ||
next MetricsQueryHandler | ||
limits Limits | ||
logger log.Logger | ||
} | ||
|
||
func (c *prom2RangeCompatHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) { | ||
spanLog := spanlogger.FromContext(ctx, c.logger) | ||
tenantIDs, err := tenant.TenantIDs(ctx) | ||
if err != nil { | ||
return c.next.Do(ctx, r) | ||
} | ||
|
||
if !validation.AllTrueBooleansPerTenant(tenantIDs, c.limits.Prom2RangeCompat) { | ||
return c.next.Do(ctx, r) | ||
} | ||
|
||
origQuery := r.GetQuery() | ||
rewritten, err := c.rewrite(ctx, origQuery) | ||
if err != nil { | ||
return c.next.Do(ctx, r) | ||
} | ||
|
||
rewrittenQuery := rewritten.String() | ||
if origQuery != rewrittenQuery { | ||
spanLog.DebugLog( | ||
"msg", "modified subquery for compatibility with Prometheus 3 range selectors", | ||
"original", origQuery, | ||
"rewritten", rewrittenQuery, | ||
) | ||
r, _ = r.WithExpr(rewritten) | ||
} | ||
|
||
return c.next.Do(ctx, r) | ||
} | ||
|
||
func (c *prom2RangeCompatHandler) rewrite(ctx context.Context, query string) (parser.Expr, error) { | ||
expr, err := parser.ParseExpr(query) | ||
if err != nil { | ||
return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) | ||
} | ||
|
||
mapper := astmapper.NewProm2RangeCompat(ctx) | ||
rewritten, err := mapper.Map(expr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return rewritten, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package querymiddleware | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/grafana/dskit/user" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestProm2RangeCompat_Do(t *testing.T) { | ||
newRangeRequest := func(q string) *PrometheusRangeQueryRequest { | ||
return &PrometheusRangeQueryRequest{ | ||
queryExpr: parseQuery(t, q), | ||
start: 100, | ||
end: 200, | ||
step: 10, | ||
} | ||
} | ||
|
||
runHandler := func(ctx context.Context, inner MetricsQueryHandler, limits Limits, req MetricsQueryRequest) (Response, error) { | ||
middleware := newProm2RangeCompatMiddleware(limits, log.NewNopLogger()) | ||
handler := middleware.Wrap(inner) | ||
return handler.Do(ctx, req) | ||
} | ||
|
||
t.Run("no user set", func(t *testing.T) { | ||
ctx := context.Background() | ||
req := newRangeRequest("sum(rate(some_series[1m:1m]))") | ||
|
||
innerRes := newEmptyPrometheusResponse() | ||
inner := &mockHandler{} | ||
inner.On("Do", mock.Anything, req).Return(innerRes, nil) | ||
|
||
limits := &mockLimits{prom2RangeCompat: true} | ||
res, err := runHandler(ctx, inner, limits, req) | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, innerRes, res) | ||
}) | ||
|
||
t.Run("disabled by config", func(t *testing.T) { | ||
ctx := user.InjectOrgID(context.Background(), "1234") | ||
req := newRangeRequest("sum(rate(some_series[1m:1m]))") | ||
|
||
innerRes := newEmptyPrometheusResponse() | ||
inner := &mockHandler{} | ||
inner.On("Do", mock.Anything, req).Return(innerRes, nil) | ||
|
||
limits := &mockLimits{prom2RangeCompat: false} | ||
res, err := runHandler(ctx, inner, limits, req) | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, innerRes, res) | ||
}) | ||
|
||
t.Run("no resolution", func(t *testing.T) { | ||
ctx := user.InjectOrgID(context.Background(), "1234") | ||
req := newRangeRequest("sum(rate(some_series[1m]))") | ||
|
||
innerRes := newEmptyPrometheusResponse() | ||
inner := &mockHandler{} | ||
inner.On("Do", mock.Anything, req).Return(innerRes, nil) | ||
|
||
limits := &mockLimits{prom2RangeCompat: true} | ||
res, err := runHandler(ctx, inner, limits, req) | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, innerRes, res) | ||
}) | ||
|
||
t.Run("query rewritten", func(t *testing.T) { | ||
ctx := user.InjectOrgID(context.Background(), "1234") | ||
orig := newRangeRequest("sum(rate(some_series[1m:1m]))") | ||
rewritten := newRangeRequest("sum(rate(some_series[1m1ms:1m]))") | ||
|
||
innerRes := newEmptyPrometheusResponse() | ||
inner := &mockHandler{} | ||
inner.On("Do", mock.Anything, mock.MatchedBy(func(req MetricsQueryRequest) bool { | ||
return req.GetQuery() == rewritten.GetQuery() | ||
})).Return(innerRes, nil) | ||
|
||
limits := &mockLimits{prom2RangeCompat: true} | ||
res, err := runHandler(ctx, inner, limits, orig) | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, innerRes, res) | ||
}) | ||
} |
Oops, something went wrong.