diff --git a/pkg/ruler/fixtures/rules_self_and_same_name_reference.yaml b/pkg/ruler/fixtures/rules_self_and_same_name_reference.yaml new file mode 100644 index 0000000000..f50c9f23d3 --- /dev/null +++ b/pkg/ruler/fixtures/rules_self_and_same_name_reference.yaml @@ -0,0 +1,21 @@ +groups: + - name: self_and_same_name_reference + rules: + # Because of these two rules, this rule group cannot be evaluated concurrently. + # While they are independent they have the same name, and they both self-reference. + # The expressions' labels are not parsed to determine dependencies, only the names. + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total{job="job1"}[1m])) or last_over_time(job:http_requests:rate1m{job="job1"}[1m]) + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total{job="job2"}[1m])) or last_over_time(job:http_requests:rate1m{job="job2"}[1m]) + + - record: job1:http_requests:rate1m + expr: job:http_requests:rate1m{job="job1"} + - record: job1_cluster1:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster1"} + - record: job1_cluster2:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster2"} + - record: job1_cluster1_namespace1:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"} + - record: job1_cluster1_namespace2:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"} diff --git a/pkg/ruler/fixtures/rules_self_reference.yaml b/pkg/ruler/fixtures/rules_self_reference.yaml new file mode 100644 index 0000000000..933e043e92 --- /dev/null +++ b/pkg/ruler/fixtures/rules_self_reference.yaml @@ -0,0 +1,20 @@ +groups: + - name: self_reference + rules: + # Evaluated concurrently, no dependencies + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) or last_over_time(job:http_requests:rate1m[1m]) # New value or last value + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) or last_over_time(job:http_requests:rate5m[5m]) # New value or last value + + # These rules will be grouped into batches of independent rules + - record: job1:http_requests:rate1m + expr: job:http_requests:rate1m{job="job1"} + - record: job1_cluster1:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster1"} + - record: job1_cluster2:http_requests:rate1m + expr: job1:http_requests:rate1m{cluster="cluster2"} + - record: job1_cluster1_namespace1:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"} + - record: job1_cluster1_namespace2:http_requests:rate1m + expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"} diff --git a/pkg/ruler/rule_concurrency.go b/pkg/ruler/rule_concurrency.go index 09e721ff08..236342e0f9 100644 --- a/pkg/ruler/rule_concurrency.go +++ b/pkg/ruler/rule_concurrency.go @@ -209,22 +209,34 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g // This batch holds the rules that have no dependencies and will be run first. firstBatch := rules.ConcurrentRules{} for i, r := range g.Rules() { - if r.NoDependencyRules() { - firstBatch = append(firstBatch, i) - continue + dependencies := r.DependencyRules() + if dependencies == nil { + // This means that dependencies were not calculated. + level.Warn(logger).Log("msg", "Dependencies were not calculated for at least one rule, falling back to sequential rule evaluation.") + return nil } + // Initialize the rule info with the rule's dependencies. // Use a copy of the dependencies to avoid mutating the rule. info := ruleInfo{ruleIdx: i, unevaluatedDependencies: map[rules.Rule]struct{}{}} - for _, dep := range r.DependencyRules() { + for _, dep := range dependencies { + if dep == r { + // Ignore self-references. + continue + } info.unevaluatedDependencies[dep] = struct{}{} } + + if len(info.unevaluatedDependencies) == 0 { + firstBatch = append(firstBatch, i) + continue + } + remainingRules[r] = info } if len(firstBatch) == 0 { // There are no rules without dependencies. - // Fall back to sequential evaluation. - level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.") + level.Warn(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.") return nil } result := []rules.ConcurrentRules{firstBatch} @@ -253,7 +265,6 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g if len(batch) == 0 { // There is a cycle in the rules' dependencies. // We can't evaluate them concurrently. - // Fall back to sequential evaluation. level.Warn(logger).Log("msg", "Cyclic rule dependencies detected, falling back to sequential rule evaluation") return nil } diff --git a/pkg/ruler/rule_concurrency_test.go b/pkg/ruler/rule_concurrency_test.go index 36a88c1d30..3b555cc897 100644 --- a/pkg/ruler/rule_concurrency_test.go +++ b/pkg/ruler/rule_concurrency_test.go @@ -215,6 +215,19 @@ var splitToBatchesTestCases = map[string]struct { {43, 44, 47, 48, 49, 50, 53, 54, 57}, }, }, + "self_reference": { + inputFile: "fixtures/rules_self_reference.yaml", + expectedGroups: []rules.ConcurrentRules{ + {0, 1}, + {2}, + {3, 4}, + {5, 6}, + }, + }, + "self_and_same_name_reference": { + inputFile: "fixtures/rules_self_and_same_name_reference.yaml", + expectedGroups: nil, + }, } func TestSplitGroupIntoBatches(t *testing.T) {