diff --git a/extkafka/check_topic_lag_for_consumer.go b/extkafka/check_topic_lag_for_consumer.go index a2304cc..3190d37 100644 --- a/extkafka/check_topic_lag_for_consumer.go +++ b/extkafka/check_topic_lag_for_consumer.go @@ -29,6 +29,7 @@ type ConsumerGroupLagCheckState struct { End time.Time AcceptableLag int64 StateCheckSuccess bool + StateCheckFailed bool } // Make sure action implements all required interfaces @@ -157,6 +158,7 @@ func (m *ConsumerGroupLagCheckAction) Prepare(_ context.Context, state *Consumer } state.Topic = extutil.ToString(request.Config["topic"]) state.AcceptableLag = extutil.ToInt64(request.Config["acceptableLag"]) + state.StateCheckFailed = false duration := request.Config["duration"].(float64) end := time.Now().Add(time.Millisecond * time.Duration(duration)) @@ -211,10 +213,12 @@ func ConsumerGroupLagCheckStatus(ctx context.Context, state *ConsumerGroupLagChe var checkError *action_kit_api.ActionKitError if topicLag < state.AcceptableLag { state.StateCheckSuccess = true + } else { + state.StateCheckFailed = true } - if completed && !state.StateCheckSuccess { + if completed && state.StateCheckFailed { checkError = extutil.Ptr(action_kit_api.ActionKitError{ - Title: fmt.Sprintf("Consumer Group Lag was higher once than acceptable threshold '%d'.", + Title: fmt.Sprintf("Consumer Group Lag was higher at least once than acceptable threshold '%d'.", state.AcceptableLag), Status: extutil.Ptr(action_kit_api.Failed), })