Skip to content

Commit

Permalink
Rule Concurrency: Support self references (#10431)
Browse files Browse the repository at this point in the history
* Rule Concurrency: Support self references
After deployment to dev, we are getting some of these warnings:
```
Cyclic rule dependencies detected, falling back to sequential rule evaluation
```

It turns out to be a recording rule that is self referencing (using `last_over_time` to fall back to its last value).

Whenever a rule's unevaluated dependencies consist only of itself, it is safe to run. A caveat: if two rules record to a metric with the same name but with different labels, the dependency tree cannot be built and concurrency is not possible.

* Warn for all unexpected outcomes
  • Loading branch information
julienduchesne authored Jan 14, 2025
1 parent 2a289be commit 386f2a9
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 7 deletions.
21 changes: 21 additions & 0 deletions pkg/ruler/fixtures/rules_self_and_same_name_reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
groups:
- name: self_and_same_name_reference
rules:
# Because of these two rules, this rule group cannot be evaluated concurrently.
# Although they are independent, they share the same name and both self-reference.
# The expressions' labels are not parsed to determine dependencies, only the names.
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total{job="job1"}[1m])) or last_over_time(job:http_requests:rate1m{job="job1"}[1m])
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total{job="job2"}[1m])) or last_over_time(job:http_requests:rate1m{job="job2"}[1m])

- record: job1:http_requests:rate1m
expr: job:http_requests:rate1m{job="job1"}
- record: job1_cluster1:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster1"}
- record: job1_cluster2:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster2"}
- record: job1_cluster1_namespace1:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"}
- record: job1_cluster1_namespace2:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"}
20 changes: 20 additions & 0 deletions pkg/ruler/fixtures/rules_self_reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
groups:
- name: self_reference
rules:
# Evaluated concurrently: no dependencies other than self-references, which are ignored
- record: job:http_requests:rate1m
expr: sum by (job)(rate(http_requests_total[1m])) or last_over_time(job:http_requests:rate1m[1m]) # New value or last value
- record: job:http_requests:rate5m
expr: sum by (job)(rate(http_requests_total[5m])) or last_over_time(job:http_requests:rate5m[5m]) # New value or last value

# These rules will be grouped into batches of independent rules
- record: job1:http_requests:rate1m
expr: job:http_requests:rate1m{job="job1"}
- record: job1_cluster1:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster1"}
- record: job1_cluster2:http_requests:rate1m
expr: job1:http_requests:rate1m{cluster="cluster2"}
- record: job1_cluster1_namespace1:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace1"}
- record: job1_cluster1_namespace2:http_requests:rate1m
expr: job1_cluster1:http_requests:rate1m{namespace="namespace2"}
25 changes: 18 additions & 7 deletions pkg/ruler/rule_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,22 +209,34 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g
// This batch holds the rules that have no dependencies and will be run first.
firstBatch := rules.ConcurrentRules{}
for i, r := range g.Rules() {
if r.NoDependencyRules() {
firstBatch = append(firstBatch, i)
continue
dependencies := r.DependencyRules()
if dependencies == nil {
// This means that dependencies were not calculated.
level.Warn(logger).Log("msg", "Dependencies were not calculated for at least one rule, falling back to sequential rule evaluation.")
return nil
}

// Initialize the rule info with the rule's dependencies.
// Use a copy of the dependencies to avoid mutating the rule.
info := ruleInfo{ruleIdx: i, unevaluatedDependencies: map[rules.Rule]struct{}{}}
for _, dep := range r.DependencyRules() {
for _, dep := range dependencies {
if dep == r {
// Ignore self-references.
continue
}
info.unevaluatedDependencies[dep] = struct{}{}
}

if len(info.unevaluatedDependencies) == 0 {
firstBatch = append(firstBatch, i)
continue
}

remainingRules[r] = info
}
if len(firstBatch) == 0 {
// There are no rules without dependencies.
// Fall back to sequential evaluation.
level.Info(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.")
level.Warn(logger).Log("msg", "No rules without dependencies found, falling back to sequential rule evaluation.")
return nil
}
result := []rules.ConcurrentRules{firstBatch}
Expand Down Expand Up @@ -253,7 +265,6 @@ func (c *TenantConcurrencyController) SplitGroupIntoBatches(_ context.Context, g
if len(batch) == 0 {
// There is a cycle in the rules' dependencies.
// We can't evaluate them concurrently.
// Fall back to sequential evaluation.
level.Warn(logger).Log("msg", "Cyclic rule dependencies detected, falling back to sequential rule evaluation")
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ruler/rule_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ var splitToBatchesTestCases = map[string]struct {
{43, 44, 47, 48, 49, 50, 53, 54, 57},
},
},
"self_reference": {
inputFile: "fixtures/rules_self_reference.yaml",
expectedGroups: []rules.ConcurrentRules{
{0, 1},
{2},
{3, 4},
{5, 6},
},
},
"self_and_same_name_reference": {
inputFile: "fixtures/rules_self_and_same_name_reference.yaml",
expectedGroups: nil,
},
}

func TestSplitGroupIntoBatches(t *testing.T) {
Expand Down

0 comments on commit 386f2a9

Please sign in to comment.