Skip to content

Commit

Permalink
MQE: add support for group_left and group_right (aka many-to-one …
Browse files Browse the repository at this point in the history
…and one-to-many matching) (#10119)

* Add feature toggle

# Conflicts:
#	cmd/mimir/config-descriptor.json
#	cmd/mimir/help-all.txt.tmpl
#	docs/sources/mimir/configure/configuration-parameters/index.md
#	pkg/streamingpromql/config.go
#	pkg/streamingpromql/engine_test.go

* Enable upstream test cases

* Remove comments about and/or/unless.

* Add some test cases

* Make condition clearer

* Add comparison operation edge cases

* Add tests for comparison operators with group_left / group_right

* Update tests to reflect prometheus/prometheus#15471

* Expand test to confirm label handling behaviour

* Rename existing operator

* Add structure for new operator

* Initial implementation of SeriesMetadata

* Initial implementation

* Handle case where one set of many-side series many to many output series

* Use correct labels when grouping one side series

* Return a conflict error message if there are multiple samples at the same timestamp on the same "one" side for the same group with different additional labels

* Fix handling of cases where additional labels appear on the "many" side

* Return a non-misleading error message when a conflict occurs on the "many" side

* Update comments

* Fix regression in comparison operation output labels

* Disable one-to-one comparison operation cases that fail for known reasons

* Fix linting warnings and simplify `computeOutputSeries()`

* Add tests for annotations

* Add tests for case where additional labels are not present on series on either side

* Add series sorting test

* Add provenance comment

* Add benchmark

* Expand comments

* Fix typo in test names

* Add test cases with label names in different orders

* Add some test cases with native histograms

* Ensure buffers passed to labels.Labels.Bytes(), BytesWithLabels() and BytesWithoutLabels() are reused if resized

* Address PR feedback: use minimal number of points for binary operation slice

* Address PR feedback: rename `latestSeries` to `latestSeriesIndex`

* Address PR feedback: add docstring for `updatePresence`

* Address PR feedback: try to reuse slices in more cases

* Run mixed metrics tests in parallel

* Add `group_left` to mixed metrics tests

* Address PR feedback: refactor `vectorVectorBinaryOperationEvaluator.computeResult` to reduce nesting
  • Loading branch information
charleskorn authored Jan 5, 2025
1 parent 2a4ba9b commit 9e94f1c
Show file tree
Hide file tree
Showing 17 changed files with 2,182 additions and 380 deletions.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2043,6 +2043,17 @@
"fieldFlag": "querier.mimir-query-engine.enable-histogram-quantile-function",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_one_to_many_and_many_to_one_binary_operations",
"required": false,
"desc": "Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Enable support for binary logical operations in the Mimir query engine. Only applies if the MQE is in use. (default true)
-querier.mimir-query-engine.enable-histogram-quantile-function
[experimental] Enable support for the histogram_quantile function in the Mimir query engine. Only applies if the MQE is in use. (default true)
-querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations
[experimental] Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use. (default true)
-querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations
[experimental] Enable support for binary comparison operations between two scalars in the Mimir query engine. Only applies if the MQE is in use. (default true)
-querier.mimir-query-engine.enable-scalars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,12 @@ mimir_query_engine:
# Mimir query engine. Only applies if the MQE is in use.
# CLI flag: -querier.mimir-query-engine.enable-histogram-quantile-function
[enable_histogram_quantile_function: <boolean> | default = true]
# (experimental) Enable support for one-to-many and many-to-one binary
# operations (group_left/group_right) in the Mimir query engine. Only applies
# if the MQE is in use.
# CLI flag: -querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations
[enable_one_to_many_and_many_to_one_binary_operations: <boolean> | default = true]
```

### frontend
Expand Down
3 changes: 3 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "nh_X / 2",
},
{
Expr: "h_X * on(l) group_left() a_X",
},
// Test the case where one side of a binary operation has many more series than the other.
{
Expr: `a_100{l=~"[13579]."} - b_100`,
Expand Down
3 changes: 3 additions & 0 deletions pkg/streamingpromql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type FeatureToggles struct {
EnableScalars bool `yaml:"enable_scalars" category:"experimental"`
EnableSubqueries bool `yaml:"enable_subqueries" category:"experimental"`
EnableHistogramQuantileFunction bool `yaml:"enable_histogram_quantile_function" category:"experimental"`
EnableOneToManyAndManyToOneBinaryOperations bool `yaml:"enable_one_to_many_and_many_to_one_binary_operations" category:"experimental"`
}

// EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features.
Expand All @@ -39,6 +40,7 @@ var EnableAllFeatures = FeatureToggles{
true,
true,
true,
true,
}

func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -50,4 +52,5 @@ func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in the Mimir query engine. Only applies if the MQE is in use.")
f.BoolVar(&t.EnableSubqueries, "querier.mimir-query-engine.enable-subqueries", true, "Enable support for subqueries in the Mimir query engine. Only applies if the MQE is in use.")
f.BoolVar(&t.EnableHistogramQuantileFunction, "querier.mimir-query-engine.enable-histogram-quantile-function", true, "Enable support for the histogram_quantile function in the Mimir query engine. Only applies if the MQE is in use.")
f.BoolVar(&t.EnableOneToManyAndManyToOneBinaryOperations, "querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations", true, "Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use.")
}
82 changes: 61 additions & 21 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"topk(5, metric{})": "'topk' aggregation with parameter",
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
"topk(5, metric{})": "'topk' aggregation with parameter",
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
}

for expression, expectedError := range unsupportedExpressions {
Expand Down Expand Up @@ -157,12 +155,20 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {
requireQueryIsUnsupported(t, featureToggles, "sum_over_time(metric[1m:10s])", "subquery")
})

t.Run("classic histograms", func(t *testing.T) {
t.Run("histogram_quantile function", func(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableHistogramQuantileFunction = false

requireQueryIsUnsupported(t, featureToggles, "histogram_quantile(0.5, metric)", "'histogram_quantile' function")
})

t.Run("one-to-many and many-to-one binary operations", func(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableOneToManyAndManyToOneBinaryOperations = false

requireQueryIsUnsupported(t, featureToggles, "metric{} + on() group_left() other_metric{}", "binary expression with many-to-one matching")
requireQueryIsUnsupported(t, featureToggles, "metric{} + on() group_right() other_metric{}", "binary expression with one-to-many matching")
})
}

func requireQueryIsUnsupported(t *testing.T, toggles FeatureToggles, expression string, expectedError string) {
Expand Down Expand Up @@ -2435,6 +2441,12 @@ func TestBinaryOperationAnnotations(t *testing.T) {
testCases[name] = testCase
}

cardinalities := map[string]string{
"one-to-one": "",
"many-to-one": "group_left",
"one-to-many": "group_right",
}

for op, binop := range binaryOperations {
expressions := []string{op}

Expand All @@ -2443,14 +2455,18 @@ func TestBinaryOperationAnnotations(t *testing.T) {
}

for _, expr := range expressions {
addBinopTestCase(op, fmt.Sprintf("binary %v between two floats", expr), fmt.Sprintf(`metric{type="float"} %v ignoring(type) metric{type="float"}`, expr), "float", "float", true)
addBinopTestCase(op, fmt.Sprintf("binary %v between a float on the left side and a histogram on the right", expr), fmt.Sprintf(`metric{type="float"} %v ignoring(type) metric{type="histogram"}`, expr), "float", "histogram", binop.floatHistogramSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between a scalar on the left side and a histogram on the right", expr), fmt.Sprintf(`2 %v metric{type="histogram"}`, expr), "float", "histogram", binop.floatHistogramSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a float on the right", expr), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) metric{type="float"}`, expr), "histogram", "float", binop.histogramFloatSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a scalar on the right", expr), fmt.Sprintf(`metric{type="histogram"} %v 2`, expr), "histogram", "float", binop.histogramFloatSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between two histograms", expr), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) metric{type="histogram"}`, expr), "histogram", "histogram", binop.histogramHistogramSupported)

for cardinalityName, cardinalityModifier := range cardinalities {
addBinopTestCase(op, fmt.Sprintf("binary %v between two floats with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="float"} %v ignoring(type) %v metric{type="float"}`, expr, cardinalityModifier), "float", "float", true)
addBinopTestCase(op, fmt.Sprintf("binary %v between a float on the left side and a histogram on the right with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="float"} %v ignoring(type) %v metric{type="histogram"}`, expr, cardinalityModifier), "float", "histogram", binop.floatHistogramSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a float on the right with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) %v metric{type="float"}`, expr, cardinalityModifier), "histogram", "float", binop.histogramFloatSupported)
addBinopTestCase(op, fmt.Sprintf("binary %v between two histograms with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) %v metric{type="histogram"}`, expr, cardinalityModifier), "histogram", "histogram", binop.histogramHistogramSupported)
}
}
}

runAnnotationTests(t, testCases)
}

Expand Down Expand Up @@ -2649,6 +2665,8 @@ func runMixedMetricsTests(t *testing.T, expressions []string, pointsPerSeries in
}

func TestCompareVariousMixedMetricsFunctions(t *testing.T) {
t.Parallel()

labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true)

// Test each label individually to catch edge cases in with single series
Expand Down Expand Up @@ -2682,6 +2700,8 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) {
}

func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) {
t.Parallel()

labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(false)

// Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc)
Expand All @@ -2692,36 +2712,52 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) {

for _, labels := range labelCombinations {
for _, op := range []string{"+", "-", "*", "/", "and", "unless", "or"} {
binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
expr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for _, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
expr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
}
expressions = append(expressions, binaryExpr)
expressions = append(expressions, expr)

// Same thing again, this time with grouping.
binaryExpr = fmt.Sprintf(`series{label="%s"}`, labels[0])
expr = fmt.Sprintf(`series{label="%s"}`, labels[0])
for i, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s ignoring (label, group) `, op)
expr += fmt.Sprintf(` %s ignoring (label, group) `, op)

if i == 0 && len(labels) > 2 {
binaryExpr += "("
expr += "("
}

binaryExpr += fmt.Sprintf(`{label="%s"}`, label)
expr += fmt.Sprintf(`{label="%s"}`, label)
}

if len(labels) > 2 {
binaryExpr += ")"
expr += ")"
}
expressions = append(expressions, expr)
}

// Similar thing again, this time with group_left
expr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for i, label := range labels[1:] {
expr += ` * on(group) group_left(label) `

if i == 0 && len(labels) > 2 {
expr += "("
}

expressions = append(expressions, binaryExpr)
expr += fmt.Sprintf(`{label="%s"}`, label)
}
if len(labels) > 2 {
expr += ")"
}
expressions = append(expressions, expr)
}

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
}

func TestCompareVariousMixedMetricsAggregations(t *testing.T) {
t.Parallel()

labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true)

// Test each label individually to catch edge cases in with single series
Expand Down Expand Up @@ -2750,6 +2786,8 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) {
}

func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {
t.Parallel()

labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true)

// Test each label individually to catch edge cases in with single series
Expand All @@ -2775,6 +2813,8 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {
}

func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) {
t.Parallel()

labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true)

// Test each label individually to catch edge cases in with single series
Expand Down
6 changes: 4 additions & 2 deletions pkg/streamingpromql/operators/aggregations/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func (a *Aggregation) groupingWithoutLabelsSeriesToGroupFuncs() (seriesToGroupLa
// Why 1024 bytes? It's what labels.Labels.String() uses as a buffer size, so we use that as a sensible starting point too.
b := make([]byte, 0, 1024)
bytesFunc := func(l labels.Labels) []byte {
return l.BytesWithoutLabels(b, a.Grouping...) // NewAggregation will add __name__ to Grouping for 'without' aggregations, so no need to add it here.
b = l.BytesWithoutLabels(b, a.Grouping...) // NewAggregation will add __name__ to Grouping for 'without' aggregations, so no need to add it here.
return b
}

lb := labels.NewBuilder(labels.EmptyLabels())
Expand All @@ -231,7 +232,8 @@ func (a *Aggregation) groupingByLabelsSeriesToGroupFuncs() (seriesToGroupLabelsB
// Why 1024 bytes? It's what labels.Labels.String() uses as a buffer size, so we use that as a sensible starting point too.
b := make([]byte, 0, 1024)
bytesFunc := func(l labels.Labels) []byte {
return l.BytesWithLabels(b, a.Grouping...)
b = l.BytesWithLabels(b, a.Grouping...)
return b
}

lb := labels.NewBuilder(labels.EmptyLabels())
Expand Down
Loading

0 comments on commit 9e94f1c

Please sign in to comment.