diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index dcc0ebfb925..7de981485cc 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2043,6 +2043,17 @@ "fieldFlag": "querier.mimir-query-engine.enable-histogram-quantile-function", "fieldType": "boolean", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "enable_one_to_many_and_many_to_one_binary_operations", + "required": false, + "desc": "Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations", + "fieldType": "boolean", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index bd0256e3f6c..40b8e586b76 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2103,6 +2103,8 @@ Usage of ./cmd/mimir/mimir: [experimental] Enable support for binary logical operations in the Mimir query engine. Only applies if the MQE is in use. (default true) -querier.mimir-query-engine.enable-histogram-quantile-function [experimental] Enable support for the histogram_quantile function in the Mimir query engine. Only applies if the MQE is in use. (default true) + -querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations + [experimental] Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use. (default true) -querier.mimir-query-engine.enable-scalar-scalar-binary-comparison-operations [experimental] Enable support for binary comparison operations between two scalars in the Mimir query engine. Only applies if the MQE is in use. (default true) -querier.mimir-query-engine.enable-scalars diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index dfd84fbca61..284442448ca 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1531,6 +1531,12 @@ mimir_query_engine: # Mimir query engine. Only applies if the MQE is in use. # CLI flag: -querier.mimir-query-engine.enable-histogram-quantile-function [enable_histogram_quantile_function: | default = true] + + # (experimental) Enable support for one-to-many and many-to-one binary + # operations (group_left/group_right) in the Mimir query engine. Only applies + # if the MQE is in use. + # CLI flag: -querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations + [enable_one_to_many_and_many_to_one_binary_operations: | default = true] ``` ### frontend diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index fc018f4a2e4..597c2704b37 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -164,6 +164,9 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "nh_X / 2", }, + { + Expr: "h_X * on(l) group_left() a_X", + }, // Test the case where one side of a binary operation has many more series than the other. { Expr: `a_100{l=~"[13579]."} - b_100`, diff --git a/pkg/streamingpromql/config.go b/pkg/streamingpromql/config.go index 279ed62c96f..3f241e7ecd4 100644 --- a/pkg/streamingpromql/config.go +++ b/pkg/streamingpromql/config.go @@ -26,6 +26,7 @@ type FeatureToggles struct { EnableScalars bool `yaml:"enable_scalars" category:"experimental"` EnableSubqueries bool `yaml:"enable_subqueries" category:"experimental"` EnableHistogramQuantileFunction bool `yaml:"enable_histogram_quantile_function" category:"experimental"` + EnableOneToManyAndManyToOneBinaryOperations bool `yaml:"enable_one_to_many_and_many_to_one_binary_operations" category:"experimental"` } // EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features. @@ -39,6 +40,7 @@ var EnableAllFeatures = FeatureToggles{ true, true, true, + true, } func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { @@ -50,4 +52,5 @@ func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in the Mimir query engine. Only applies if the MQE is in use.") f.BoolVar(&t.EnableSubqueries, "querier.mimir-query-engine.enable-subqueries", true, "Enable support for subqueries in the Mimir query engine. Only applies if the MQE is in use.") f.BoolVar(&t.EnableHistogramQuantileFunction, "querier.mimir-query-engine.enable-histogram-quantile-function", true, "Enable support for the histogram_quantile function in the Mimir query engine. Only applies if the MQE is in use.") + f.BoolVar(&t.EnableOneToManyAndManyToOneBinaryOperations, "querier.mimir-query-engine.enable-one-to-many-and-many-to-one-binary-operations", true, "Enable support for one-to-many and many-to-one binary operations (group_left/group_right) in the Mimir query engine. Only applies if the MQE is in use.") } diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 99ee5664c25..4f0751fa5b6 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -50,12 +50,10 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", - "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", - "topk(5, metric{})": "'topk' aggregation with parameter", - `count_values("foo", metric{})`: "'count_values' aggregation with parameter", - "quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function", - "quantile(0.95, metric{})": "'quantile' aggregation with parameter", + "topk(5, metric{})": "'topk' aggregation with parameter", + `count_values("foo", metric{})`: "'count_values' aggregation with parameter", + "quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function", + "quantile(0.95, metric{})": "'quantile' aggregation with parameter", } for expression, expectedError := range unsupportedExpressions { @@ -157,12 +155,20 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireQueryIsUnsupported(t, featureToggles, "sum_over_time(metric[1m:10s])", "subquery") }) - t.Run("classic histograms", func(t *testing.T) { + t.Run("histogram_quantile function", func(t *testing.T) { featureToggles := EnableAllFeatures featureToggles.EnableHistogramQuantileFunction = false requireQueryIsUnsupported(t, featureToggles, "histogram_quantile(0.5, metric)", "'histogram_quantile' function") }) + + t.Run("one-to-many and many-to-one binary operations", func(t *testing.T) { + featureToggles := EnableAllFeatures + featureToggles.EnableOneToManyAndManyToOneBinaryOperations = false + + requireQueryIsUnsupported(t, featureToggles, "metric{} + on() group_left() other_metric{}", "binary expression with many-to-one matching") + requireQueryIsUnsupported(t, featureToggles, "metric{} + on() group_right() other_metric{}", "binary expression with one-to-many matching") + }) } func requireQueryIsUnsupported(t *testing.T, toggles FeatureToggles, expression string, expectedError string) { @@ -2435,6 +2441,12 @@ func TestBinaryOperationAnnotations(t *testing.T) { testCases[name] = testCase } + cardinalities := map[string]string{ + "one-to-one": "", + "many-to-one": "group_left", + "one-to-many": "group_right", + } + for op, binop := range binaryOperations { expressions := []string{op} @@ -2443,14 +2455,18 @@ func TestBinaryOperationAnnotations(t *testing.T) { } for _, expr := range expressions { - addBinopTestCase(op, fmt.Sprintf("binary %v between two floats", expr), fmt.Sprintf(`metric{type="float"} %v ignoring(type) metric{type="float"}`, expr), "float", "float", true) - addBinopTestCase(op, fmt.Sprintf("binary %v between a float on the left side and a histogram on the right", expr), fmt.Sprintf(`metric{type="float"} %v ignoring(type) metric{type="histogram"}`, expr), "float", "histogram", binop.floatHistogramSupported) addBinopTestCase(op, fmt.Sprintf("binary %v between a scalar on the left side and a histogram on the right", expr), fmt.Sprintf(`2 %v metric{type="histogram"}`, expr), "float", "histogram", binop.floatHistogramSupported) - addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a float on the right", expr), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) metric{type="float"}`, expr), "histogram", "float", binop.histogramFloatSupported) addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a scalar on the right", expr), fmt.Sprintf(`metric{type="histogram"} %v 2`, expr), "histogram", "float", binop.histogramFloatSupported) - addBinopTestCase(op, fmt.Sprintf("binary %v between two histograms", expr), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) metric{type="histogram"}`, expr), "histogram", "histogram", binop.histogramHistogramSupported) + + for cardinalityName, cardinalityModifier := range cardinalities { + addBinopTestCase(op, fmt.Sprintf("binary %v between two floats with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="float"} %v ignoring(type) %v metric{type="float"}`, expr, cardinalityModifier), "float", "float", true) + addBinopTestCase(op, fmt.Sprintf("binary %v between a float on the left side and a histogram on the right with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="float"} %v ignoring(type) %v metric{type="histogram"}`, expr, cardinalityModifier), "float", "histogram", binop.floatHistogramSupported) + addBinopTestCase(op, fmt.Sprintf("binary %v between a histogram on the left side and a float on the right with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) %v metric{type="float"}`, expr, cardinalityModifier), "histogram", "float", binop.histogramFloatSupported) + addBinopTestCase(op, fmt.Sprintf("binary %v between two histograms with %v matching", expr, cardinalityName), fmt.Sprintf(`metric{type="histogram"} %v ignoring(type) %v metric{type="histogram"}`, expr, cardinalityModifier), "histogram", "histogram", binop.histogramHistogramSupported) + } } } + runAnnotationTests(t, testCases) } @@ -2649,6 +2665,8 @@ func runMixedMetricsTests(t *testing.T, expressions []string, pointsPerSeries in } func TestCompareVariousMixedMetricsFunctions(t *testing.T) { + t.Parallel() + labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true) // Test each label individually to catch edge cases in with single series @@ -2682,6 +2700,8 @@ func TestCompareVariousMixedMetricsFunctions(t *testing.T) { } func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) { + t.Parallel() + labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(false) // Generate combinations of 2 and 3 labels. (e.g., "a,b", "e,f", "c,d,e" etc) @@ -2692,36 +2712,52 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) { for _, labels := range labelCombinations { for _, op := range []string{"+", "-", "*", "/", "and", "unless", "or"} { - binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0]) + expr := fmt.Sprintf(`series{label="%s"}`, labels[0]) for _, label := range labels[1:] { - binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label) + expr += fmt.Sprintf(` %s series{label="%s"}`, op, label) } - expressions = append(expressions, binaryExpr) + expressions = append(expressions, expr) // Same thing again, this time with grouping. - binaryExpr = fmt.Sprintf(`series{label="%s"}`, labels[0]) + expr = fmt.Sprintf(`series{label="%s"}`, labels[0]) for i, label := range labels[1:] { - binaryExpr += fmt.Sprintf(` %s ignoring (label, group) `, op) + expr += fmt.Sprintf(` %s ignoring (label, group) `, op) if i == 0 && len(labels) > 2 { - binaryExpr += "(" + expr += "(" } - binaryExpr += fmt.Sprintf(`{label="%s"}`, label) + expr += fmt.Sprintf(`{label="%s"}`, label) } - if len(labels) > 2 { - binaryExpr += ")" + expr += ")" + } + expressions = append(expressions, expr) + } + + // Similar thing again, this time with group_left + expr := fmt.Sprintf(`series{label="%s"}`, labels[0]) + for i, label := range labels[1:] { + expr += ` * on(group) group_left(label) ` + + if i == 0 && len(labels) > 2 { + expr += "(" } - expressions = append(expressions, binaryExpr) + expr += fmt.Sprintf(`{label="%s"}`, label) + } + if len(labels) > 2 { + expr += ")" } + expressions = append(expressions, expr) } runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false) } func TestCompareVariousMixedMetricsAggregations(t *testing.T) { + t.Parallel() + labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true) // Test each label individually to catch edge cases in with single series @@ -2750,6 +2786,8 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) { } func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) { + t.Parallel() + labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true) // Test each label individually to catch edge cases in with single series @@ -2775,6 +2813,8 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) { } func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) { + t.Parallel() + labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests(true) // Test each label individually to catch edge cases in with single series diff --git a/pkg/streamingpromql/operators/aggregations/aggregation.go b/pkg/streamingpromql/operators/aggregations/aggregation.go index 6575aba9629..407cdf134cb 100644 --- a/pkg/streamingpromql/operators/aggregations/aggregation.go +++ b/pkg/streamingpromql/operators/aggregations/aggregation.go @@ -212,7 +212,8 @@ func (a *Aggregation) groupingWithoutLabelsSeriesToGroupFuncs() (seriesToGroupLa // Why 1024 bytes? It's what labels.Labels.String() uses as a buffer size, so we use that as a sensible starting point too. b := make([]byte, 0, 1024) bytesFunc := func(l labels.Labels) []byte { - return l.BytesWithoutLabels(b, a.Grouping...) // NewAggregation will add __name__ to Grouping for 'without' aggregations, so no need to add it here. + b = l.BytesWithoutLabels(b, a.Grouping...) // NewAggregation will add __name__ to Grouping for 'without' aggregations, so no need to add it here. + return b } lb := labels.NewBuilder(labels.EmptyLabels()) @@ -231,7 +232,8 @@ func (a *Aggregation) groupingByLabelsSeriesToGroupFuncs() (seriesToGroupLabelsB // Why 1024 bytes? It's what labels.Labels.String() uses as a buffer size, so we use that as a sensible starting point too. b := make([]byte, 0, 1024) bytesFunc := func(l labels.Labels) []byte { - return l.BytesWithLabels(b, a.Grouping...) + b = l.BytesWithLabels(b, a.Grouping...) + return b } lb := labels.NewBuilder(labels.EmptyLabels()) diff --git a/pkg/streamingpromql/operators/binops/binary_operation.go b/pkg/streamingpromql/operators/binops/binary_operation.go index 7b603a5d3ae..b499ad4f387 100644 --- a/pkg/streamingpromql/operators/binops/binary_operation.go +++ b/pkg/streamingpromql/operators/binops/binary_operation.go @@ -3,15 +3,19 @@ package binops import ( + "fmt" "slices" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -25,14 +29,16 @@ func vectorMatchingGroupKeyFunc(vectorMatching parser.VectorMatching) func(label slices.Sort(vectorMatching.MatchingLabels) return func(l labels.Labels) []byte { - return l.BytesWithLabels(buf, vectorMatching.MatchingLabels...) + buf = l.BytesWithLabels(buf, vectorMatching.MatchingLabels...) + return buf } } if len(vectorMatching.MatchingLabels) == 0 { // Fast path for common case for expressions like "a + b" with no 'on' or 'without' labels. return func(l labels.Labels) []byte { - return l.BytesWithoutLabels(buf, labels.MetricName) + buf = l.BytesWithoutLabels(buf, labels.MetricName) + return buf } } @@ -42,7 +48,37 @@ func vectorMatchingGroupKeyFunc(vectorMatching parser.VectorMatching) func(label slices.Sort(lbls) return func(l labels.Labels) []byte { - return l.BytesWithoutLabels(buf, lbls...) + buf = l.BytesWithoutLabels(buf, lbls...) + return buf + } +} + +// vectorMatchingGroupLabelsFunc returns a function that computes the labels of the output group a series belongs to. +func groupLabelsFunc(vectorMatching parser.VectorMatching, op parser.ItemType, returnBool bool) func(labels.Labels) labels.Labels { + lb := labels.NewBuilder(labels.EmptyLabels()) + + if vectorMatching.On { + return func(l labels.Labels) labels.Labels { + lb.Reset(l) + lb.Keep(vectorMatching.MatchingLabels...) + return lb.Labels() + } + } + + if op.IsComparisonOperator() && !returnBool { + // If this is a comparison operator, we want to retain the metric name, as the comparison acts like a filter. + return func(l labels.Labels) labels.Labels { + lb.Reset(l) + lb.Del(vectorMatching.MatchingLabels...) + return lb.Labels() + } + } + + return func(l labels.Labels) labels.Labels { + lb.Reset(l) + lb.Del(labels.MetricName) + lb.Del(vectorMatching.MatchingLabels...) + return lb.Labels() } } @@ -115,3 +151,233 @@ func sampleTypeDescription(h *histogram.FloatHistogram) string { return "histogram" } + +type vectorVectorBinaryOperationEvaluator struct { + op parser.ItemType + opFunc binaryOperationFunc + leftIterator types.InstantVectorSeriesDataIterator + rightIterator types.InstantVectorSeriesDataIterator + memoryConsumptionTracker *limiting.MemoryConsumptionTracker + annotations *annotations.Annotations + expressionPosition posrange.PositionRange +} + +func newVectorVectorBinaryOperationEvaluator( + op parser.ItemType, + returnBool bool, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + annotations *annotations.Annotations, + expressionPosition posrange.PositionRange, +) (vectorVectorBinaryOperationEvaluator, error) { + e := vectorVectorBinaryOperationEvaluator{ + op: op, + opFunc: nil, + memoryConsumptionTracker: memoryConsumptionTracker, + annotations: annotations, + expressionPosition: expressionPosition, + } + + if returnBool { + e.opFunc = boolComparisonOperationFuncs[op] + } else { + e.opFunc = arithmeticAndComparisonOperationFuncs[op] + } + + if e.opFunc == nil { + return vectorVectorBinaryOperationEvaluator{}, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op)) + } + + return e, nil + +} + +func (e *vectorVectorBinaryOperationEvaluator) computeResult(left types.InstantVectorSeriesData, right types.InstantVectorSeriesData, takeOwnershipOfLeft bool, takeOwnershipOfRight bool) (types.InstantVectorSeriesData, error) { + var fPoints []promql.FPoint + var hPoints []promql.HPoint + + // For arithmetic and comparison operators, we'll never produce more points than the smaller input side. + // Because floats and histograms can be multiplied together, we use the sum of both the float and histogram points. + // We also don't know if the output will be exclusively floats or histograms, so we'll use the same size slice for both. + // We only assign the slices once we see the associated point type so it shouldn't be common that we allocate both. + canReturnLeftFPointSlice, canReturnLeftHPointSlice, canReturnRightFPointSlice, canReturnRightHPointSlice := takeOwnershipOfLeft, takeOwnershipOfLeft, takeOwnershipOfRight, takeOwnershipOfRight + leftPoints := len(left.Floats) + len(left.Histograms) + rightPoints := len(right.Floats) + len(right.Histograms) + minPoints := min(leftPoints, rightPoints) + + // We cannot re-use any slices when the series contain a mix of floats and histograms. + // Consider the following, where f is a float at a particular step, and h is a histogram. + // load 5m + // series1 f f f h h + // series2 h h f f h + // eval range from 0 to 25m step 5m series1 * series2 + // {} h h f h f + // We can fit the resulting 3 histograms into series2 existing slice. However, the second + // last step (index 3) produces a histogram which would be stored over the existing histogram + // at the end of series2 (also index 3). + // It should be pretty uncommon that metric contains both histograms and floats, so we will + // accept the cost of a new slice. + mixedPoints := (len(left.Floats) > 0 && len(left.Histograms) > 0) || (len(right.Floats) > 0 && len(right.Histograms) > 0) + + prepareFSlice := func() error { + canFitInLeftSide := minPoints <= cap(left.Floats) + leftSideIsSmaller := cap(left.Floats) < cap(right.Floats) + safeToReuseLeftSide := !mixedPoints && canFitInLeftSide && takeOwnershipOfLeft + canFitInRightSide := minPoints <= cap(right.Floats) + safeToReuseRightSide := !mixedPoints && canFitInRightSide && takeOwnershipOfRight + + if safeToReuseLeftSide && (leftSideIsSmaller || !safeToReuseRightSide) { + canReturnLeftFPointSlice = false + fPoints = left.Floats[:0] + return nil + } + + if safeToReuseRightSide { + canReturnRightFPointSlice = false + fPoints = right.Floats[:0] + return nil + } + + // We can't reuse either existing slice, so create a new one. + var err error + if fPoints, err = types.FPointSlicePool.Get(minPoints, e.memoryConsumptionTracker); err != nil { + return err + } + return nil + } + + prepareHSlice := func() error { + canFitInLeftSide := minPoints <= cap(left.Histograms) + leftSideIsSmaller := cap(left.Histograms) < cap(right.Histograms) + safeToReuseLeftSide := !mixedPoints && canFitInLeftSide && takeOwnershipOfLeft + canFitInRightSide := minPoints <= cap(right.Histograms) + safeToReuseRightSide := !mixedPoints && canFitInRightSide && takeOwnershipOfRight + + if safeToReuseLeftSide && (leftSideIsSmaller || !safeToReuseRightSide) { + canReturnLeftHPointSlice = false + hPoints = left.Histograms[:0] + return nil + } + + if safeToReuseRightSide { + canReturnRightHPointSlice = false + hPoints = right.Histograms[:0] + return nil + } + + // We can't reuse either existing slice, so create a new one. + var err error + if hPoints, err = types.HPointSlicePool.Get(minPoints, e.memoryConsumptionTracker); err != nil { + return err + } + return nil + } + + e.leftIterator.Reset(left) + e.rightIterator.Reset(right) + + // Get first sample from left and right + lT, lF, lH, lOk := e.leftIterator.Next() + rT, rF, rH, rOk := e.rightIterator.Next() + + appendHistogram := func(t int64, h *histogram.FloatHistogram) error { + if hPoints == nil { + if err := prepareHSlice(); err != nil { + return err + } + } + + hPoints = append(hPoints, promql.HPoint{ + H: h, + T: t, + }) + + return nil + } + + appendFloat := func(t int64, f float64) error { + if fPoints == nil { + if err := prepareFSlice(); err != nil { + return err + } + } + + fPoints = append(fPoints, promql.FPoint{ + F: f, + T: t, + }) + + return nil + } + + appendNextSample := func() error { + resultFloat, resultHist, keep, valid, err := e.opFunc(lF, rF, lH, rH) + + if err != nil { + err = functions.NativeHistogramErrorToAnnotation(err, e.emitAnnotation) + if err != nil { + return err + } + + // Else: error was converted to an annotation, continue without emitting a sample here. + keep = false + } + + if !valid { + emitIncompatibleTypesAnnotation(e.annotations, e.op, lH, rH, e.expressionPosition) + } + + if !keep { + return nil + } + + if resultHist != nil { + return appendHistogram(lT, resultHist) + } + + return appendFloat(lT, resultFloat) + } + + // Continue iterating until we exhaust either the LHS or RHS + // denoted by lOk or rOk being false. + for lOk && rOk { + if lT == rT { + // We have samples on both sides at this timestep. + if err := appendNextSample(); err != nil { + return types.InstantVectorSeriesData{}, err + } + } + + // Advance the iterator with the lower timestamp, or both if equal + if lT == rT { + lT, lF, lH, lOk = e.leftIterator.Next() + rT, rF, rH, rOk = e.rightIterator.Next() + } else if lT < rT { + lT, lF, lH, lOk = e.leftIterator.Next() + } else { + rT, rF, rH, rOk = e.rightIterator.Next() + } + } + + // Cleanup the unused slices. + if canReturnLeftFPointSlice { + types.FPointSlicePool.Put(left.Floats, e.memoryConsumptionTracker) + } + if canReturnLeftHPointSlice { + types.HPointSlicePool.Put(left.Histograms, e.memoryConsumptionTracker) + } + if canReturnRightFPointSlice { + types.FPointSlicePool.Put(right.Floats, e.memoryConsumptionTracker) + } + if canReturnRightHPointSlice { + types.HPointSlicePool.Put(right.Histograms, e.memoryConsumptionTracker) + } + + return types.InstantVectorSeriesData{ + Floats: fPoints, + Histograms: hPoints, + }, nil +} + +func (e *vectorVectorBinaryOperationEvaluator) emitAnnotation(generator types.AnnotationGenerator) { + e.annotations.Add(generator("", e.expressionPosition)) +} diff --git a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go new file mode 100644 index 00000000000..206044c3051 --- /dev/null +++ b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation.go @@ -0,0 +1,761 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package binops + +import ( + "context" + "errors" + "fmt" + "slices" + "sort" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/operators" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +var errMultipleMatchesOnManySide = errors.New("multiple matches for labels: grouping labels must ensure unique matches") + +// GroupedVectorVectorBinaryOperation represents a one-to-many or many-to-one binary operation between instant vectors such as " + group_left " or " - group_right ". +// One-to-one binary operations between instant vectors are not supported. +type GroupedVectorVectorBinaryOperation struct { + Left types.InstantVectorOperator + Right types.InstantVectorOperator + Op parser.ItemType + ReturnBool bool + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + + VectorMatching parser.VectorMatching + + expressionPosition posrange.PositionRange + annotations *annotations.Annotations + timeRange types.QueryTimeRange + + evaluator vectorVectorBinaryOperationEvaluator + remainingSeries []*groupedBinaryOperationOutputSeries + oneSide types.InstantVectorOperator // Either Left or Right + manySide types.InstantVectorOperator + oneSideBuffer *operators.InstantVectorOperatorBuffer + manySideBuffer *operators.InstantVectorOperatorBuffer + + // We need to retain these so that NextSeries() can return an error message with the series labels when + // multiple points match on a single side. + // Note that we don't retain the output series metadata: if we need to return an error message, we can compute + // the output series labels from these again. + oneSideMetadata []types.SeriesMetadata + manySideMetadata []types.SeriesMetadata +} + +var _ types.InstantVectorOperator = &GroupedVectorVectorBinaryOperation{} + +type groupedBinaryOperationOutputSeries struct { + manySide *manySide + oneSide *oneSide +} + +type groupedBinaryOperationOutputSeriesWithLabels struct { + labels labels.Labels + outputSeries *groupedBinaryOperationOutputSeries +} + +type manySide struct { + // If this side has not been populated, seriesIndices will not be nil and mergedData will be empty. + // If this side has been populated, seriesIndices will be nil. + seriesIndices []int + mergedData types.InstantVectorSeriesData + + outputSeriesCount int +} + +// latestSeriesIndex returns the index of the last series from this side. +// +// It assumes that seriesIndices is sorted in ascending order. +func (s manySide) latestSeriesIndex() int { + return s.seriesIndices[len(s.seriesIndices)-1] +} + +type oneSide struct { + // If this side has not been populated, seriesIndices will not be nil and mergedData will be empty. + // If this side has been populated, seriesIndices will be nil. + seriesIndices []int + mergedData types.InstantVectorSeriesData + + outputSeriesCount int // The number of output series that refer to this side. + + matchGroup *matchGroup // nil if this is the only "one" side in this group. +} + +// latestSeriesIndex returns the index of the last series from this side. +// +// It assumes that seriesIndices is sorted in ascending order. +func (s oneSide) latestSeriesIndex() int { + return s.seriesIndices[len(s.seriesIndices)-1] +} + +type matchGroup struct { + // Time steps at which we've seen samples for any "one" side in this group. + // Each value is the index of the source series of the sample, or -1 if no sample has been seen for this time step yet. + presence []int + + oneSideCount int +} + +// updatePresence records the presence of a sample from the series with index seriesIdx at the timestamp with index timestampIdx. +// +// If there is already a sample present from another series at the same timestamp, updatePresence returns that series' index, or +// -1 if there was no sample present at the same timestamp from another series. +func (g *matchGroup) updatePresence(timestampIdx int64, seriesIdx int) int { + if existing := g.presence[timestampIdx]; existing != -1 { + return existing + } + + g.presence[timestampIdx] = seriesIdx + return -1 +} + +func NewGroupedVectorVectorBinaryOperation( + left types.InstantVectorOperator, + right types.InstantVectorOperator, + vectorMatching parser.VectorMatching, + op parser.ItemType, + returnBool bool, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + annotations *annotations.Annotations, + expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, +) (*GroupedVectorVectorBinaryOperation, error) { + e, err := newVectorVectorBinaryOperationEvaluator(op, returnBool, memoryConsumptionTracker, annotations, expressionPosition) + if err != nil { + return nil, err + } + + g := &GroupedVectorVectorBinaryOperation{ + Left: left, + Right: right, + VectorMatching: vectorMatching, + Op: op, + ReturnBool: returnBool, + MemoryConsumptionTracker: memoryConsumptionTracker, + + evaluator: e, + expressionPosition: expressionPosition, + annotations: annotations, + timeRange: timeRange, + } + + switch g.VectorMatching.Card { + case parser.CardOneToMany: + g.oneSide, g.manySide = g.Left, g.Right + case parser.CardManyToOne: + g.manySide, g.oneSide = g.Left, g.Right + default: + return nil, fmt.Errorf("unsupported cardinality '%v'", g.VectorMatching.Card) + } + + slices.Sort(g.VectorMatching.Include) + + return g, nil +} + +// SeriesMetadata returns the series expected to be produced by this operator. +// +// Note that it is possible that this method returns a series which will not have any points, as the +// list of possible output series is generated based solely on the series labels, not their data. +// +// For example, if this operator is for a range query with the expression "left_metric + right_metric", but +// left_metric has points at T=0 and T=1 in the query range, and right_metric has points at T=2 and T=3 in the +// query range, then SeriesMetadata will return a series, but NextSeries will return no points for that series. +// +// If this affects many series in the query, this may cause consuming operators to be less efficient, but in +// practice this rarely happens. +// +// (The alternative would be to compute the entire result here in SeriesMetadata and only return the series that +// contain points, but that would mean we'd need to hold the entire result in memory at once, which we want to +// avoid.) +func (g *GroupedVectorVectorBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + if canProduceAnySeries, err := g.loadSeriesMetadata(ctx); err != nil { + return nil, err + } else if !canProduceAnySeries { + return nil, nil + } + + allMetadata, allSeries, oneSideSeriesUsed, manySideSeriesUsed, err := g.computeOutputSeries() + if err != nil { + return nil, err + } + + g.sortSeries(allMetadata, allSeries) + g.remainingSeries = allSeries + + g.oneSideBuffer = operators.NewInstantVectorOperatorBuffer(g.oneSide, oneSideSeriesUsed, g.MemoryConsumptionTracker) + g.manySideBuffer = operators.NewInstantVectorOperatorBuffer(g.manySide, manySideSeriesUsed, g.MemoryConsumptionTracker) + + return allMetadata, nil +} + +// loadSeriesMetadata loads series metadata from both sides of this operation. +// It returns false if one side returned no series and that means there is no way for this operation to return any series. +// (eg. if doing A + B and either A or B have no series, then there is no way for this operation to produce any series) +func (g *GroupedVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error) { + // We retain the series labels for later so we can use them to generate error messages. + // We'll return them to the pool in Close(). + + var err error + g.oneSideMetadata, err = g.oneSide.SeriesMetadata(ctx) + if err != nil { + return false, err + } + + if len(g.oneSideMetadata) == 0 { + // No series on left-hand side, we'll never have any output series. + return false, nil + } + + g.manySideMetadata, err = g.manySide.SeriesMetadata(ctx) + if err != nil { + return false, err + } + + if len(g.manySideMetadata) == 0 { + // No series on right-hand side, we'll never have any output series. + return false, nil + } + + return true, nil +} + +// computeOutputSeries determines the possible output series from this operator. +// It assumes oneSideMetadata and manySideMetadata have already been populated. +// +// It returns: +// - a list of all possible series this operator could return +// - a corresponding list of the source series for each output series +// - a list indicating which series from the "one" side are needed to compute the output +// - a list indicating which series from the "many" side are needed to compute the output +func (g *GroupedVectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*groupedBinaryOperationOutputSeries, []bool, []bool, error) { + groupKeyFunc := vectorMatchingGroupKeyFunc(g.VectorMatching) + + // First, iterate through all the series on the "one" side and determine all the possible groups. + // For example, if we are matching on the "env" label and "region" is an additional label, + // oneSideMap would look something like this once we're done: + // [env=test][region=au]: {...} + // [env=test][region=eu]: {...} + // [env=test][region=us]: {...} + // [env=prod][region=au]: {...} + // [env=prod][region=eu]: {...} + // [env=prod][region=us]: {...} + additionalLabelsKeyFunc := g.additionalLabelsKeyFunc() + oneSideMap := map[string]map[string]*oneSide{} + + for idx, s := range g.oneSideMetadata { + groupKey := groupKeyFunc(s.Labels) + oneSideGroup, exists := oneSideMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if !exists { + oneSideGroup = map[string]*oneSide{} + oneSideMap[string(groupKey)] = oneSideGroup + } + + additionalLabelsKey := additionalLabelsKeyFunc(s.Labels) + side, exists := oneSideGroup[string(additionalLabelsKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if !exists { + side = &oneSide{} + oneSideGroup[string(additionalLabelsKey)] = side + } + + side.seriesIndices = append(side.seriesIndices, idx) + } + + // Now iterate through all series on the "many" side and determine all the possible output series, as + // well as which series from the "many" side we'll actually need. + outputSeriesMap := map[string]groupedBinaryOperationOutputSeriesWithLabels{} // All output series, keyed by their labels. + manySideMap := map[string]*manySide{} // Series from the "many" side, grouped by which output series they'll contribute to. + manySideGroupKeyFunc := g.manySideGroupKeyFunc() + outputSeriesLabelsFunc := g.outputSeriesLabelsFunc() + buf := make([]byte, 0, 1024) + + manySideSeriesUsed, err := types.BoolSlicePool.Get(len(g.manySideMetadata), g.MemoryConsumptionTracker) + if err != nil { + return nil, nil, nil, nil, err + } + manySideSeriesUsed = manySideSeriesUsed[:len(g.manySideMetadata)] + + for idx, s := range g.manySideMetadata { + groupKey := groupKeyFunc(s.Labels) + oneSideGroup, exists := oneSideMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if !exists { + // There are no series on the "one" side that match this series, so we'll produce no output series for this series. + continue + } + + manySideSeriesUsed[idx] = true + manySideGroupKey := manySideGroupKeyFunc(s.Labels) + thisManySide, exists := manySideMap[string(manySideGroupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. + + if exists { + // There is already at least one other "many" side series that contributes to the same set of output series, so just append this series to the same output series. + thisManySide.seriesIndices = append(thisManySide.seriesIndices, idx) + continue + } + + thisManySide = &manySide{ + seriesIndices: []int{idx}, + } + + manySideMap[string(manySideGroupKey)] = thisManySide + + for _, oneSide := range oneSideGroup { + // Most of the time, the output series won't already exist (unless we have input series with different metric names), + // so just create the series labels directly rather than trying to avoid their creation until we know for sure we'll + // need them. + l := outputSeriesLabelsFunc(g.oneSideMetadata[oneSide.seriesIndices[0]].Labels, s.Labels) + _, exists := outputSeriesMap[string(l.Bytes(buf))] + + if !exists { + oneSide.outputSeriesCount++ + thisManySide.outputSeriesCount++ + + outputSeriesMap[string(l.Bytes(buf))] = groupedBinaryOperationOutputSeriesWithLabels{ + labels: l, + outputSeries: &groupedBinaryOperationOutputSeries{ + manySide: thisManySide, + oneSide: oneSide, + }, + } + } + } + } + + // Next, go through all the "one" side groups again, and determine which of the "one" side series we'll actually need. + oneSideSeriesUsed, err := types.BoolSlicePool.Get(len(g.oneSideMetadata), g.MemoryConsumptionTracker) + if err != nil { + return nil, nil, nil, nil, err + } + + oneSideSeriesUsed = oneSideSeriesUsed[:len(g.oneSideMetadata)] + + for _, oneSideGroup := range oneSideMap { + var thisMatchGroup *matchGroup + + for _, oneSide := range oneSideGroup { + if oneSide.outputSeriesCount == 0 { + // If any part of a group has no output series, then no parts of that group will have output series. + break + } else if thisMatchGroup == nil && len(oneSideGroup) > 1 { + // We only need a matchGroup to detect conflicts between series on the "one" side that have the same grouping labels. + // So if there is only one "one" side, we don't need to bother with this and can skip creating the matchGroup. + thisMatchGroup = &matchGroup{oneSideCount: len(oneSideGroup)} + } + + oneSide.matchGroup = thisMatchGroup + + for _, idx := range oneSide.seriesIndices { + oneSideSeriesUsed[idx] = true + } + } + } + + // Finally, construct the list of series that this operator will return. + outputMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) + outputSeries := make([]*groupedBinaryOperationOutputSeries, 0, len(outputSeriesMap)) + + for _, o := range outputSeriesMap { + outputMetadata = append(outputMetadata, types.SeriesMetadata{Labels: o.labels}) + outputSeries = append(outputSeries, o.outputSeries) + } + + return outputMetadata, outputSeries, oneSideSeriesUsed, manySideSeriesUsed, nil +} + +// additionalLabelsKeyFunc returns a function that extracts a key representing the additional labels from a "one" side series that will +// be included in the final output series labels. +func (g *GroupedVectorVectorBinaryOperation) additionalLabelsKeyFunc() func(oneSideLabels labels.Labels) []byte { + if len(g.VectorMatching.Include) == 0 { + return func(_ labels.Labels) []byte { + return nil + } + } + + buf := make([]byte, 0, 1024) + + return func(oneSideLabels labels.Labels) []byte { + buf = oneSideLabels.BytesWithLabels(buf, g.VectorMatching.Include...) + return buf + } +} + +// manySideGroupKeyFunc returns a function that extracts a key representing the set of labels from the "many" side that will contribute +// to the same set of output series. +func (g *GroupedVectorVectorBinaryOperation) manySideGroupKeyFunc() func(manySideLabels labels.Labels) []byte { + buf := make([]byte, 0, 1024) + + if !g.shouldRemoveMetricNameFromManySide() && len(g.VectorMatching.Include) == 0 { + return func(manySideLabels labels.Labels) []byte { + buf = manySideLabels.Bytes(buf) // FIXME: it'd be nice if we could avoid Bytes() copying the slice here + return buf + } + } + + if len(g.VectorMatching.Include) == 0 { + return func(manySideLabels labels.Labels) []byte { + buf = manySideLabels.BytesWithoutLabels(buf, labels.MetricName) + return buf + } + } + + labelsToRemove := g.VectorMatching.Include + + if g.shouldRemoveMetricNameFromManySide() { + labelsToRemove = append(labelsToRemove, labels.MetricName) + slices.Sort(labelsToRemove) + } + + return func(manySideLabels labels.Labels) []byte { + buf = manySideLabels.BytesWithoutLabels(buf, labelsToRemove...) + return buf + } +} + +// outputSeriesLabelsFunc returns a function that determines the final output series labels for given series on both sides. +func (g *GroupedVectorVectorBinaryOperation) outputSeriesLabelsFunc() func(oneSideLabels labels.Labels, manySideLabels labels.Labels) labels.Labels { + if len(g.VectorMatching.Include) == 0 { + if g.shouldRemoveMetricNameFromManySide() { + return func(_ labels.Labels, manySideLabels labels.Labels) labels.Labels { + return manySideLabels.DropMetricName() + } + } + + return func(_ labels.Labels, manySideLabels labels.Labels) labels.Labels { + return manySideLabels + } + } + + lb := labels.NewBuilder(labels.EmptyLabels()) + + if g.shouldRemoveMetricNameFromManySide() { + return func(oneSideLabels labels.Labels, manySideLabels labels.Labels) labels.Labels { + lb.Reset(manySideLabels) + lb.Del(labels.MetricName) + + for _, l := range g.VectorMatching.Include { + lb.Set(l, oneSideLabels.Get(l)) + } + + return lb.Labels() + } + } + + return func(oneSideLabels labels.Labels, manySideLabels labels.Labels) labels.Labels { + lb.Reset(manySideLabels) + + for _, l := range g.VectorMatching.Include { + lb.Set(l, oneSideLabels.Get(l)) + } + + return lb.Labels() + } +} + +func (g *GroupedVectorVectorBinaryOperation) shouldRemoveMetricNameFromManySide() bool { + if g.Op.IsComparisonOperator() { + return g.ReturnBool + } + + return true +} + +// sortSeries sorts metadata and series in place to try to minimise the number of input series we'll need to buffer in memory. +// +// This is critical for minimising the memory consumption of this operator: if we choose a poor ordering of series, +// we'll need to buffer many input series in memory. +// +// At present, sortSeries uses a very basic heuristic to guess the best way to sort the output series, but we could make +// this more sophisticated in the future. +func (g *GroupedVectorVectorBinaryOperation) sortSeries(metadata []types.SeriesMetadata, series []*groupedBinaryOperationOutputSeries) { + // Each series from the "many" side is usually used for at most one output series, so sort the output series so that we buffer as little of the + // "many" side series as possible. + // + // This isn't necessarily perfect: it may be that this still requires us to buffer many series from the "many" side if many + // series from the "many" side map to one output series, but this is expected to be rare. + sort.Sort(newFavourManySideSorter(metadata, series)) +} + +type favourManySideSorter struct { + metadata []types.SeriesMetadata + series []*groupedBinaryOperationOutputSeries +} + +func newFavourManySideSorter(metadata []types.SeriesMetadata, series []*groupedBinaryOperationOutputSeries) sort.Interface { + return favourManySideSorter{metadata, series} +} + +func (s favourManySideSorter) Len() int { + return len(s.metadata) +} + +func (s favourManySideSorter) Less(i, j int) bool { + iMany := s.series[i].manySide.latestSeriesIndex() + jMany := s.series[j].manySide.latestSeriesIndex() + if iMany != jMany { + return iMany < jMany + } + + return s.series[i].oneSide.latestSeriesIndex() < s.series[j].oneSide.latestSeriesIndex() +} + +func (s favourManySideSorter) Swap(i, j int) { + s.metadata[i], s.metadata[j] = s.metadata[j], s.metadata[i] + s.series[i], s.series[j] = s.series[j], s.series[i] +} + +func (g *GroupedVectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { + if len(g.remainingSeries) == 0 { + return types.InstantVectorSeriesData{}, types.EOS + } + + thisSeries := g.remainingSeries[0] + g.remainingSeries = g.remainingSeries[1:] + + if err := g.ensureOneSidePopulated(ctx, thisSeries.oneSide); err != nil { + return types.InstantVectorSeriesData{}, err + } + + if err := g.ensureManySidePopulated(ctx, thisSeries.manySide); err != nil { + return types.InstantVectorSeriesData{}, err + } + + thisSeries.oneSide.outputSeriesCount-- + isLastOutputSeriesForOneSide := thisSeries.oneSide.outputSeriesCount == 0 + + thisSeries.manySide.outputSeriesCount-- + isLastOutputSeriesForManySide := thisSeries.manySide.outputSeriesCount == 0 + + var result types.InstantVectorSeriesData + var err error + + switch g.VectorMatching.Card { + case parser.CardOneToMany: + result, err = g.evaluator.computeResult(thisSeries.oneSide.mergedData, thisSeries.manySide.mergedData, isLastOutputSeriesForOneSide, isLastOutputSeriesForManySide) + case parser.CardManyToOne: + result, err = g.evaluator.computeResult(thisSeries.manySide.mergedData, thisSeries.oneSide.mergedData, isLastOutputSeriesForManySide, isLastOutputSeriesForOneSide) + default: + panic(fmt.Sprintf("unsupported cardinality '%v'", g.VectorMatching.Card)) + } + + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + return result, nil +} + +func (g *GroupedVectorVectorBinaryOperation) ensureOneSidePopulated(ctx context.Context, side *oneSide) error { + if side.seriesIndices == nil { + // Already populated. + return nil + } + + // First time we've used this "one" side, populate it. + data, err := g.oneSideBuffer.GetSeries(ctx, side.seriesIndices) + if err != nil { + return err + } + + if err := g.updateOneSidePresence(side, data); err != nil { + return err + } + + side.mergedData, err = g.mergeOneSide(data, side.seriesIndices) + if err != nil { + return err + } + + // Clear seriesIndices to indicate that we've populated it. + side.seriesIndices = nil + + return nil +} + +func (g *GroupedVectorVectorBinaryOperation) updateOneSidePresence(side *oneSide, data []types.InstantVectorSeriesData) error { + matchGroup := side.matchGroup + if matchGroup == nil { + // If there is only one set of additional labels for this set of grouping labels, then there's nothing to do. + return nil + } + + // If there are multiple sets of additional labels for the same set of grouping labels, check that there is only one series at each + // time step for each set of grouping labels. + + if matchGroup.presence == nil { + var err error + matchGroup.presence, err = types.IntSlicePool.Get(g.timeRange.StepCount, g.MemoryConsumptionTracker) + + if err != nil { + return err + } + + matchGroup.presence = matchGroup.presence[:g.timeRange.StepCount] + + for idx := range matchGroup.presence { + matchGroup.presence[idx] = -1 + } + } + + for dataIdx, seriesData := range data { + seriesIdx := side.seriesIndices[dataIdx] + + for _, p := range seriesData.Floats { + if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { + return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + } + } + + for _, p := range seriesData.Histograms { + if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 { + return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness()) + } + } + } + + matchGroup.oneSideCount-- + + if matchGroup.oneSideCount == 0 { + types.IntSlicePool.Put(matchGroup.presence, g.MemoryConsumptionTracker) + } + + return nil +} + +func (g *GroupedVectorVectorBinaryOperation) mergeOneSide(data []types.InstantVectorSeriesData, sourceSeriesIndices []int) (types.InstantVectorSeriesData, error) { + merged, conflict, err := operators.MergeSeries(data, sourceSeriesIndices, g.MemoryConsumptionTracker) + + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + if conflict != nil { + return types.InstantVectorSeriesData{}, g.formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness()) + } + + return merged, nil +} + +func (g *GroupedVectorVectorBinaryOperation) ensureManySidePopulated(ctx context.Context, side *manySide) error { + if side.seriesIndices == nil { + // Already populated. + return nil + } + + // First time we've used this "one" side, populate it. + data, err := g.manySideBuffer.GetSeries(ctx, side.seriesIndices) + if err != nil { + return err + } + + side.mergedData, err = g.mergeManySide(data, side.seriesIndices) + if err != nil { + return err + } + + // Clear seriesIndices to indicate that we've populated it. + side.seriesIndices = nil + + return nil +} + +func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantVectorSeriesData, sourceSeriesIndices []int) (types.InstantVectorSeriesData, error) { + merged, conflict, err := operators.MergeSeries(data, sourceSeriesIndices, g.MemoryConsumptionTracker) + + if err != nil { + return types.InstantVectorSeriesData{}, err + } + + if conflict != nil { + return types.InstantVectorSeriesData{}, errMultipleMatchesOnManySide + } + + return merged, nil +} + +func (g *GroupedVectorVectorBinaryOperation) formatConflictError( + firstConflictingSeriesIndex int, + secondConflictingSeriesIndex int, + description string, + ts int64, + sourceSeriesMetadata []types.SeriesMetadata, + side string, +) error { + firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels + groupLabels := groupLabelsFunc(g.VectorMatching, g.Op, g.ReturnBool)(firstConflictingSeriesLabels) + + if secondConflictingSeriesIndex == -1 { + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + ) + } + + secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels + + return fmt.Errorf( + "found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s", + description, + groupLabels, + side, + timestamp.Time(ts).Format(time.RFC3339Nano), + firstConflictingSeriesLabels, + secondConflictingSeriesLabels, + ) +} + +func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string { + switch g.VectorMatching.Card { + case parser.CardOneToMany: + return "left" + case parser.CardManyToOne: + return "right" + default: + panic(fmt.Sprintf("unsupported cardinality '%v'", g.VectorMatching.Card)) + } +} + +func (g *GroupedVectorVectorBinaryOperation) ExpressionPosition() posrange.PositionRange { + return g.expressionPosition +} + +func (g *GroupedVectorVectorBinaryOperation) Close() { + g.Left.Close() + g.Right.Close() + // We don't need to close g.oneSide or g.manySide, as these are either g.Left or g.Right and so have been closed above. + + if g.oneSideMetadata != nil { + types.PutSeriesMetadataSlice(g.oneSideMetadata) + } + + if g.manySideMetadata != nil { + types.PutSeriesMetadataSlice(g.manySideMetadata) + } + + if g.oneSideBuffer != nil { + g.oneSideBuffer.Close() + } + + if g.manySideBuffer != nil { + g.manySideBuffer.Close() + } +} diff --git a/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation_test.go b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation_test.go new file mode 100644 index 00000000000..55efb38de31 --- /dev/null +++ b/pkg/streamingpromql/operators/binops/grouped_vector_vector_binary_operation_test.go @@ -0,0 +1,318 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package binops + +import ( + "context" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/operators" + "github.com/grafana/mimir/pkg/streamingpromql/testutils" + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +func TestGroupedVectorVectorBinaryOperation_OutputSeriesSorting(t *testing.T) { + testCases := map[string]struct { + leftSeries []labels.Labels + rightSeries []labels.Labels + + matching parser.VectorMatching + op parser.ItemType + returnBool bool + + expectedOutputSeries []labels.Labels + }{ + "no series on either side": { + leftSeries: []labels.Labels{}, + rightSeries: []labels.Labels{}, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne}, + + expectedOutputSeries: []labels.Labels{}, + }, + + "no series on left side": { + leftSeries: []labels.Labels{}, + rightSeries: []labels.Labels{ + labels.FromStrings("series", "a"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne}, + + expectedOutputSeries: []labels.Labels{}, + }, + + "no series on right side": { + leftSeries: []labels.Labels{ + labels.FromStrings("series", "a"), + }, + rightSeries: []labels.Labels{}, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne}, + + expectedOutputSeries: []labels.Labels{}, + }, + + "single series on each side matched and both sides' series are in the same order": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a"), + labels.FromStrings(labels.MetricName, "left", "group", "b"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "a"), + labels.FromStrings(labels.MetricName, "right", "group", "b"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne, MatchingLabels: []string{"group"}, On: true}, + + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "a"), + labels.FromStrings("group", "b"), + }, + }, + + "single series on each side matched and both sides' series are in different order with group_left": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a"), + labels.FromStrings(labels.MetricName, "left", "group", "b"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b"), + labels.FromStrings(labels.MetricName, "right", "group", "a"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "a"), + labels.FromStrings("group", "b"), + }, + }, + + "single series on each side matched and both sides' series are in different order with group_right": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a"), + labels.FromStrings(labels.MetricName, "left", "group", "b"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b"), + labels.FromStrings(labels.MetricName, "right", "group", "a"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardOneToMany, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "b"), + labels.FromStrings("group", "a"), + }, + }, + + "multiple series on left side match to a single series on right side with group_left": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "2"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "2"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b"), + labels.FromStrings(labels.MetricName, "right", "group", "a"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "a", "idx", "1"), + labels.FromStrings("group", "a", "idx", "2"), + labels.FromStrings("group", "a", "idx", "3"), + labels.FromStrings("group", "b", "idx", "3"), + labels.FromStrings("group", "b", "idx", "1"), + labels.FromStrings("group", "b", "idx", "2"), + }, + }, + + "multiple series on left side match to a single series on right side with group_right": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "2"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx", "2"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b"), + labels.FromStrings(labels.MetricName, "right", "group", "a"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardOneToMany, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "b"), + labels.FromStrings("group", "a"), + }, + }, + + "single series on left side match to multiple series on right side with group_left": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a"), + labels.FromStrings(labels.MetricName, "left", "group", "b"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "1"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "2"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "3"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "3"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "1"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "2"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "a"), + labels.FromStrings("group", "b"), + }, + }, + + "single series on left side match to multiple series on right side with group_right": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a"), + labels.FromStrings(labels.MetricName, "left", "group", "b"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "1"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "2"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx", "3"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "3"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "1"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx", "2"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardOneToMany, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "b", "idx", "1"), + labels.FromStrings("group", "b", "idx", "2"), + labels.FromStrings("group", "b", "idx", "3"), + labels.FromStrings("group", "a", "idx", "3"), + labels.FromStrings("group", "a", "idx", "1"), + labels.FromStrings("group", "a", "idx", "2"), + }, + }, + + "multiple series on left side match to multiple series on right side with group_left": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "2"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "2"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "4"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "5"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "6"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "5"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "4"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "6"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardManyToOne, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "a", "idx_left", "1"), + labels.FromStrings("group", "b", "idx_left", "3"), + labels.FromStrings("group", "a", "idx_left", "2"), + labels.FromStrings("group", "a", "idx_left", "3"), + labels.FromStrings("group", "b", "idx_left", "1"), + labels.FromStrings("group", "b", "idx_left", "2"), + }, + }, + + "multiple series on left side match to multiple series on right side with group_right": { + leftSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "2"), + labels.FromStrings(labels.MetricName, "left", "group", "a", "idx_left", "3"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "1"), + labels.FromStrings(labels.MetricName, "left", "group", "b", "idx_left", "2"), + }, + rightSeries: []labels.Labels{ + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "4"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "5"), + labels.FromStrings(labels.MetricName, "right", "group", "b", "idx_right", "6"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "5"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "4"), + labels.FromStrings(labels.MetricName, "right", "group", "a", "idx_right", "6"), + }, + + op: parser.ADD, + matching: parser.VectorMatching{Card: parser.CardOneToMany, MatchingLabels: []string{"group"}, On: true}, + + // Should be sorted to avoid buffering "many" side. + expectedOutputSeries: []labels.Labels{ + labels.FromStrings("group", "b", "idx_right", "4"), + labels.FromStrings("group", "b", "idx_right", "5"), + labels.FromStrings("group", "b", "idx_right", "6"), + labels.FromStrings("group", "a", "idx_right", "5"), + labels.FromStrings("group", "a", "idx_right", "4"), + labels.FromStrings("group", "a", "idx_right", "6"), + }, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + left := &operators.TestOperator{Series: testCase.leftSeries} + right := &operators.TestOperator{Series: testCase.rightSeries} + + o, err := NewGroupedVectorVectorBinaryOperation( + left, + right, + testCase.matching, + testCase.op, + testCase.returnBool, + limiting.NewMemoryConsumptionTracker(0, nil), + nil, + posrange.PositionRange{}, + types.QueryTimeRange{}, + ) + + require.NoError(t, err) + + outputSeries, err := o.SeriesMetadata(context.Background()) + require.NoError(t, err) + + require.Equal(t, testutils.LabelsToSeriesMetadata(testCase.expectedOutputSeries), outputSeries) + }) + } +} diff --git a/pkg/streamingpromql/operators/binops/vector_vector_binary_operation.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go similarity index 65% rename from pkg/streamingpromql/operators/binops/vector_vector_binary_operation.go rename to pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go index 78fdc5d4cd8..70f1204d7bf 100644 --- a/pkg/streamingpromql/operators/binops/vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation.go @@ -13,22 +13,19 @@ import ( "time" "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" - "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/operators" - "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" "github.com/grafana/mimir/pkg/streamingpromql/types" ) -// VectorVectorBinaryOperation represents a binary operation between instant vectors such as " + " or " - ". -type VectorVectorBinaryOperation struct { +// OneToOneVectorVectorBinaryOperation represents a one-to-one binary operation between instant vectors such as " + " or " - ". +// One-to-many and many-to-one binary operations between instant vectors are not supported. +type OneToOneVectorVectorBinaryOperation struct { Left types.InstantVectorOperator Right types.InstantVectorOperator Op parser.ItemType @@ -44,20 +41,18 @@ type VectorVectorBinaryOperation struct { leftMetadata []types.SeriesMetadata rightMetadata []types.SeriesMetadata - remainingSeries []*binaryOperationOutputSeries + remainingSeries []*oneToOneBinaryOperationOutputSeries leftBuffer *operators.InstantVectorOperatorBuffer rightBuffer *operators.InstantVectorOperatorBuffer - leftIterator types.InstantVectorSeriesDataIterator - rightIterator types.InstantVectorSeriesDataIterator - opFunc binaryOperationFunc + evaluator vectorVectorBinaryOperationEvaluator expressionPosition posrange.PositionRange annotations *annotations.Annotations } -var _ types.InstantVectorOperator = &VectorVectorBinaryOperation{} +var _ types.InstantVectorOperator = &OneToOneVectorVectorBinaryOperation{} -type binaryOperationOutputSeries struct { +type oneToOneBinaryOperationOutputSeries struct { leftSeriesIndices []int rightSeriesIndices []int } @@ -65,18 +60,18 @@ type binaryOperationOutputSeries struct { // latestLeftSeries returns the index of the last series from the left source needed for this output series. // // It assumes that leftSeriesIndices is sorted in ascending order. -func (s binaryOperationOutputSeries) latestLeftSeries() int { +func (s oneToOneBinaryOperationOutputSeries) latestLeftSeries() int { return s.leftSeriesIndices[len(s.leftSeriesIndices)-1] } // latestRightSeries returns the index of the last series from the right source needed for this output series. // // It assumes that rightSeriesIndices is sorted in ascending order. -func (s binaryOperationOutputSeries) latestRightSeries() int { +func (s oneToOneBinaryOperationOutputSeries) latestRightSeries() int { return s.rightSeriesIndices[len(s.rightSeriesIndices)-1] } -func NewVectorVectorBinaryOperation( +func NewOneToOneVectorVectorBinaryOperation( left types.InstantVectorOperator, right types.InstantVectorOperator, vectorMatching parser.VectorMatching, @@ -85,35 +80,29 @@ func NewVectorVectorBinaryOperation( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, -) (*VectorVectorBinaryOperation, error) { - b := &VectorVectorBinaryOperation{ +) (*OneToOneVectorVectorBinaryOperation, error) { + e, err := newVectorVectorBinaryOperationEvaluator(op, returnBool, memoryConsumptionTracker, annotations, expressionPosition) + if err != nil { + return nil, err + } + + b := &OneToOneVectorVectorBinaryOperation{ Left: left, Right: right, - leftIterator: types.InstantVectorSeriesDataIterator{}, - rightIterator: types.InstantVectorSeriesDataIterator{}, VectorMatching: vectorMatching, Op: op, ReturnBool: returnBool, MemoryConsumptionTracker: memoryConsumptionTracker, + evaluator: e, expressionPosition: expressionPosition, annotations: annotations, } - if returnBool { - b.opFunc = boolComparisonOperationFuncs[op] - } else { - b.opFunc = arithmeticAndComparisonOperationFuncs[op] - } - - if b.opFunc == nil { - return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%s'", op)) - } - return b, nil } -func (b *VectorVectorBinaryOperation) ExpressionPosition() posrange.PositionRange { +func (b *OneToOneVectorVectorBinaryOperation) ExpressionPosition() posrange.PositionRange { return b.expressionPosition } @@ -132,7 +121,7 @@ func (b *VectorVectorBinaryOperation) ExpressionPosition() posrange.PositionRang // (The alternative would be to compute the entire result here in SeriesMetadata and only return the series that // contain points, but that would mean we'd need to hold the entire result in memory at once, which we want to // avoid.) -func (b *VectorVectorBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { +func (b *OneToOneVectorVectorBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { if canProduceAnySeries, err := b.loadSeriesMetadata(ctx); err != nil { return nil, err } else if !canProduceAnySeries { @@ -156,7 +145,7 @@ func (b *VectorVectorBinaryOperation) SeriesMetadata(ctx context.Context) ([]typ // loadSeriesMetadata loads series metadata from both sides of this operation. // It returns false if one side returned no series and that means there is no way for this operation to return any series. // (eg. if doing A + B and either A or B have no series, then there is no way for this operation to produce any series) -func (b *VectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error) { +func (b *OneToOneVectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error) { // We retain the series labels for later so we can use them to generate error messages. // We'll return them to the pool in Close(). @@ -167,7 +156,6 @@ func (b *VectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (b } if len(b.leftMetadata) == 0 { - // FIXME: this is incorrect for 'or' // No series on left-hand side, we'll never have any output series. return false, nil } @@ -178,7 +166,6 @@ func (b *VectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (b } if len(b.rightMetadata) == 0 { - // FIXME: this is incorrect for 'or' and 'unless' // No series on right-hand side, we'll never have any output series. return false, nil } @@ -194,15 +181,14 @@ func (b *VectorVectorBinaryOperation) loadSeriesMetadata(ctx context.Context) (b // - a corresponding list of the source series for each output series // - a list indicating which series from the left side are needed to compute the output // - a list indicating which series from the right side are needed to compute the output -func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool, error) { - labelsFunc := b.groupLabelsFunc() +func (b *OneToOneVectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*oneToOneBinaryOperationOutputSeries, []bool, []bool, error) { + labelsFunc := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool) groupKeyFunc := vectorMatchingGroupKeyFunc(b.VectorMatching) - outputSeriesMap := map[string]*binaryOperationOutputSeries{} + outputSeriesMap := map[string]*oneToOneBinaryOperationOutputSeries{} // Use the smaller side to populate the map of possible output series first. // This should ensure we don't unnecessarily populate the output series map with series that will never match in most cases. // (It's possible that all the series on the larger side all belong to the same group, but this is expected to be rare.) - // FIXME: this doesn't work as-is for 'unless'. smallerSide := b.leftMetadata largerSide := b.rightMetadata smallerSideIsLeftSide := len(b.leftMetadata) < len(b.rightMetadata) @@ -217,7 +203,7 @@ func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetad series, exists := outputSeriesMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it. if !exists { - series = &binaryOperationOutputSeries{} + series = &oneToOneBinaryOperationOutputSeries{} outputSeriesMap[string(groupKey)] = series } @@ -240,21 +226,18 @@ func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetad series.leftSeriesIndices = append(series.leftSeriesIndices, idx) } } - - // FIXME: if this is an 'or' operation, then we need to create the right side even if the left doesn't exist (or vice-versa) } // Remove series that cannot produce samples. for seriesLabels, outputSeries := range outputSeriesMap { if len(outputSeries.leftSeriesIndices) == 0 || len(outputSeries.rightSeriesIndices) == 0 { - // FIXME: this is incorrect for 'or' and 'unless' // No matching series on at least one side for this output series, so output series will have no samples. Remove it. delete(outputSeriesMap, seriesLabels) } } allMetadata := types.GetSeriesMetadataSlice(len(outputSeriesMap)) - allSeries := make([]*binaryOperationOutputSeries, 0, len(outputSeriesMap)) + allSeries := make([]*oneToOneBinaryOperationOutputSeries, 0, len(outputSeriesMap)) leftSeriesUsed, err := types.BoolSlicePool.Get(len(b.leftMetadata), b.MemoryConsumptionTracker) if err != nil { @@ -293,17 +276,12 @@ func (b *VectorVectorBinaryOperation) computeOutputSeries() ([]types.SeriesMetad // // At present, sortSeries uses a very basic heuristic to guess the best way to sort the output series, but we could make // this more sophisticated in the future. -func (b *VectorVectorBinaryOperation) sortSeries(metadata []types.SeriesMetadata, series []*binaryOperationOutputSeries) { +func (b *OneToOneVectorVectorBinaryOperation) sortSeries(metadata []types.SeriesMetadata, series []*oneToOneBinaryOperationOutputSeries) { // For one-to-one matching, we assume that each output series takes one series from each side of the operator. // If this is true, then the best order is the one in which we read from the highest cardinality side in order. // If we do this, then in the worst case, we'll have to buffer the whole of the lower cardinality side. // (Compare this with sorting so that we read the lowest cardinality side in order: in the worst case, we'll have // to buffer the whole of the higher cardinality side.) - // - // FIXME: this is reasonable for one-to-one matching, but likely not for one-to-many / many-to-one. - // For one-to-many / many-to-one, it would likely be best to buffer the side used for multiple output series (the "one" side), - // as we'll need to retain these series for multiple output series anyway. - var sortInterface sort.Interface if len(b.leftMetadata) < len(b.rightMetadata) { @@ -317,14 +295,14 @@ func (b *VectorVectorBinaryOperation) sortSeries(metadata []types.SeriesMetadata type binaryOperationOutputSorter struct { metadata []types.SeriesMetadata - series []*binaryOperationOutputSeries + series []*oneToOneBinaryOperationOutputSeries } type favourLeftSideSorter struct { binaryOperationOutputSorter } -func newFavourLeftSideSorter(metadata []types.SeriesMetadata, series []*binaryOperationOutputSeries) favourLeftSideSorter { +func newFavourLeftSideSorter(metadata []types.SeriesMetadata, series []*oneToOneBinaryOperationOutputSeries) favourLeftSideSorter { return favourLeftSideSorter{binaryOperationOutputSorter{metadata, series}} } @@ -332,7 +310,7 @@ type favourRightSideSorter struct { binaryOperationOutputSorter } -func newFavourRightSideSorter(metadata []types.SeriesMetadata, series []*binaryOperationOutputSeries) favourRightSideSorter { +func newFavourRightSideSorter(metadata []types.SeriesMetadata, series []*oneToOneBinaryOperationOutputSeries) favourRightSideSorter { return favourRightSideSorter{binaryOperationOutputSorter{metadata, series}} } @@ -365,36 +343,7 @@ func (g favourRightSideSorter) Less(i, j int) bool { return g.series[i].latestLeftSeries() < g.series[j].latestLeftSeries() } -// groupLabelsFunc returns a function that computes the labels of the output group this series belongs to. -func (b *VectorVectorBinaryOperation) groupLabelsFunc() func(labels.Labels) labels.Labels { - lb := labels.NewBuilder(labels.EmptyLabels()) - - if b.VectorMatching.On { - return func(l labels.Labels) labels.Labels { - lb.Reset(l) - lb.Keep(b.VectorMatching.MatchingLabels...) - return lb.Labels() - } - } - - if b.Op.IsComparisonOperator() && !b.ReturnBool { - // If this is a comparison operator, we want to retain the metric name, as the comparison acts like a filter. - return func(l labels.Labels) labels.Labels { - lb.Reset(l) - lb.Del(b.VectorMatching.MatchingLabels...) - return lb.Labels() - } - } - - return func(l labels.Labels) labels.Labels { - lb.Reset(l) - lb.Del(labels.MetricName) - lb.Del(b.VectorMatching.MatchingLabels...) - return lb.Labels() - } -} - -func (b *VectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { +func (b *OneToOneVectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { if len(b.remainingSeries) == 0 { return types.InstantVectorSeriesData{}, types.EOS } @@ -407,7 +356,7 @@ func (b *VectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.Ins return types.InstantVectorSeriesData{}, err } - mergedLeftSide, err := b.mergeOneSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") + mergedLeftSide, err := b.mergeSingleSide(allLeftSeries, thisSeries.leftSeriesIndices, b.leftMetadata, "left") if err != nil { return types.InstantVectorSeriesData{}, err } @@ -417,15 +366,15 @@ func (b *VectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.Ins return types.InstantVectorSeriesData{}, err } - mergedRightSide, err := b.mergeOneSide(allRightSeries, thisSeries.rightSeriesIndices, b.rightMetadata, "right") + mergedRightSide, err := b.mergeSingleSide(allRightSeries, thisSeries.rightSeriesIndices, b.rightMetadata, "right") if err != nil { return types.InstantVectorSeriesData{}, err } - return b.computeResult(mergedLeftSide, mergedRightSide) + return b.evaluator.computeResult(mergedLeftSide, mergedRightSide, true, true) } -// mergeOneSide exists to handle the case where one side of an output series has different source series at different time steps. +// mergeSingleSide exists to handle the case where one side of an output series has different source series at different time steps. // // For example, consider the query "left_side + on (env) right_side" with the following source data: // @@ -433,14 +382,12 @@ func (b *VectorVectorBinaryOperation) NextSeries(ctx context.Context) (types.Ins // left_side{env="test", pod="b"} _ _ 3 // right_side{env="test"} 100 200 300 // -// mergeOneSide will take in both series for left_side and return a single series with the points [1, 2, 3]. -// -// mergeOneSide is optimised for the case where there is only one source series, or the source series do not overlap, as in the example above. +// mergeSingleSide will take in both series for left_side and return a single series with the points [1, 2, 3]. // -// NOTE: mergeOneSide has the side effect of re-ordering both data and sourceSeriesIndices. +// mergeSingleSide is optimised for the case where there is only one source series, or the source series do not overlap, as in the example above. // -// FIXME: for many-to-one / one-to-many matching, we could avoid re-merging each time for the side used multiple times -func (b *VectorVectorBinaryOperation) mergeOneSide(data []types.InstantVectorSeriesData, sourceSeriesIndices []int, sourceSeriesMetadata []types.SeriesMetadata, side string) (types.InstantVectorSeriesData, error) { +// mergeSingleSide has the side effect of re-ordering both data and sourceSeriesIndices. +func (b *OneToOneVectorVectorBinaryOperation) mergeSingleSide(data []types.InstantVectorSeriesData, sourceSeriesIndices []int, sourceSeriesMetadata []types.SeriesMetadata, side string) (types.InstantVectorSeriesData, error) { merged, conflict, err := operators.MergeSeries(data, sourceSeriesIndices, b.MemoryConsumptionTracker) if err != nil { @@ -454,9 +401,9 @@ func (b *VectorVectorBinaryOperation) mergeOneSide(data []types.InstantVectorSer return merged, nil } -func (b *VectorVectorBinaryOperation) mergeConflictToError(conflict *operators.MergeConflict, sourceSeriesMetadata []types.SeriesMetadata, side string) error { +func (b *OneToOneVectorVectorBinaryOperation) mergeConflictToError(conflict *operators.MergeConflict, sourceSeriesMetadata []types.SeriesMetadata, side string) error { firstConflictingSeriesLabels := sourceSeriesMetadata[conflict.FirstConflictingSeriesIndex].Labels - groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels) + groupLabels := groupLabelsFunc(b.VectorMatching, b.Op, b.ReturnBool)(firstConflictingSeriesLabels) if conflict.SecondConflictingSeriesIndex == -1 { return fmt.Errorf( @@ -481,161 +428,7 @@ func (b *VectorVectorBinaryOperation) mergeConflictToError(conflict *operators.M ) } -func (b *VectorVectorBinaryOperation) computeResult(left types.InstantVectorSeriesData, right types.InstantVectorSeriesData) (types.InstantVectorSeriesData, error) { - var fPoints []promql.FPoint - var hPoints []promql.HPoint - - // For one-to-one matching for arithmetic operators, we'll never produce more points than the smaller input side. - // Because floats and histograms can be multiplied together, we use the sum of both the float and histogram points. - // We also don't know if the output will be exclusively floats or histograms, so we'll use the same size slice for both. - // We only assign the slices once we see the associated point type so it shouldn't be common that we allocate both. - // - // FIXME: this is not safe to do for one-to-many or many-to-one matching, as we may need the input series for later output series. - canReturnLeftFPointSlice, canReturnLeftHPointSlice, canReturnRightFPointSlice, canReturnRightHPointSlice := true, true, true, true - leftPoints := len(left.Floats) + len(left.Histograms) - rightPoints := len(right.Floats) + len(right.Histograms) - maxPoints := max(leftPoints, rightPoints) - - // We cannot re-use any slices when the series contain a mix of floats and histograms. - // Consider the following, where f is a float at a particular step, and h is a histogram. - // load 5m - // series1 f f f h h - // series2 h h f f h - // eval range from 0 to 25m step 5m series1 * series2 - // {} h h f h f - // We can fit the resulting 3 histograms into series2 existing slice. However, the second - // last step (index 3) produces a histogram which would be stored over the existing histogram - // at the end of series2 (also index 3). - // It should be pretty uncommon that metric contains both histograms and floats, so we will - // accept the cost of a new slice. - mixedPoints := len(left.Floats) > 0 && len(left.Histograms) > 0 || len(right.Floats) > 0 && len(right.Histograms) > 0 - - prepareFSlice := func() error { - if !mixedPoints && maxPoints <= cap(left.Floats) && cap(left.Floats) < cap(right.Floats) { - // Can fit output in left side, and the left side is smaller than the right - canReturnLeftFPointSlice = false - fPoints = left.Floats[:0] - return nil - } - if !mixedPoints && maxPoints <= cap(right.Floats) { - // Can otherwise fit in the right side - canReturnRightFPointSlice = false - fPoints = right.Floats[:0] - return nil - } - // Either we have mixed points or we can't fit in either left or right side, so create a new slice - var err error - if fPoints, err = types.FPointSlicePool.Get(maxPoints, b.MemoryConsumptionTracker); err != nil { - return err - } - return nil - } - - prepareHSlice := func() error { - if !mixedPoints && maxPoints <= cap(left.Histograms) && cap(left.Histograms) < cap(right.Histograms) { - // Can fit output in left side, and the left side is smaller than the right - canReturnLeftHPointSlice = false - hPoints = left.Histograms[:0] - return nil - } - if !mixedPoints && maxPoints <= cap(right.Histograms) { - // Can otherwise fit in the right side - canReturnRightHPointSlice = false - hPoints = right.Histograms[:0] - return nil - } - // Either we have mixed points or we can't fit in either left or right side, so create a new slice - var err error - if hPoints, err = types.HPointSlicePool.Get(maxPoints, b.MemoryConsumptionTracker); err != nil { - return err - } - return nil - } - - b.leftIterator.Reset(left) - b.rightIterator.Reset(right) - - // Get first sample from left and right - lT, lF, lH, lOk := b.leftIterator.Next() - rT, rF, rH, rOk := b.rightIterator.Next() - // Continue iterating until we exhaust either the LHS or RHS - // denoted by lOk or rOk being false. - for lOk && rOk { - if lT == rT { - // We have samples on both sides at this timestep. - resultFloat, resultHist, keep, valid, err := b.opFunc(lF, rF, lH, rH) - - if err != nil { - err = functions.NativeHistogramErrorToAnnotation(err, b.emitAnnotation) - if err != nil { - return types.InstantVectorSeriesData{}, err - } - - // Else: error was converted to an annotation, continue without emitting a sample here. - keep = false - } - - if !valid { - emitIncompatibleTypesAnnotation(b.annotations, b.Op, lH, rH, b.expressionPosition) - } - - if keep { - if resultHist != nil { - if hPoints == nil { - if err = prepareHSlice(); err != nil { - return types.InstantVectorSeriesData{}, err - } - } - hPoints = append(hPoints, promql.HPoint{ - H: resultHist, - T: lT, - }) - } else { - if fPoints == nil { - if err = prepareFSlice(); err != nil { - return types.InstantVectorSeriesData{}, err - } - } - fPoints = append(fPoints, promql.FPoint{ - F: resultFloat, - T: lT, - }) - } - } - } - - // Advance the iterator with the lower timestamp, or both if equal - if lT == rT { - lT, lF, lH, lOk = b.leftIterator.Next() - rT, rF, rH, rOk = b.rightIterator.Next() - } else if lT < rT { - lT, lF, lH, lOk = b.leftIterator.Next() - } else { - rT, rF, rH, rOk = b.rightIterator.Next() - } - } - - // Cleanup the unused slices. - if canReturnLeftFPointSlice { - types.FPointSlicePool.Put(left.Floats, b.MemoryConsumptionTracker) - } - if canReturnLeftHPointSlice { - types.HPointSlicePool.Put(left.Histograms, b.MemoryConsumptionTracker) - } - if canReturnRightFPointSlice { - types.FPointSlicePool.Put(right.Floats, b.MemoryConsumptionTracker) - } - if canReturnRightHPointSlice { - types.HPointSlicePool.Put(right.Histograms, b.MemoryConsumptionTracker) - } - - return types.InstantVectorSeriesData{ - Floats: fPoints, - Histograms: hPoints, - }, nil -} - -func (b *VectorVectorBinaryOperation) Close() { +func (b *OneToOneVectorVectorBinaryOperation) Close() { b.Left.Close() b.Right.Close() @@ -656,10 +449,6 @@ func (b *VectorVectorBinaryOperation) Close() { } } -func (b *VectorVectorBinaryOperation) emitAnnotation(generator types.AnnotationGenerator) { - b.annotations.Add(generator("", b.expressionPosition)) -} - type binaryOperationFunc func(lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (f float64, h *histogram.FloatHistogram, keep bool, valid bool, err error) // FIXME(jhesketh): Investigate avoiding copying histograms for binary ops. diff --git a/pkg/streamingpromql/operators/binops/vector_vector_binary_operation_test.go b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go similarity index 91% rename from pkg/streamingpromql/operators/binops/vector_vector_binary_operation_test.go rename to pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go index 75b8c45d2d1..bd697398b3a 100644 --- a/pkg/streamingpromql/operators/binops/vector_vector_binary_operation_test.go +++ b/pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go @@ -24,8 +24,8 @@ import ( // The merging behaviour has many edge cases, so it's easier to test it directly from Go. // // Most of the edge cases are already covered by TestMergeSeries, so we focus on the logic -// unique to VectorVectorBinaryOperation: converting conflicts to user-friendly error messages. -func TestVectorVectorBinaryOperation_SeriesMerging(t *testing.T) { +// unique to OneToOneVectorVectorBinaryOperation: converting conflicts to user-friendly error messages. +func TestOneToOneVectorVectorBinaryOperation_SeriesMerging(t *testing.T) { testCases := map[string]struct { input []types.InstantVectorSeriesData sourceSeriesIndices []int @@ -188,7 +188,7 @@ func TestVectorVectorBinaryOperation_SeriesMerging(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) - o := &VectorVectorBinaryOperation{ + o := &OneToOneVectorVectorBinaryOperation{ // Simulate an expression with "on (env)". // This is used to generate error messages. VectorMatching: parser.VectorMatching{ @@ -202,7 +202,7 @@ func TestVectorVectorBinaryOperation_SeriesMerging(t *testing.T) { require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats))+types.HPointSize*uint64(len(s.Histograms)))) } - result, err := o.mergeOneSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right") + result, err := o.mergeSingleSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right") if testCase.expectedError == "" { require.NoError(t, err) @@ -214,21 +214,21 @@ func TestVectorVectorBinaryOperation_SeriesMerging(t *testing.T) { } } -func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { +func TestOneToOneVectorVectorBinaryOperation_Sorting(t *testing.T) { testCases := map[string]struct { - series []*binaryOperationOutputSeries + series []*oneToOneBinaryOperationOutputSeries expectedOrderFavouringLeftSide []int expectedOrderFavouringRightSide []int }{ "no output series": { - series: []*binaryOperationOutputSeries{}, + series: []*oneToOneBinaryOperationOutputSeries{}, expectedOrderFavouringLeftSide: []int{}, expectedOrderFavouringRightSide: []int{}, }, "single output series": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{4}, rightSeriesIndices: []int{1}, @@ -239,7 +239,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{0}, }, "two output series, both with one input series, read from both sides in same order and already sorted correctly": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1}, rightSeriesIndices: []int{1}, @@ -254,7 +254,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{0, 1}, }, "two output series, both with one input series, read from both sides in same order but sorted incorrectly": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{2}, rightSeriesIndices: []int{2}, @@ -269,7 +269,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{1, 0}, }, "two output series, both with one input series, read from both sides in different order": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1}, rightSeriesIndices: []int{2}, @@ -284,7 +284,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{1, 0}, }, "two output series, both with multiple input series": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1, 2}, rightSeriesIndices: []int{0, 3}, @@ -299,7 +299,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{1, 0}, }, "multiple output series, both with one input series, read from both sides in same order and already sorted correctly": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1}, rightSeriesIndices: []int{1}, @@ -318,7 +318,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{0, 1, 2}, }, "multiple output series, both with one input series, read from both sides in same order but sorted incorrectly": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{2}, rightSeriesIndices: []int{2}, @@ -337,7 +337,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{2, 0, 1}, }, "multiple output series, both with one input series, read from both sides in different order": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1}, rightSeriesIndices: []int{2}, @@ -356,7 +356,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{2, 0, 1}, }, "multiple output series, with multiple input series each": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{4, 5, 10}, rightSeriesIndices: []int{2, 20}, @@ -375,7 +375,7 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { expectedOrderFavouringRightSide: []int{0, 2, 1}, }, "multiple output series which depend on the same input series": { - series: []*binaryOperationOutputSeries{ + series: []*oneToOneBinaryOperationOutputSeries{ { leftSeriesIndices: []int{1}, rightSeriesIndices: []int{2}, @@ -409,8 +409,8 @@ func TestVectorVectorBinaryOperation_Sorting(t *testing.T) { metadata[i] = types.SeriesMetadata{Labels: labels.FromStrings("series", strconv.Itoa(i))} } - test := func(t *testing.T, series []*binaryOperationOutputSeries, metadata []types.SeriesMetadata, sorter sort.Interface, expectedOrder []int) { - expectedSeriesOrder := make([]*binaryOperationOutputSeries, len(series)) + test := func(t *testing.T, series []*oneToOneBinaryOperationOutputSeries, metadata []types.SeriesMetadata, sorter sort.Interface, expectedOrder []int) { + expectedSeriesOrder := make([]*oneToOneBinaryOperationOutputSeries, len(series)) expectedMetadataOrder := make([]types.SeriesMetadata, len(metadata)) for outputIndex, inputIndex := range expectedOrder { diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 571effc3e97..3d9be241537 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -249,7 +249,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with '%v'", e.Op)) } - if !e.Op.IsSetOperator() && e.VectorMatching.Card != parser.CardOneToOne { + if !e.Op.IsSetOperator() && e.VectorMatching.Card != parser.CardOneToOne && !q.engine.featureToggles.EnableOneToManyAndManyToOneBinaryOperations { return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching", e.VectorMatching.Card)) } @@ -269,7 +269,14 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types case parser.LOR: return binops.NewOrBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, timeRange, e.PositionRange()), nil default: - return binops.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + switch e.VectorMatching.Card { + case parser.CardOneToMany, parser.CardManyToOne: + return binops.NewGroupedVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange(), timeRange) + case parser.CardOneToOne: + return binops.NewOneToOneVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + default: + return nil, compat.NewNotSupportedError(fmt.Sprintf("binary expression with %v matching for '%v'", e.VectorMatching.Card, e.Op)) + } } case *parser.UnaryExpr: diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index 72f3f130d1a..cb33558432e 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -977,3 +977,604 @@ load 6m # Test the case where both sides of 'or' contain series with the same labels. eval range from 0 to 48m step 6m min(series_1) or min(series_2) {} 9 1 2 9 4 5 9 7 9 + +clear + +# Many-to-one / one-to-many matching. +load 6m + method_code:http_errors:rate5m{method="get", code="500"} 24 240 240 _ + method_code:http_errors:rate5m{method="get", code="404"} 30 300 _ _ + method_code:http_errors:rate5m{method="put", code="501"} 3 30 _ _ + method_code:http_errors:rate5m{method="post", code="500"} 6 60 _ 60 + method_code:http_errors:rate5m{method="post", code="404"} 21 210 _ 210 + method:http_requests:rate5m{method="get", foo="bar"} 600 _ 2400 2400 + method:http_requests:rate5m{method="get", foo="bar2"} _ 1200 1200 1200 + method:http_requests:rate5m{method="del", foo="baz"} 34 80 _ _ + method:http_requests:rate5m{method="post", foo="blah"} 120 100 _ 100 + +eval instant at 0 method_code:http_errors:rate5m / ignoring(code, foo) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.04 + {method="get", code="404"} 0.05 + {method="post", code="500"} 0.05 + {method="post", code="404"} 0.175 + +eval instant at 0 method_code:http_errors:rate5m / on(method) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.04 + {method="get", code="404"} 0.05 + {method="post", code="500"} 0.05 + {method="post", code="404"} 0.175 + +eval instant at 0 method_code:http_errors:rate5m / ignoring(code, foo) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar"} 0.04 + {method="get", code="404", foo="bar"} 0.05 + {method="post", code="500", foo="blah"} 0.05 + {method="post", code="404", foo="blah"} 0.175 + +eval instant at 0 method_code:http_errors:rate5m / on(method) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar"} 0.04 + {method="get", code="404", foo="bar"} 0.05 + {method="post", code="500", foo="blah"} 0.05 + {method="post", code="404", foo="blah"} 0.175 + +eval instant at 6m method_code:http_errors:rate5m / ignoring(code, foo) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.2 + {method="get", code="404"} 0.25 + {method="post", code="500"} 0.6 + {method="post", code="404"} 2.1 + +eval instant at 6m method_code:http_errors:rate5m / on(method) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.2 + {method="get", code="404"} 0.25 + {method="post", code="500"} 0.6 + {method="post", code="404"} 2.1 + +eval instant at 6m method_code:http_errors:rate5m / ignoring(code, foo) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar2"} 0.2 + {method="get", code="404", foo="bar2"} 0.25 + {method="post", code="500", foo="blah"} 0.6 + {method="post", code="404", foo="blah"} 2.1 + +eval instant at 6m method_code:http_errors:rate5m / on(method) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar2"} 0.2 + {method="get", code="404", foo="bar2"} 0.25 + {method="post", code="500", foo="blah"} 0.6 + {method="post", code="404", foo="blah"} 2.1 + +eval range from 0 to 6m step 6m method_code:http_errors:rate5m / ignoring(code, foo) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.04 0.2 + {method="get", code="404"} 0.05 0.25 + {method="post", code="500"} 0.05 0.6 + {method="post", code="404"} 0.175 2.1 + +eval range from 0 to 6m step 6m method_code:http_errors:rate5m / on(method) group_left() method:http_requests:rate5m + {method="get", code="500"} 0.04 0.2 + {method="get", code="404"} 0.05 0.25 + {method="post", code="500"} 0.05 0.6 + {method="post", code="404"} 0.175 2.1 + +eval range from 0 to 6m step 6m method_code:http_errors:rate5m / ignoring(code, foo) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar"} 0.04 _ + {method="get", code="404", foo="bar"} 0.05 _ + {method="get", code="500", foo="bar2"} _ 0.2 + {method="get", code="404", foo="bar2"} _ 0.25 + {method="post", code="500", foo="blah"} 0.05 0.6 + {method="post", code="404", foo="blah"} 0.175 2.1 + +eval range from 0 to 6m step 6m method_code:http_errors:rate5m / on(method) group_left(foo) method:http_requests:rate5m + {method="get", code="500", foo="bar"} 0.04 _ + {method="get", code="404", foo="bar"} 0.05 _ + {method="get", code="500", foo="bar2"} _ 0.2 + {method="get", code="404", foo="bar2"} _ 0.25 + {method="post", code="500", foo="blah"} 0.05 0.6 + {method="post", code="404", foo="blah"} 0.175 2.1 + +# Fail if multiple series on "one" side, even if they differ on the additional labels. +# We run these tests as range queries with a single step to avoid promqltest's instant query time shifting, which makes using an explicit error message pattern more difficult. +eval_fail range from 12m to 12m step 1m method_code:http_errors:rate5m / ignoring(code, foo) group_left() method:http_requests:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the right (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method_code:http_errors:rate5m / on(method) group_left() method:http_requests:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the right (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method_code:http_errors:rate5m / ignoring(code, foo) group_left(foo) method:http_requests:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the right (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method_code:http_errors:rate5m / on(method) group_left(foo) method:http_requests:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the right (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +# Same cases as above, but with group_right and expressions swapped. +eval instant at 0 method:http_requests:rate5m / ignoring(code, foo) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 25 + {method="get", code="404"} 20 + {method="post", code="500"} 20 + {method="post", code="404"} 5.7142857143 + +eval instant at 0 method:http_requests:rate5m / on(method) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 25 + {method="get", code="404"} 20 + {method="post", code="500"} 20 + {method="post", code="404"} 5.7142857143 + +eval instant at 0 method:http_requests:rate5m / ignoring(code, foo) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar"} 25 + {method="get", code="404", foo="bar"} 20 + {method="post", code="500", foo="blah"} 20 + {method="post", code="404", foo="blah"} 5.7142857143 + +eval instant at 0 method:http_requests:rate5m / on(method) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar"} 25 + {method="get", code="404", foo="bar"} 20 + {method="post", code="500", foo="blah"} 20 + {method="post", code="404", foo="blah"} 5.7142857143 + +eval instant at 6m method:http_requests:rate5m / ignoring(code, foo) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 5 + {method="get", code="404"} 4 + {method="post", code="500"} 1.6666666667 + {method="post", code="404"} 0.4761904762 + +eval instant at 6m method:http_requests:rate5m / on(method) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 5 + {method="get", code="404"} 4 + {method="post", code="500"} 1.6666666667 + {method="post", code="404"} 0.4761904762 + +eval instant at 6m method:http_requests:rate5m / ignoring(code, foo) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar2"} 5 + {method="get", code="404", foo="bar2"} 4 + {method="post", code="500", foo="blah"} 1.6666666667 + {method="post", code="404", foo="blah"} 0.4761904762 + +eval instant at 6m method:http_requests:rate5m / on(method) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar2"} 5 + {method="get", code="404", foo="bar2"} 4 + {method="post", code="500", foo="blah"} 1.6666666667 + {method="post", code="404", foo="blah"} 0.4761904762 + +eval range from 0 to 6m step 6m method:http_requests:rate5m / ignoring(code, foo) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 25 5 + {method="get", code="404"} 20 4 + {method="post", code="500"} 20 1.6666666667 + {method="post", code="404"} 5.7142857143 0.4761904762 + +eval range from 0 to 6m step 6m method:http_requests:rate5m / on(method) group_right() method_code:http_errors:rate5m + {method="get", code="500"} 25 5 + {method="get", code="404"} 20 4 + {method="post", code="500"} 20 1.6666666667 + {method="post", code="404"} 5.7142857143 0.4761904762 + +eval range from 0 to 6m step 6m method:http_requests:rate5m / ignoring(code, foo) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar"} 25 _ + {method="get", code="404", foo="bar"} 20 _ + {method="get", code="500", foo="bar2"} _ 5 + {method="get", code="404", foo="bar2"} _ 4 + {method="post", code="500", foo="blah"} 20 1.6666666667 + {method="post", code="404", foo="blah"} 5.7142857143 0.4761904762 + +eval range from 0 to 6m step 6m method:http_requests:rate5m / on(method) group_right(foo) method_code:http_errors:rate5m + {method="get", code="500", foo="bar"} 25 _ + {method="get", code="404", foo="bar"} 20 _ + {method="get", code="500", foo="bar2"} _ 5 + {method="get", code="404", foo="bar2"} _ 4 + {method="post", code="500", foo="blah"} 20 1.6666666667 + {method="post", code="404", foo="blah"} 5.7142857143 0.4761904762 + +# Fail if multiple series on "one" side, even if they differ on the additional labels +# We run these tests as range queries with a single step to avoid promqltest's instant query time shifting, which makes using an explicit error message pattern more difficult. +eval_fail range from 12m to 12m step 1m method:http_requests:rate5m / ignoring(code, foo) group_right() method_code:http_errors:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the left (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method:http_requests:rate5m / on(method) group_right() method_code:http_errors:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the left (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method:http_requests:rate5m / ignoring(code, foo) group_right(foo) method_code:http_errors:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the left (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +eval_fail range from 12m to 12m step 1m method:http_requests:rate5m / on(method) group_right(foo) method_code:http_errors:rate5m + expected_fail_regexp found duplicate series for the match group \{method="get"\} on the left (hand-)?side of the operation( at timestamp 1970-01-01T00:12:00Z)?: \[?\{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(,| and) \{__name__="method:http_requests:rate5m", foo="(bar|bar2)", method="get"\}(\];many-to-many matching not allowed: matching labels must be unique on one side)? + +clear + +# Test group_left / group_right where the additional labels from "one" side replace labels with the same name from the the "many" side +load 6m + left{method="get", code="500", foo="left-1"} 1 + left{method="get", code="404", foo="left-2"} 2 + right{method="get", foo="right-1"} 4 + +eval instant at 0 left / on(method) group_left(foo) right + {method="get", code="500", foo="right-1"} 0.25 + {method="get", code="404", foo="right-1"} 0.5 + +eval instant at 0 right / on(method) group_right(foo) left + {method="get", code="500", foo="right-1"} 4 + {method="get", code="404", foo="right-1"} 2 + +clear + +# Test group_left / group_right where both sides contain the additional labels. +load 6m + series_a{method="get", code="500", foo="left-1"} 1 _ 10 + series_a{method="get", code="404", foo="left-2"} _ 4 20 + series_b{method="get", code="999", foo="right-1"} 4 8 40 + +eval range from 0 to 6m step 6m series_a / on(method) group_left(foo, code) series_b + {method="get", code="999", foo="right-1"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / on(method) group_right(foo, code) series_a + {method="get", code="999", foo="right-1"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / on(method) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / on(method) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +# Same thing, but with 'ignoring'. +eval range from 0 to 6m step 6m series_a / ignoring(code, foo) group_left(foo, code) series_b + {method="get", code="999", foo="right-1"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / ignoring(code, foo) group_right(foo, code) series_a + {method="get", code="999", foo="right-1"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / ignoring(code, foo) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / ignoring(code, foo) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +clear + +# Same as above, but this time where the additional labels are present on the "many" side but not the "one" side. +# (They should be taken from the "one" side.) + +load 6m + series_a{method="get", code="500", foo="left-1"} 1 _ 10 + series_a{method="get", code="404", foo="left-2"} _ 4 20 + series_b{method="get", code="999"} 4 8 40 + +eval range from 0 to 6m step 6m series_a / on(method) group_left(foo, code) series_b + {method="get", code="999"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / on(method) group_right(foo, code) series_a + {method="get", code="999"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / on(method) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / on(method) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +# Same thing, but with 'ignoring'. +eval range from 0 to 6m step 6m series_a / ignoring(code, foo) group_left(foo, code) series_b + {method="get", code="999"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / ignoring(code, foo) group_right(foo, code) series_a + {method="get", code="999"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / ignoring(code, foo) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / ignoring(code, foo) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +clear + +# Same as above, but this time the additional labels are not present on either side. +load 6m + series_a{method="get", code="500"} 1 _ 10 + series_a{method="get", code="404"} _ 4 20 + series_b{method="get", code="999"} 4 8 40 + +eval range from 0 to 6m step 6m series_a / on(method) group_left(foo, code) series_b + {method="get", code="999"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / on(method) group_right(foo, code) series_a + {method="get", code="999"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / on(method) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / on(method) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +# Same thing, but with 'ignoring'. +eval range from 0 to 6m step 6m series_a / ignoring(code, foo) group_left(foo, code) series_b + {method="get", code="999"} 0.25 0.5 + +eval range from 0 to 6m step 6m series_b / ignoring(code, foo) group_right(foo, code) series_a + {method="get", code="999"} 4 2 + +# Cannot have multiple matches from the "many" side. +eval_fail instant at 12m series_a / ignoring(code, foo) group_left(foo, code) series_b + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +eval_fail instant at 12m series_b / ignoring(code, foo) group_right(foo, code) series_a + expected_fail_message multiple matches for labels: grouping labels must ensure unique matches + +clear + +# Test group_left and group_right with label names in different orders in 'on' and 'ignoring'. +load 6m + left_side{env="test", pod="a", group="foo"} 1 2 3 + left_side{env="test", pod="b", group="bar"} 4 5 6 + left_side{env="prod", pod="a", group="baz"} 7 8 9 + right_side{env="test", pod="a", group="bar"} 10 20 30 + right_side{env="test", pod="b", group="baz"} 40 50 60 + right_side{env="prod", pod="a", group="foo"} 70 80 90 + +eval range from 0 to 18m step 6m left_side - on(env, pod) group_left() right_side + {env="prod", pod="a", group="baz"} -63 -72 -81 + {env="test", pod="a", group="foo"} -9 -18 -27 + {env="test", pod="b", group="bar"} -36 -45 -54 + +# Test the same thing again with the grouping labels in a different order. +# (The implementation of binary operations relies on grouping labels being sorted in some places, +# so this test exists to ensure this is done correctly.) +eval range from 0 to 18m step 6m left_side - on(pod, env) group_left() right_side + {env="prod", pod="a", group="baz"} -63 -72 -81 + {env="test", pod="a", group="foo"} -9 -18 -27 + {env="test", pod="b", group="bar"} -36 -45 -54 + +eval range from 0 to 18m step 6m left_side - ignoring(env, pod) group_left() right_side + {env="prod", pod="a", group="baz"} -33 -42 -51 + {env="test", pod="b", group="bar"} -6 -15 -24 + {env="test", pod="a", group="foo"} -69 -78 -87 + +# Test the same thing again with the grouping labels in a different order. +# (The implementation of binary operations relies on grouping labels being sorted in some places, +# so this test exists to ensure this is done correctly.) +eval range from 0 to 18m step 6m left_side - ignoring(pod, env) group_left() right_side + {env="prod", pod="a", group="baz"} -33 -42 -51 + {env="test", pod="b", group="bar"} -6 -15 -24 + {env="test", pod="a", group="foo"} -69 -78 -87 + +# Same thing, but with the additional labels given in group_left / group_right in different orders. +load 6m + many_side{group="a", idx="x"} 1 2 3 + many_side{group="b", idx="y"} 4 5 6 + one_side{group="a", env="test", pod="1"} 10 20 30 + one_side{group="b", env="prod", pod="2"} 100 110 120 + +eval range from 0 to 18m step 6m many_side - on(group) group_left(env, pod) one_side + {group="a", env="test", pod="1", idx="x"} -9 -18 -27 + {group="b", env="prod", pod="2", idx="y"} -96 -105 -114 + +eval range from 0 to 18m step 6m many_side - on(group) group_left(pod, env) one_side + {group="a", env="test", pod="1", idx="x"} -9 -18 -27 + {group="b", env="prod", pod="2", idx="y"} -96 -105 -114 + +eval range from 0 to 18m step 6m one_side - on(group) group_right(env, pod) many_side + {group="a", env="test", pod="1", idx="x"} 9 18 27 + {group="b", env="prod", pod="2", idx="y"} 96 105 114 + +eval range from 0 to 18m step 6m one_side - on(group) group_right(pod, env) many_side + {group="a", env="test", pod="1", idx="x"} 9 18 27 + {group="b", env="prod", pod="2", idx="y"} 96 105 114 + +clear + +# Binary operations on native histograms with group_left. +# We don't bother testing all the combinations of label matching, group_right etc. given that's covered by floats above. +load 5m + first_histogram{job="test"} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} + second_histogram{job="test"} {{schema:0 sum:10 count:6 buckets:[1 2 1]}} + metric{job="test"} 2 + +eval instant at 0 first_histogram + on(job) group_left second_histogram + {job="test"} {{schema:0 sum:15 count:10 buckets:[2 4 2]}} + +eval instant at 0 second_histogram - on(job) group_left first_histogram + {job="test"} {{schema:0 sum:5 count:2 buckets:[0 0 0]}} + +# Cannot multiply two histograms +eval_info instant at 0 first_histogram * on(job) group_left second_histogram + +# Cannot divide a histogram by a histogram +eval_info instant at 0 first_histogram / on(job) group_left second_histogram + +# Histogram multiplied by float +eval instant at 0 first_histogram * on(job) group_left metric + {job="test"} {{schema:0 count:8 sum:10 buckets:[2 4 2]}} + +# Works in either order +eval instant at 0 metric * on(job) group_left first_histogram + {job="test"} {{schema:0 count:8 sum:10 buckets:[2 4 2]}} + +# Histogram divide by float +eval instant at 0 first_histogram / on(job) group_left metric + {job="test"} {{schema:0 count:2 sum:2.5 buckets:[0.5 1 0.5]}} + +clear + +# Test comparison operator edge cases. +load 6m + left_side_a{env="test", pod="a"} 1 2 3 4 + left_side_b{env="test", pod="a"} 5 6 7 8 + right_side{env="test", pod="a"} 2 2 7 7 + +# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. +#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side +# left_side_a{pod="a"} _ 2 _ _ +# left_side_b{pod="a"} _ _ 7 _ + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +eval_fail range from 0 to 18m step 6m right_side == ignoring(env) {__name__=~"left_side.*"} + expected_fail_regexp found duplicate series for the match group .* on the right (hand-)?side of the operation + +eval_fail range from 0 to 18m step 6m right_side == bool ignoring(env) {__name__=~"left_side.*"} + expected_fail_regexp found duplicate series for the match group .* on the right (hand-)?side of the operation + +# This should return: +# left_side_a{pod="a"} _ 2 _ _ +# left_side_b{pod="a"} _ _ 7 _ +# but instead both engines drop the metric names in the output. +# This is accepted behaviour: https://github.com/prometheus/prometheus/issues/5326 +# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. +#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side +# {pod="a"} _ 2 7 _ + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +eval_fail range from 0 to 18m step 6m right_side == on(pod) {__name__=~"left_side.*"} + expected_fail_regexp found duplicate series for the match group .* on the right (hand-)?side of the operation + +eval_fail range from 0 to 18m step 6m right_side == bool on(pod) {__name__=~"left_side.*"} + expected_fail_regexp found duplicate series for the match group .* on the right (hand-)?side of the operation + +clear + +# If we change the data slightly... (note the second point for left_side_b is now 2) + +load 6m + left_side_a{env="test", pod="a"} 1 2 3 4 + left_side_b{env="test", pod="a"} 5 2 7 8 + right_side{env="test", pod="a"} 2 2 7 7 + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +eval_fail range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side + expected_fail_regexp (multiple matches for labels: many-to-one matching must be explicit|found duplicate series for the match group .* on the left side of the operation) + +clear + +# Same thing as above, but with the same metric name for all series on left side. +load 6m + left{pod="a"} 1 2 3 4 + left{pod="b"} 5 6 7 8 + right 2 2 7 7 + +# FIXME: MQE currently does not correctly handle this case because it performs filtering after merging input series, whereas we should do it in the other order. +# eval range from 0 to 18m step 6m left == ignoring(pod) right +# left _ 2 7 _ + +clear + +# Same thing as above, but with no overlapping samples on left side. +load 6m + left_side_a{env="test", pod="a"} 1 2 _ _ + left_side_b{env="test", pod="a"} _ _ 7 8 + right_side{env="test", pod="a"} 2 2 7 7 + +# FIXME: MQE currently does not correctly handle this case. +#eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == ignoring(env) right_side +# left_side_a{pod="a"} _ 2 _ _ +# left_side_b{pod="a"} _ _ 7 _ + +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool ignoring(env) right_side + {pod="a"} 0 1 1 0 + +# This should return: +# left_side_a{pod="a"} _ 2 _ _ +# left_side_b{pod="a"} _ _ 7 _ +# but instead both engines drop the metric names in the output. +# This is accepted behaviour: https://github.com/prometheus/prometheus/issues/5326 +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == on(pod) right_side + {pod="a"} _ 2 7 _ + +eval range from 0 to 18m step 6m {__name__=~"left_side.*"} == bool on(pod) right_side + {pod="a"} 0 1 1 0 + +clear + +# Comparison operations with group_left / group_right. + +load 6m + side_a{env="test", pod="a", region="au"} 1 2 3 9 10 + side_a{env="test", pod="a", region="us"} 6 7 8 4 5 + side_b{env="test", dc="1", ignored="1"} 2 _ _ 5 _ + side_b{env="test", dc="2", ignored="1"} _ 3 _ _ 6 + +eval range from 0 to 24m step 6m side_a < on(env) group_left(dc) side_b + side_a{env="test", pod="a", region="au", dc="1"} 1 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 2 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 4 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 5 + +eval range from 0 to 24m step 6m sum without () (side_a) < on(env) group_left(dc) side_b + {env="test", pod="a", region="au", dc="1"} 1 _ _ _ _ + {env="test", pod="a", region="au", dc="2"} _ 2 _ _ _ + {env="test", pod="a", region="us", dc="1"} _ _ _ 4 _ + {env="test", pod="a", region="us", dc="2"} _ _ _ _ 5 + +eval range from 0 to 24m step 6m side_a < bool on(env) group_left(dc) side_b + {env="test", pod="a", region="au", dc="1"} 1 _ _ 0 _ + {env="test", pod="a", region="au", dc="2"} _ 1 _ _ 0 + {env="test", pod="a", region="us", dc="1"} 0 _ _ 1 _ + {env="test", pod="a", region="us", dc="2"} _ 0 _ _ 1 + +eval range from 0 to 24m step 6m side_a < ignoring(pod, region, dc, ignored) group_left(dc) side_b + side_a{env="test", pod="a", region="au", dc="1"} 1 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 2 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 4 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 5 + +eval range from 0 to 24m step 6m sum without () (side_a) < ignoring(pod, region, dc, ignored) group_left(dc) side_b + {env="test", pod="a", region="au", dc="1"} 1 _ _ _ _ + {env="test", pod="a", region="au", dc="2"} _ 2 _ _ _ + {env="test", pod="a", region="us", dc="1"} _ _ _ 4 _ + {env="test", pod="a", region="us", dc="2"} _ _ _ _ 5 + +eval range from 0 to 24m step 6m side_a < bool ignoring(pod, region, dc, ignored) group_left(dc) side_b + {env="test", pod="a", region="au", dc="1"} 1 _ _ 0 _ + {env="test", pod="a", region="au", dc="2"} _ 1 _ _ 0 + {env="test", pod="a", region="us", dc="1"} 0 _ _ 1 _ + {env="test", pod="a", region="us", dc="2"} _ 0 _ _ 1 + +# The docs say this should return series with name "side_b" from the left, but it is accepted that this will return +# "side_a" from the right: see https://github.com/prometheus/prometheus/issues/15471. +eval range from 0 to 24m step 6m side_b > on(env) group_right(dc) side_a + side_a{env="test", pod="a", region="au", dc="1"} 2 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 3 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 5 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 6 + +# The docs say this should return series with no name, but it is accepted that this will return +# "side_a" from the right: see https://github.com/prometheus/prometheus/issues/15471. +eval range from 0 to 24m step 6m sum without () (side_b) > on(env) group_right(dc) side_a + side_a{env="test", pod="a", region="au", dc="1"} 2 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 3 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 5 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 6 + +eval range from 0 to 24m step 6m side_b > bool on(env) group_right(dc) side_a + {env="test", pod="a", region="au", dc="1"} 1 _ _ 0 _ + {env="test", pod="a", region="au", dc="2"} _ 1 _ _ 0 + {env="test", pod="a", region="us", dc="1"} 0 _ _ 1 _ + {env="test", pod="a", region="us", dc="2"} _ 0 _ _ 1 + +# The docs say this should return series with name "side_b" from the left, but it is accepted that this will return +# "side_a" from the right: see https://github.com/prometheus/prometheus/issues/15471. +eval range from 0 to 24m step 6m side_b > ignoring(pod, region, dc, ignored) group_right(dc) side_a + side_a{env="test", pod="a", region="au", dc="1"} 2 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 3 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 5 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 6 + +# The docs say this should return series with no name (ie. the metric name from the left), but it is accepted that this will return +# "side_a" from the right: see https://github.com/prometheus/prometheus/issues/15471. +eval range from 0 to 24m step 6m sum without () (side_b) > ignoring(pod, region, dc, ignored) group_right(dc) side_a + side_a{env="test", pod="a", region="au", dc="1"} 2 _ _ _ _ + side_a{env="test", pod="a", region="au", dc="2"} _ 3 _ _ _ + side_a{env="test", pod="a", region="us", dc="1"} _ _ _ 5 _ + side_a{env="test", pod="a", region="us", dc="2"} _ _ _ _ 6 + +eval range from 0 to 24m step 6m side_b > bool ignoring(pod, region, dc, ignored) group_right(dc) side_a + {env="test", pod="a", region="au", dc="1"} 1 _ _ 0 _ + {env="test", pod="a", region="au", dc="2"} _ 1 _ _ 0 + {env="test", pod="a", region="us", dc="1"} 0 _ _ 1 _ + {env="test", pod="a", region="us", dc="2"} _ 0 _ _ 1 diff --git a/pkg/streamingpromql/testdata/upstream/collision.test b/pkg/streamingpromql/testdata/upstream/collision.test index c5484d19adc..11401326159 100644 --- a/pkg/streamingpromql/testdata/upstream/collision.test +++ b/pkg/streamingpromql/testdata/upstream/collision.test @@ -10,11 +10,10 @@ load 1s node_cpu_seconds_total{cpu="35",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449 node_cpu_seconds_total{cpu="89",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449 -# Unsupported by streaming engine. -# eval instant at 4s count by(namespace, pod, cpu) (node_cpu_seconds_total{cpu=~".*",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v"}) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{namespace="observability",pod="node-exporter-l454v"} -# {cpu="10",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 -# {cpu="35",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 -# {cpu="89",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 +eval instant at 4s count by(namespace, pod, cpu) (node_cpu_seconds_total{cpu=~".*",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v"}) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{namespace="observability",pod="node-exporter-l454v"} + {cpu="10",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 + {cpu="35",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 + {cpu="89",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1 clear diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test index 97aac16c642..1b8da810caa 100644 --- a/pkg/streamingpromql/testdata/upstream/operators.test +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -334,98 +334,82 @@ load 5m threshold{instance="abc",job="node",target="a@b.com"} 0 # Copy machine role to node variable. -# Unsupported by streaming engine. -# eval instant at 1m node_role * on (instance) group_right (role) node_var -# {instance="abc",job="node",role="prometheus"} 2 +eval instant at 1m node_role * on (instance) group_right (role) node_var + {instance="abc",job="node",role="prometheus"} 2 -# Unsupported by streaming engine. -# eval instant at 1m node_var * on (instance) group_left (role) node_role -# {instance="abc",job="node",role="prometheus"} 2 +eval instant at 1m node_var * on (instance) group_left (role) node_role + {instance="abc",job="node",role="prometheus"} 2 -# Unsupported by streaming engine. -# eval instant at 1m node_var * ignoring (role) group_left (role) node_role -# {instance="abc",job="node",role="prometheus"} 2 +eval instant at 1m node_var * ignoring (role) group_left (role) node_role + {instance="abc",job="node",role="prometheus"} 2 -# Unsupported by streaming engine. -# eval instant at 1m node_role * ignoring (role) group_right (role) node_var -# {instance="abc",job="node",role="prometheus"} 2 +eval instant at 1m node_role * ignoring (role) group_right (role) node_var + {instance="abc",job="node",role="prometheus"} 2 # Copy machine role to node variable with instrumentation labels. -# Unsupported by streaming engine. -# eval instant at 1m node_cpu * ignoring (role, mode) group_left (role) node_role -# {instance="abc",job="node",mode="idle",role="prometheus"} 3 -# {instance="abc",job="node",mode="user",role="prometheus"} 1 +eval instant at 1m node_cpu * ignoring (role, mode) group_left (role) node_role + {instance="abc",job="node",mode="idle",role="prometheus"} 3 + {instance="abc",job="node",mode="user",role="prometheus"} 1 -# Unsupported by streaming engine. -# eval instant at 1m node_cpu * on (instance) group_left (role) node_role -# {instance="abc",job="node",mode="idle",role="prometheus"} 3 -# {instance="abc",job="node",mode="user",role="prometheus"} 1 +eval instant at 1m node_cpu * on (instance) group_left (role) node_role + {instance="abc",job="node",mode="idle",role="prometheus"} 3 + {instance="abc",job="node",mode="user",role="prometheus"} 1 # Ratio of total. -# Unsupported by streaming engine. -# eval instant at 1m node_cpu / on (instance) group_left sum by (instance,job)(node_cpu) -# {instance="abc",job="node",mode="idle"} .75 -# {instance="abc",job="node",mode="user"} .25 -# {instance="def",job="node",mode="idle"} .80 -# {instance="def",job="node",mode="user"} .20 - -# Unsupported by streaming engine. -# eval instant at 1m sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu) -# {job="node",mode="idle"} 0.7857142857142857 -# {job="node",mode="user"} 0.21428571428571427 - -# Unsupported by streaming engine. -# eval instant at 1m sum(sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu)) -# {} 1.0 - - -# Unsupported by streaming engine. -# eval instant at 1m node_cpu / ignoring (mode) group_left sum without (mode)(node_cpu) -# {instance="abc",job="node",mode="idle"} .75 -# {instance="abc",job="node",mode="user"} .25 -# {instance="def",job="node",mode="idle"} .80 -# {instance="def",job="node",mode="user"} .20 - -# Unsupported by streaming engine. -# eval instant at 1m node_cpu / ignoring (mode) group_left(dummy) sum without (mode)(node_cpu) -# {instance="abc",job="node",mode="idle"} .75 -# {instance="abc",job="node",mode="user"} .25 -# {instance="def",job="node",mode="idle"} .80 -# {instance="def",job="node",mode="user"} .20 - -# Unsupported by streaming engine. -# eval instant at 1m sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu) -# {job="node",mode="idle"} 0.7857142857142857 -# {job="node",mode="user"} 0.21428571428571427 - -# Unsupported by streaming engine. -# eval instant at 1m sum(sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu)) -# {} 1.0 +eval instant at 1m node_cpu / on (instance) group_left sum by (instance,job)(node_cpu) + {instance="abc",job="node",mode="idle"} .75 + {instance="abc",job="node",mode="user"} .25 + {instance="def",job="node",mode="idle"} .80 + {instance="def",job="node",mode="user"} .20 + +eval instant at 1m sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu) + {job="node",mode="idle"} 0.7857142857142857 + {job="node",mode="user"} 0.21428571428571427 + +eval instant at 1m sum(sum by (mode, job)(node_cpu) / on (job) group_left sum by (job)(node_cpu)) + {} 1.0 + + +eval instant at 1m node_cpu / ignoring (mode) group_left sum without (mode)(node_cpu) + {instance="abc",job="node",mode="idle"} .75 + {instance="abc",job="node",mode="user"} .25 + {instance="def",job="node",mode="idle"} .80 + {instance="def",job="node",mode="user"} .20 + +eval instant at 1m node_cpu / ignoring (mode) group_left(dummy) sum without (mode)(node_cpu) + {instance="abc",job="node",mode="idle"} .75 + {instance="abc",job="node",mode="user"} .25 + {instance="def",job="node",mode="idle"} .80 + {instance="def",job="node",mode="user"} .20 + +eval instant at 1m sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu) + {job="node",mode="idle"} 0.7857142857142857 + {job="node",mode="user"} 0.21428571428571427 + +eval instant at 1m sum(sum without (instance)(node_cpu) / ignoring (mode) group_left sum without (instance, mode)(node_cpu)) + {} 1.0 # Copy over label from metric with no matching labels, without having to list cross-job target labels ('job' here). -# Unsupported by streaming engine. -# eval instant at 1m node_cpu + on(dummy) group_left(foo) random*0 -# {instance="abc",job="node",mode="idle",foo="bar"} 3 -# {instance="abc",job="node",mode="user",foo="bar"} 1 -# {instance="def",job="node",mode="idle",foo="bar"} 8 -# {instance="def",job="node",mode="user",foo="bar"} 2 +eval instant at 1m node_cpu + on(dummy) group_left(foo) random*0 + {instance="abc",job="node",mode="idle",foo="bar"} 3 + {instance="abc",job="node",mode="user",foo="bar"} 1 + {instance="def",job="node",mode="idle",foo="bar"} 8 + {instance="def",job="node",mode="user",foo="bar"} 2 # Use threshold from metric, and copy over target. -# Unsupported by streaming engine. -# eval instant at 1m node_cpu > on(job, instance) group_left(target) threshold -# node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 -# node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 +eval instant at 1m node_cpu > on(job, instance) group_left(target) threshold + node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 + node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 # Use threshold from metric, and a default (1) if it's not present. -# Unsupported by streaming engine. -# eval instant at 1m node_cpu > on(job, instance) group_left(target) (threshold or on (job, instance) (sum by (job, instance)(node_cpu) * 0 + 1)) -# node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 -# node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 -# node_cpu{instance="def",job="node",mode="idle"} 8 -# node_cpu{instance="def",job="node",mode="user"} 2 +eval instant at 1m node_cpu > on(job, instance) group_left(target) (threshold or on (job, instance) (sum by (job, instance)(node_cpu) * 0 + 1)) + node_cpu{instance="abc",job="node",mode="idle",target="a@b.com"} 3 + node_cpu{instance="abc",job="node",mode="user",target="a@b.com"} 1 + node_cpu{instance="def",job="node",mode="idle"} 8 + node_cpu{instance="def",job="node",mode="user"} 2 # Check that binops drop the metric name. diff --git a/pkg/streamingpromql/types/limiting_pool.go b/pkg/streamingpromql/types/limiting_pool.go index d7efe165a81..2a614f4b798 100644 --- a/pkg/streamingpromql/types/limiting_pool.go +++ b/pkg/streamingpromql/types/limiting_pool.go @@ -23,6 +23,7 @@ const ( HPointSize = uint64(FPointSize * nativeHistogramSampleSizeFactor) VectorSampleSize = uint64(unsafe.Sizeof(promql.Sample{})) // This assumes each sample is a float sample, not a histogram. Float64Size = uint64(unsafe.Sizeof(float64(0))) + IntSize = uint64(unsafe.Sizeof(int(0))) BoolSize = uint64(unsafe.Sizeof(false)) HistogramPointerSize = uint64(unsafe.Sizeof((*histogram.FloatHistogram)(nil))) StringSize = uint64(unsafe.Sizeof("")) @@ -73,6 +74,15 @@ var ( nil, ) + IntSlicePool = NewLimitingBucketedPool( + pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int { + return make([]int, 0, size) + }), + IntSize, + true, + nil, + ) + BoolSlicePool = NewLimitingBucketedPool( pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []bool { return make([]bool, 0, size)