Skip to content

Commit

Permalink
ruler: add "user" and "reason" labels to cortex_ruler_write and corte…
Browse files Browse the repository at this point in the history
…x_ruler_queries metrics

Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Jan 30, 2025
1 parent 1712177 commit 47b0bc6
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 53 deletions.
59 changes: 30 additions & 29 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -38,8 +37,8 @@ type Pusher interface {
}

type PusherAppender struct {
failedWrites prometheus.Counter
totalWrites prometheus.Counter
failedWrites *prometheus.CounterVec
totalWrites *prometheus.CounterVec

ctx context.Context
pusher Pusher
Expand Down Expand Up @@ -91,7 +90,7 @@ func (a *PusherAppender) AppendHistogramCTZeroSample(storage.SeriesRef, labels.L
}

func (a *PusherAppender) Commit() error {
a.totalWrites.Inc()
a.totalWrites.WithLabelValues(a.userID).Inc()

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
Expand All @@ -100,11 +99,13 @@ func (a *PusherAppender) Commit() error {
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)

if err != nil {
// Don't report client errors, which are the same ones that would be reported with 4xx HTTP status code
// (e.g. series limits, duplicate samples, out of order, etc.)
if !mimirpb.IsClientError(err) {
a.failedWrites.Inc()
failureReason := "error"
if mimirpb.IsClientError(err) {
// Client errors, which are the same ones that would be reported with 4xx HTTP status code
// (e.g. series limits, duplicate samples, out of order, etc.) are reported with a separate reason.
failureReason = "4xx"
}
a.failedWrites.WithLabelValues(a.userID, failureReason).Inc()
}

a.labels = nil
Expand All @@ -123,11 +124,11 @@ type PusherAppendable struct {
pusher Pusher
userID string

totalWrites prometheus.Counter
failedWrites prometheus.Counter
totalWrites *prometheus.CounterVec
failedWrites *prometheus.CounterVec
}

func NewPusherAppendable(pusher Pusher, userID string, totalWrites, failedWrites prometheus.Counter) *PusherAppendable {
func NewPusherAppendable(pusher Pusher, userID string, totalWrites, failedWrites *prometheus.CounterVec) *PusherAppendable {
return &PusherAppendable{
pusher: pusher,
userID: userID,
Expand Down Expand Up @@ -209,16 +210,16 @@ type RulesLimits interface {
RulerMaxIndependentRuleEvaluationConcurrencyPerTenant(userID string) int64
}

func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter, remoteQuerier bool) rules.QueryFunc {
func MetricsQueryFunc(qf rules.QueryFunc, userID string, queries, failedQueries *prometheus.CounterVec, remoteQuerier bool) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
queries.Inc()
queries.WithLabelValues(userID).Inc()

result, err := qf(ctx, qs, t)
if err == nil {
return result, nil
}

// We only care about errors returned by underlying Queryable. Errors returned by PromQL engine are "user-errors",
// and not interesting here.
failureReason := "error"
qerr := QueryableError{}
if errors.As(err, &qerr) {
origErr := qerr.Unwrap()
Expand All @@ -233,17 +234,17 @@ func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Coun
// All errors will still be counted towards "evaluation failures" metrics and logged by Prometheus Ruler,
// but we only want internal errors here.
if _, ok := querier.TranslateToPromqlAPIError(origErr).(promql.ErrStorage); ok {
failedQueries.Inc()
failedQueries.WithLabelValues(userID, failureReason).Inc()
}

// Return unwrapped error.
return result, origErr
} else if remoteQuerier {
// When remote querier enabled, consider anything an error except those with 4xx status code.
st, ok := grpcutil.ErrorToStatus(err)
if !(ok && st.Code()/100 == 4) {
failedQueries.Inc()
// When remote querier enabled, consider anything an "error" except those with 4xx status code.
if mimirpb.IsClientError(err) {
failureReason = "4xx"
}
failedQueries.WithLabelValues(userID, failureReason).Inc()
}
return result, err
}
Expand Down Expand Up @@ -329,23 +330,23 @@ func DefaultTenantManagerFactory(
overrides RulesLimits,
reg prometheus.Registerer,
) ManagerFactory {
totalWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
totalWrites := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
})
failedWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
}, []string{"user"})
failedWrites := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
})
}, []string{"user", "reason"})

totalQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
totalQueries := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
})
failedQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
}, []string{"user"})
failedQueries := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
})
}, []string{"user", "reason"})
var rulerQuerySeconds *prometheus.CounterVec
var zeroFetchedSeriesQueries *prometheus.CounterVec
if cfg.EnableQueryStats {
Expand All @@ -368,7 +369,7 @@ func DefaultTenantManagerFactory(

// Wrap the query function with our custom logic.
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc, logger)
wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries, cfg.QueryFrontend.Address != "")
wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, userID, totalQueries, failedQueries, cfg.QueryFrontend.Address != "")
wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)

// Wrap the queryable with our custom logic.
Expand Down
66 changes: 42 additions & 24 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ func (p *fakePusher) Push(_ context.Context, r *mimirpb.WriteRequest) (*mimirpb.

func TestPusherAppendable(t *testing.T) {
pusher := &fakePusher{}
pa := NewPusherAppendable(pusher, "user-1", promauto.With(nil).NewCounter(prometheus.CounterOpts{}), promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
pa := NewPusherAppendable(
pusher,
"user-1",
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user", "reason"}),
)

type sample struct {
series string
Expand Down Expand Up @@ -218,28 +223,31 @@ func TestPusherAppendable(t *testing.T) {

func TestPusherErrors(t *testing.T) {
for name, tc := range map[string]struct {
returnedError error
expectedWrites int
expectedFailures int
returnedError error
expectedWrites int
expectedFailures int
expectedFailureReason string // default to "error"
}{
"no error": {
expectedWrites: 1,
expectedFailures: 0,
},
"a 400 HTTPgRPC error is not reported as failure": {
returnedError: httpgrpc.Errorf(http.StatusBadRequest, "test error"),
expectedWrites: 1,
expectedFailures: 0,
"a 400 HTTPgRPC error is reported as client failure": {
returnedError: httpgrpc.Errorf(http.StatusBadRequest, "test error"),
expectedWrites: 1,
expectedFailures: 1,
expectedFailureReason: "4xx",
},
"a 500 HTTPgRPC error is reported as failure": {
returnedError: httpgrpc.Errorf(http.StatusInternalServerError, "test error"),
expectedWrites: 1,
expectedFailures: 1,
},
"a BAD_DATA push error is not reported as failure": {
returnedError: mustStatusWithDetails(codes.FailedPrecondition, mimirpb.BAD_DATA).Err(),
expectedWrites: 1,
expectedFailures: 0,
"a BAD_DATA push error is reported as client failure": {
returnedError: mustStatusWithDetails(codes.FailedPrecondition, mimirpb.BAD_DATA).Err(),
expectedWrites: 1,
expectedFailures: 1,
expectedFailureReason: "4xx",
},
"a METHOD_NOT_ALLOWED push error is reported as failure": {
returnedError: mustStatusWithDetails(codes.Unimplemented, mimirpb.METHOD_NOT_ALLOWED).Err(),
Expand All @@ -262,8 +270,8 @@ func TestPusherErrors(t *testing.T) {

pusher := &fakePusher{err: tc.returnedError, response: &mimirpb.WriteResponse{}}

writes := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
failures := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
writes := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"})
failures := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user", "reason"})
pa := NewPusherAppendable(pusher, "user-1", writes, failures)

lbls, err := parser.ParseMetric("foo_bar")
Expand All @@ -275,8 +283,13 @@ func TestPusherErrors(t *testing.T) {

require.Equal(t, tc.returnedError, a.Commit())

require.Equal(t, tc.expectedWrites, int(testutil.ToFloat64(writes)))
require.Equal(t, tc.expectedFailures, int(testutil.ToFloat64(failures)))
require.Equal(t, tc.expectedWrites, int(testutil.ToFloat64(writes.WithLabelValues("user-1"))))

expectedFailureReason := tc.expectedFailureReason
if expectedFailureReason == "" {
expectedFailureReason = "error"
}
require.Equal(t, tc.expectedFailures, int(testutil.ToFloat64(failures.WithLabelValues("user-1", expectedFailureReason))))
})
}
}
Expand Down Expand Up @@ -331,6 +344,7 @@ func TestMetricsQueryFuncErrors(t *testing.T) {
expectedError error
expectedQueries int
expectedFailedQueries int
expectedFailedReason string // default "error"
remoteQuerier bool
}
// Add special cases to test first.
Expand All @@ -339,18 +353,17 @@ func TestMetricsQueryFuncErrors(t *testing.T) {
returnedError: httpgrpc.Errorf(http.StatusBadRequest, "test error"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "test error"),
expectedQueries: 1,
expectedFailedQueries: 0, // 400 errors not reported as failures.
expectedFailedQueries: 1,
expectedFailedReason: "4xx", // 400 errors coming from remote querier are reported as their own reason.
remoteQuerier: true,
},

"httpgrpc 500 error": {
returnedError: httpgrpc.Errorf(http.StatusInternalServerError, "test error"),
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "test error"),
expectedQueries: 1,
expectedFailedQueries: 1, // 500 errors are failures
remoteQuerier: true,
},

"unknown but non-queryable error from remote": {
returnedError: errors.New("test error"),
expectedError: errors.New("test error"),
Expand Down Expand Up @@ -379,19 +392,24 @@ func TestMetricsQueryFuncErrors(t *testing.T) {
}
for name, tc := range allCases {
t.Run(name, func(t *testing.T) {
queries := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
failures := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
queries := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"})
failures := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user", "reason"})

mockFunc := func(context.Context, string, time.Time) (promql.Vector, error) {
return promql.Vector{}, tc.returnedError
}
qf := MetricsQueryFunc(mockFunc, queries, failures, tc.remoteQuerier)
qf := MetricsQueryFunc(mockFunc, "user-1", queries, failures, tc.remoteQuerier)

_, err := qf(context.Background(), "test", time.Now())
require.Equal(t, tc.expectedError, err)

require.Equal(t, tc.expectedQueries, int(testutil.ToFloat64(queries)))
require.Equal(t, tc.expectedFailedQueries, int(testutil.ToFloat64(failures)))
require.Equal(t, tc.expectedQueries, int(testutil.ToFloat64(queries.WithLabelValues("user-1"))))

expectedFailedReason := tc.expectedFailedReason
if expectedFailedReason == "" {
expectedFailedReason = "error"
}
require.Equal(t, tc.expectedFailedQueries, int(testutil.ToFloat64(failures.WithLabelValues("user-1", expectedFailedReason))))
})
}
}
Expand Down

0 comments on commit 47b0bc6

Please sign in to comment.