diff --git a/CHANGELOG.md b/CHANGELOG.md index 8be25291..b40c60ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) * [#266](https://github.com/babylonlabs-io/finality-provider/pull/266) Change default config * [#262](https://github.com/babylonlabs-io/finality-provider/pull/262) Add new command to export pop * [#284](https://github.com/babylonlabs-io/finality-provider/pull/284) Add new command to delete pop +* [#277](https://github.com/babylonlabs-io/finality-provider/pull/277) Poll many blocks in poller ## v0.14.3 diff --git a/clientcontroller/babylon.go b/clientcontroller/babylon.go index ed0916e0..054bd481 100644 --- a/clientcontroller/babylon.go +++ b/clientcontroller/babylon.go @@ -332,13 +332,13 @@ func (bc *BabylonController) QueryLastCommittedPublicRand(fpPk *btcec.PublicKey, return res.PubRandCommitMap, nil } -func (bc *BabylonController) QueryBlocks(startHeight, endHeight uint64, limit uint32) ([]*types.BlockInfo, error) { +func (bc *BabylonController) QueryBlocks(startHeight, endHeight uint64, limit uint64) ([]*types.BlockInfo, error) { if endHeight < startHeight { return nil, fmt.Errorf("the startHeight %v should not be higher than the endHeight %v", startHeight, endHeight) } count := endHeight - startHeight + 1 - if count > uint64(limit) { - count = uint64(limit) + if count > limit { + count = limit } return bc.queryLatestBlocks(sdk.Uint64ToBigEndian(startHeight), count, finalitytypes.QueriedBlockStatus_ANY, false) diff --git a/clientcontroller/interface.go b/clientcontroller/interface.go index 8aaf29d9..74a6be50 100644 --- a/clientcontroller/interface.go +++ b/clientcontroller/interface.go @@ -73,7 +73,7 @@ type ClientController interface { QueryBlock(height uint64) (*types.BlockInfo, error) // QueryBlocks returns a list of blocks from startHeight to endHeight - QueryBlocks(startHeight, endHeight uint64, limit uint32) ([]*types.BlockInfo, error) + QueryBlocks(startHeight, endHeight uint64, limit uint64) ([]*types.BlockInfo, error) // QueryBestBlock queries the tip block of the consumer chain QueryBestBlock() (*types.BlockInfo, error) diff --git a/finality-provider/service/chain_poller.go b/finality-provider/service/chain_poller.go index 08d32791..2d59e69f 100644 --- a/finality-provider/service/chain_poller.go +++ b/finality-provider/service/chain_poller.go @@ -47,6 +47,7 @@ type ChainPoller struct { skipHeightChan chan *skipHeightRequest nextHeight uint64 logger *zap.Logger + mu sync.RWMutex } func NewChainPoller( @@ -113,24 +114,30 @@ func (cp *ChainPoller) GetBlockInfoChan() <-chan *types.BlockInfo { return cp.blockInfoChan } -func (cp *ChainPoller) blockWithRetry(height uint64) (*types.BlockInfo, error) { +func (cp *ChainPoller) blocksWithRetry(start, end, limit uint64) ([]*types.BlockInfo, error) { var ( - block *types.BlockInfo + block []*types.BlockInfo err error ) if err := retry.Do(func() error { - block, err = cp.cc.QueryBlock(height) + block, err = cp.cc.QueryBlocks(start, end, limit) if err != nil { return err } + if len(block) == 0 { + return fmt.Errorf("no blocks found for range %d-%d", start, end) + } + return nil }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { cp.logger.Debug( - "failed to query the consumer chain for the latest block", + "failed to query the consumer chain for block range", zap.Uint("attempt", n+1), zap.Uint("max_attempts", RtyAttNum), - zap.Uint64("height", height), + zap.Uint64("start_height", start), + zap.Uint64("end_height", end), + zap.Uint64("limit", limit), zap.Error(err), ) })); err != nil { @@ -140,6 +147,33 @@ func (cp *ChainPoller) blockWithRetry(height uint64) (*types.BlockInfo, error) { return block, nil } +func (cp *ChainPoller) getLatestBlockWithRetry() (*types.BlockInfo, error) { + var ( + latestBlock *types.BlockInfo + err error + ) + + if err := retry.Do(func() error { + latestBlock, err = cp.cc.QueryBestBlock() + if err != nil { + return err + } + + return nil + }, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { + cp.logger.Debug( + "failed to query the consumer chain for the latest block", + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", RtyAttNum), + zap.Error(err), + ) + })); err != nil { + return nil, err + } + + return latestBlock, nil +} + // waitForActivation waits until BTC staking is activated func (cp *ChainPoller) waitForActivation() { // ensure that the startHeight is no lower than the activated height @@ -171,31 +205,52 @@ func (cp *ChainPoller) pollChain() { var failedCycles uint32 for { - // start polling in the first iteration - blockToRetrieve := cp.nextHeight - block, err := cp.blockWithRetry(blockToRetrieve) + latestBlock, err := cp.getLatestBlockWithRetry() if err != nil { failedCycles++ cp.logger.Debug( - "failed to query the consumer chain for the block", + "failed to query the consumer chain for the latest block", zap.Uint32("current_failures", failedCycles), - zap.Uint64("block_to_retrieve", blockToRetrieve), zap.Error(err), ) } else { - // no error and we got the header we wanted to get, bump the state and push - // notification about data - cp.nextHeight = blockToRetrieve + 1 - failedCycles = 0 - cp.metrics.RecordLastPolledHeight(block.Height) - - cp.logger.Info("the poller retrieved the block from the consumer chain", - zap.Uint64("height", block.Height)) - - // push the data to the channel - // Note: if the consumer is too slow -- the buffer is full - // the channel will block, and we will stop retrieving data from the node - cp.blockInfoChan <- block + // start polling in the first iteration + blockToRetrieve := cp.NextHeight() + + blocks, err := cp.blocksWithRetry(blockToRetrieve, latestBlock.Height, latestBlock.Height) + if err != nil { + failedCycles++ + cp.logger.Debug( + "failed to query the consumer chain for the block range", + zap.Uint32("current_failures", failedCycles), + zap.Uint64("start_height", blockToRetrieve), + zap.Uint64("end_height", latestBlock.Height), + zap.Error(err), + ) + } else { + // no error and we got the header we wanted to get, bump the state and push + // notification about data + failedCycles = 0 + if len(blocks) == 0 { + continue + } + + lb := blocks[len(blocks)-1] + cp.setNextHeight(lb.Height + 1) + + cp.metrics.RecordLastPolledHeight(lb.Height) + + cp.logger.Info("the poller retrieved the blocks from the consumer chain", + zap.Uint64("start_height", blockToRetrieve), + zap.Uint64("end_height", lb.Height)) + + // push the data to the channel + // Note: if the consumer is too slow -- the buffer is full + // the channel will block, and we will stop retrieving data from the node + for _, block := range blocks { + cp.blockInfoChan <- block + } + } } if failedCycles > maxFailedCycles { @@ -222,7 +277,7 @@ func (cp *ChainPoller) pollChain() { cp.clearChanBufferUpToHeight(targetHeight) // set the next height to the skip height - cp.nextHeight = targetHeight + cp.setNextHeight(targetHeight) cp.logger.Debug("the poller has skipped height(s)", zap.Uint64("next_height", req.height)) @@ -261,9 +316,19 @@ func (cp *ChainPoller) SkipToHeight(height uint64) error { } func (cp *ChainPoller) NextHeight() uint64 { + cp.mu.RLock() + defer cp.mu.RUnlock() + return cp.nextHeight } +func (cp *ChainPoller) setNextHeight(height uint64) { + cp.mu.Lock() + defer cp.mu.Unlock() + + cp.nextHeight = height +} + func (cp *ChainPoller) clearChanBufferUpToHeight(upToHeight uint64) { for len(cp.blockInfoChan) > 0 { block := <-cp.blockInfoChan diff --git a/finality-provider/service/chain_poller_test.go b/finality-provider/service/chain_poller_test.go index b7f37592..9284f647 100644 --- a/finality-provider/service/chain_poller_test.go +++ b/finality-provider/service/chain_poller_test.go @@ -34,15 +34,16 @@ func FuzzChainPoller_Start(f *testing.F) { mockClientController.EXPECT().QueryActivatedHeight().Return(uint64(1), nil).AnyTimes() currentBlockRes := &types.BlockInfo{ - Height: currentHeight, + Height: endHeight, } mockClientController.EXPECT().QueryBestBlock().Return(currentBlockRes, nil).AnyTimes() for i := startHeight; i <= endHeight; i++ { - resBlock := &types.BlockInfo{ + resBlocks := []*types.BlockInfo{{ Height: i, - } - mockClientController.EXPECT().QueryBlock(i).Return(resBlock, nil).AnyTimes() + }} + + mockClientController.EXPECT().QueryBlocks(i, endHeight, endHeight).Return(resBlocks, nil).AnyTimes() } m := metrics.NewFpMetrics() @@ -77,7 +78,7 @@ func FuzzChainPoller_SkipHeight(f *testing.F) { currentHeight := uint64(r.Int63n(100) + 1) startHeight := currentHeight + 1 endHeight := startHeight + uint64(r.Int63n(10)+2) - skipHeight := endHeight + uint64(r.Int63n(10)+1) + skipHeight := endHeight + uint64(r.Int63n(10)+2) ctl := gomock.NewController(t) mockClientController := mocks.NewMockClientController(ctl) @@ -85,16 +86,27 @@ func FuzzChainPoller_SkipHeight(f *testing.F) { mockClientController.EXPECT().QueryActivatedHeight().Return(uint64(1), nil).AnyTimes() currentBlockRes := &types.BlockInfo{ - Height: currentHeight, + Height: endHeight, } mockClientController.EXPECT().QueryBestBlock().Return(currentBlockRes, nil).AnyTimes() - for i := startHeight; i <= skipHeight; i++ { + var resBlocks []*types.BlockInfo + for i := startHeight; i <= endHeight; i++ { resBlock := &types.BlockInfo{ Height: i, } - mockClientController.EXPECT().QueryBlock(i).Return(resBlock, nil).AnyTimes() + resBlocks = append(resBlocks, resBlock) } + mockClientController.EXPECT().QueryBlocks(startHeight, endHeight, endHeight).Return(resBlocks, nil).AnyTimes() + mockClientController.EXPECT().QueryBlocks(endHeight+1, endHeight, endHeight).Return(resBlocks, nil).AnyTimes() + mockClientController.EXPECT().QueryBlocks(startHeight, skipHeight, skipHeight).Return(resBlocks, nil).AnyTimes() + + resBlocks = append(resBlocks, &types.BlockInfo{ + Height: skipHeight, + }) + mockClientController.EXPECT().QueryBlocks(skipHeight, endHeight, endHeight).Return(resBlocks, nil).AnyTimes() + mockClientController.EXPECT().QueryBlocks(skipHeight+1, endHeight, endHeight).Return(resBlocks, nil).AnyTimes() + mockClientController.EXPECT().QueryBlocks(skipHeight+1, skipHeight, skipHeight).Return(resBlocks, nil).AnyTimes() m := metrics.NewFpMetrics() pollerCfg := fpcfg.DefaultChainPollerConfig() @@ -116,7 +128,7 @@ func FuzzChainPoller_SkipHeight(f *testing.F) { var wg sync.WaitGroup wg.Add(1) go func() { - wg.Done() + defer wg.Done() // insert a skipToHeight request with height lower than the next // height to retrieve, expecting an error err = poller.SkipToHeight(poller.NextHeight() - 1) @@ -128,6 +140,7 @@ func FuzzChainPoller_SkipHeight(f *testing.F) { }() skipped := false + seenHeight := map[uint64]struct{}{} for i := startHeight; i <= endHeight; i++ { if skipped { break @@ -137,15 +150,25 @@ func FuzzChainPoller_SkipHeight(f *testing.F) { if info.Height == skipHeight { skipped = true } else { - require.Equal(t, i, info.Height) + seenHeight[info.Height] = struct{}{} } case <-time.After(10 * time.Second): t.Fatalf("Failed to get block info") } } - wg.Wait() + for i := startHeight; i <= endHeight; i++ { + if i == skipHeight { + break + } + if _, ok := seenHeight[i]; !ok { + t.Fatalf("height %d not seen", i) + } + } - require.Equal(t, skipHeight+1, poller.NextHeight()) + wg.Wait() + require.Eventually(t, func() bool { + return skipHeight+1 == poller.NextHeight() + }, eventuallyWaitTimeOut, eventuallyPollTime) }) } diff --git a/testutil/mocks/babylon.go b/testutil/mocks/babylon.go index 404463a7..375f438e 100644 --- a/testutil/mocks/babylon.go +++ b/testutil/mocks/babylon.go @@ -129,7 +129,7 @@ func (mr *MockClientControllerMockRecorder) QueryBlock(height interface{}) *gomo } // QueryBlocks mocks base method. -func (m *MockClientController) QueryBlocks(startHeight, endHeight uint64, limit uint32) ([]*types1.BlockInfo, error) { +func (m *MockClientController) QueryBlocks(startHeight, endHeight, limit uint64) ([]*types1.BlockInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueryBlocks", startHeight, endHeight, limit) ret0, _ := ret[0].([]*types1.BlockInfo)