Skip to content

Commit

Permalink
Merge pull request #818 from forta-network/caner/cleanup-bot-client
Browse files Browse the repository at this point in the history
Use unused close signal channel and fix bot client test
  • Loading branch information
canercidam authored Oct 19, 2023
2 parents 6ee4f55 + 41e1adf commit 0d7fe71
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
49 changes: 29 additions & 20 deletions services/components/botio/bot_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ const (
DefaultHealthCheckInterval = time.Second * 30
)

// Global vars
var (
HealthCheckInterval = DefaultHealthCheckInterval
)

// botClient receives blocks and transactions, and produces results.
type botClient struct {
ctx context.Context
Expand Down Expand Up @@ -400,7 +405,11 @@ func processRequests[R any](
for {
select {
case <-ctx.Done():
logger.WithError(ctx.Err()).Info("bot context is done")
logger.WithError(ctx.Err()).Info("bot context is done - exiting request processing loop")
return

case <-closedCh:
logger.Info("bot client is closed - exiting request processing loop")
return

case request := <-reqCh:
Expand Down Expand Up @@ -450,12 +459,12 @@ func (bot *botClient) processBlocks() {
processRequests(bot.ctx, bot.blockRequests, bot.Closed(), lg, bot.processBlock)
}

func (bot *botClient) processHealthChecks() {
func (bot *botClient) processCombinationAlerts() {
lg := log.WithFields(
log.Fields{
"bot": bot.Config().ID,
"component": "bot-client",
"evaluate": "health-check",
"evaluate": "combination",
},
)

Expand All @@ -465,28 +474,15 @@ func (bot *botClient) processHealthChecks() {
case <-bot.Initialized():
}

ticker := time.NewTicker(DefaultHealthCheckInterval)

bot.doHealthCheck(bot.ctx, lg)
for {
select {
case <-bot.ctx.Done():
return
case <-ticker.C:
exit := bot.doHealthCheck(bot.ctx, lg)
if exit {
return
}
}
}
processRequests(bot.ctx, bot.combinationRequests, bot.Closed(), lg, bot.processCombinationAlert)
}

func (bot *botClient) processCombinationAlerts() {
func (bot *botClient) processHealthChecks() {
lg := log.WithFields(
log.Fields{
"bot": bot.Config().ID,
"component": "bot-client",
"evaluate": "combination",
"evaluate": "health-check",
},
)

Expand All @@ -496,7 +492,20 @@ func (bot *botClient) processCombinationAlerts() {
case <-bot.Initialized():
}

processRequests(bot.ctx, bot.combinationRequests, bot.Closed(), lg, bot.processCombinationAlert)
ticker := time.NewTicker(HealthCheckInterval)

bot.doHealthCheck(bot.ctx, lg)
for {
select {
case <-bot.ctx.Done():
return
case <-ticker.C:
exit := bot.doHealthCheck(bot.ctx, lg)
if exit {
return
}
}
}
}

func (bot *botClient) processTransaction(ctx context.Context, lg *log.Entry, request *botreq.TxRequest) (exit bool) {
Expand Down
7 changes: 4 additions & 3 deletions services/components/botio/bot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (s *BotClientSuite) TestStartProcessStop() {
s.lifecycleMetrics.EXPECT().ActionSubscribe(combinerSubscriptions)

// test health checks
s.botGrpc.EXPECT().DoHealthCheck(gomock.Any())
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe)
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe)
HealthCheckInterval = time.Second
s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).MinTimes(1)
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe).MinTimes(1)
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).MinTimes(1)

s.msgClient.EXPECT().Publish(messaging.SubjectAgentsAlertSubscribe, combinerSubscriptions)
s.botClient.StartProcessing()
Expand Down

0 comments on commit 0d7fe71

Please sign in to comment.