From 4e2bf11ca0a6ee22ac7b84522152ad9cbbe9aed1 Mon Sep 17 00:00:00 2001 From: "antoine.choimet" Date: Wed, 20 Nov 2024 13:59:23 +0100 Subject: [PATCH] fix: check lag failed if acceptable lag crossed --- extkafka/check_topic_lag_for_consumer.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extkafka/check_topic_lag_for_consumer.go b/extkafka/check_topic_lag_for_consumer.go index a2304cc..3190d37 100644 --- a/extkafka/check_topic_lag_for_consumer.go +++ b/extkafka/check_topic_lag_for_consumer.go @@ -29,6 +29,7 @@ type ConsumerGroupLagCheckState struct { End time.Time AcceptableLag int64 StateCheckSuccess bool + StateCheckFailed bool } // Make sure action implements all required interfaces @@ -157,6 +158,7 @@ func (m *ConsumerGroupLagCheckAction) Prepare(_ context.Context, state *Consumer } state.Topic = extutil.ToString(request.Config["topic"]) state.AcceptableLag = extutil.ToInt64(request.Config["acceptableLag"]) + state.StateCheckFailed = false duration := request.Config["duration"].(float64) end := time.Now().Add(time.Millisecond * time.Duration(duration)) @@ -211,10 +213,12 @@ func ConsumerGroupLagCheckStatus(ctx context.Context, state *ConsumerGroupLagChe var checkError *action_kit_api.ActionKitError if topicLag < state.AcceptableLag { state.StateCheckSuccess = true + } else { + state.StateCheckFailed = true } - if completed && !state.StateCheckSuccess { + if completed && state.StateCheckFailed { checkError = extutil.Ptr(action_kit_api.ActionKitError{ - Title: fmt.Sprintf("Consumer Group Lag was higher once than acceptable threshold '%d'.", + Title: fmt.Sprintf("Consumer Group Lag was higher at least once than acceptable threshold '%d'.", state.AcceptableLag), Status: extutil.Ptr(action_kit_api.Failed), })