Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: poll many blocks #277

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
* [#252](https://github.com/babylonlabs-io/finality-provider/pull/252) Remove interceptors and use context
* [#266](https://github.com/babylonlabs-io/finality-provider/pull/266) Change default config
* [#262](https://github.com/babylonlabs-io/finality-provider/pull/262) Add new command to export pop
* [#277](https://github.com/babylonlabs-io/finality-provider/pull/277) Poll many blocks in poller

## v0.14.3

Expand Down
6 changes: 3 additions & 3 deletions clientcontroller/babylon.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,13 @@ func (bc *BabylonController) QueryLastCommittedPublicRand(fpPk *btcec.PublicKey,
return res.PubRandCommitMap, nil
}

func (bc *BabylonController) QueryBlocks(startHeight, endHeight uint64, limit uint32) ([]*types.BlockInfo, error) {
func (bc *BabylonController) QueryBlocks(startHeight, endHeight uint64, limit uint64) ([]*types.BlockInfo, error) {
if endHeight < startHeight {
return nil, fmt.Errorf("the startHeight %v should not be higher than the endHeight %v", startHeight, endHeight)
}
count := endHeight - startHeight + 1
if count > uint64(limit) {
count = uint64(limit)
if count > limit {
count = limit
}

return bc.queryLatestBlocks(sdk.Uint64ToBigEndian(startHeight), count, finalitytypes.QueriedBlockStatus_ANY, false)
Expand Down
2 changes: 1 addition & 1 deletion clientcontroller/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type ClientController interface {
QueryBlock(height uint64) (*types.BlockInfo, error)

// QueryBlocks returns a list of blocks from startHeight to endHeight
QueryBlocks(startHeight, endHeight uint64, limit uint32) ([]*types.BlockInfo, error)
QueryBlocks(startHeight, endHeight uint64, limit uint64) ([]*types.BlockInfo, error)

// QueryBestBlock queries the tip block of the consumer chain
QueryBestBlock() (*types.BlockInfo, error)
Expand Down
118 changes: 100 additions & 18 deletions finality-provider/service/chain_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
return cp.blockInfoChan
}

func (cp *ChainPoller) blockWithRetry(height uint64) (*types.BlockInfo, error) {

Check failure on line 116 in finality-provider/service/chain_poller.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

func `(*ChainPoller).blockWithRetry` is unused (unused)
var (
block *types.BlockInfo
err error
Expand All @@ -140,6 +140,66 @@
return block, nil
}

func (cp *ChainPoller) blocksWithRetry(start, end, limit uint64) ([]*types.BlockInfo, error) {
var (
block []*types.BlockInfo
err error
)
if err := retry.Do(func() error {
block, err = cp.cc.QueryBlocks(start, end, limit)
if err != nil {
return err
}

if len(block) == 0 {
return fmt.Errorf("no blocks found for range %d-%d", start, end)
}

return nil
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
cp.logger.Debug(
"failed to query the consumer chain for block range",
zap.Uint("attempt", n+1),
zap.Uint("max_attempts", RtyAttNum),
zap.Uint64("start_height", start),
zap.Uint64("end_height", end),
zap.Uint64("limit", limit),
zap.Error(err),
)
})); err != nil {
return nil, err
}

return block, nil
}

func (cp *ChainPoller) getLatestBlockWithRetry() (*types.BlockInfo, error) {
var (
latestBlock *types.BlockInfo
err error
)

if err := retry.Do(func() error {
latestBlock, err = cp.cc.QueryBestBlock()
if err != nil {
return err
}

return nil
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
cp.logger.Debug(
"failed to query the consumer chain for the latest block",
zap.Uint("attempt", n+1),
zap.Uint("max_attempts", RtyAttNum),
zap.Error(err),
)
})); err != nil {
return nil, err
}

return latestBlock, nil
}

// waitForActivation waits until BTC staking is activated
func (cp *ChainPoller) waitForActivation() {
// ensure that the startHeight is no lower than the activated height
Expand Down Expand Up @@ -171,31 +231,52 @@
var failedCycles uint32

for {
// start polling in the first iteration
blockToRetrieve := cp.nextHeight
block, err := cp.blockWithRetry(blockToRetrieve)
latestBlock, err := cp.getLatestBlockWithRetry()
if err != nil {
failedCycles++
cp.logger.Debug(
"failed to query the consumer chain for the block",
"failed to query the consumer chain for the latest block",
zap.Uint32("current_failures", failedCycles),
zap.Uint64("block_to_retrieve", blockToRetrieve),
zap.Error(err),
)
} else {
// no error and we got the header we wanted to get, bump the state and push
// notification about data
cp.nextHeight = blockToRetrieve + 1
failedCycles = 0
cp.metrics.RecordLastPolledHeight(block.Height)

cp.logger.Info("the poller retrieved the block from the consumer chain",
zap.Uint64("height", block.Height))

// push the data to the channel
// Note: if the consumer is too slow -- the buffer is full
// the channel will block, and we will stop retrieving data from the node
cp.blockInfoChan <- block
// start polling in the first iteration
blockToRetrieve := cp.nextHeight

blocks, err := cp.blocksWithRetry(blockToRetrieve, latestBlock.Height, latestBlock.Height)
if err != nil {
failedCycles++
cp.logger.Debug(
"failed to query the consumer chain for the block range",
zap.Uint32("current_failures", failedCycles),
zap.Uint64("start_height", blockToRetrieve),
zap.Uint64("end_height", latestBlock.Height),
zap.Error(err),
)
} else {
// no error and we got the header we wanted to get, bump the state and push
// notification about data
failedCycles = 0
if len(blocks) == 0 {
continue
}

lb := blocks[len(blocks)-1]
cp.nextHeight = lb.Height + 1

cp.metrics.RecordLastPolledHeight(lb.Height)

cp.logger.Info("the poller retrieved the blocks from the consumer chain",
zap.Uint64("start_height", blockToRetrieve),
zap.Uint64("end_height", lb.Height))

// push the data to the channel
// Note: if the consumer is too slow -- the buffer is full
// the channel will block, and we will stop retrieving data from the node
for _, block := range blocks {
cp.blockInfoChan <- block
}
}
}

if failedCycles > maxFailedCycles {
Expand All @@ -209,6 +290,7 @@
// than the next height to retrieve
targetHeight := req.height
if targetHeight <= cp.nextHeight {
fmt.Printf("target height %d, next height %d\n", targetHeight, cp.nextHeight)
resp := &skipHeightResponse{
err: fmt.Errorf(
"the target height %d is not higher than the next height %d to retrieve",
Expand Down
41 changes: 31 additions & 10 deletions finality-provider/service/chain_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@
mockClientController.EXPECT().QueryActivatedHeight().Return(uint64(1), nil).AnyTimes()

currentBlockRes := &types.BlockInfo{
Height: currentHeight,
Height: endHeight,
}
mockClientController.EXPECT().QueryBestBlock().Return(currentBlockRes, nil).AnyTimes()

for i := startHeight; i <= endHeight; i++ {
resBlock := &types.BlockInfo{
resBlocks := []*types.BlockInfo{{
Height: i,
}
mockClientController.EXPECT().QueryBlock(i).Return(resBlock, nil).AnyTimes()
}}

mockClientController.EXPECT().QueryBlocks(i, endHeight, uint32(endHeight)).Return(resBlocks, nil).AnyTimes()
}

m := metrics.NewFpMetrics()
Expand Down Expand Up @@ -77,24 +78,40 @@
currentHeight := uint64(r.Int63n(100) + 1)
startHeight := currentHeight + 1
endHeight := startHeight + uint64(r.Int63n(10)+2)
skipHeight := endHeight + uint64(r.Int63n(10)+1)
skipHeight := endHeight + uint64(r.Int63n(10)+2)

t.Log(startHeight, endHeight, skipHeight)

ctl := gomock.NewController(t)
mockClientController := mocks.NewMockClientController(ctl)
mockClientController.EXPECT().Close().Return(nil).AnyTimes()
mockClientController.EXPECT().QueryActivatedHeight().Return(uint64(1), nil).AnyTimes()

currentBlockRes := &types.BlockInfo{
Height: currentHeight,
Height: endHeight,
}
mockClientController.EXPECT().QueryBestBlock().Return(currentBlockRes, nil).AnyTimes()

for i := startHeight; i <= skipHeight; i++ {
var resBlocks []*types.BlockInfo
for i := startHeight; i <= endHeight; i++ {
resBlock := &types.BlockInfo{
Height: i,
}
mockClientController.EXPECT().QueryBlock(i).Return(resBlock, nil).AnyTimes()
resBlocks = append(resBlocks, resBlock)
}
//mockClientController.EXPECT().QueryBlocks(gomock.Any(), gomock.Any(), gomock.Any()).Return(resBlocks, nil).AnyTimes()

Check failure on line 102 in finality-provider/service/chain_poller_test.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

commentFormatting: put a space between `//` and comment text (gocritic)
mockClientController.EXPECT().QueryBlocks(startHeight, endHeight, endHeight).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(endHeight+1, endHeight, endHeight).Return(resBlocks, nil).AnyTimes()

resBlocks = append(resBlocks, &types.BlockInfo{
Height: skipHeight,
})
mockClientController.EXPECT().QueryBlocks(startHeight, skipHeight, skipHeight).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(skipHeight+1, skipHeight, skipHeight).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(skipHeight, endHeight, endHeight).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(skipHeight+1, skipHeight, skipHeight).Return(resBlocks, nil).AnyTimes()

//mockClientController.EXPECT().QueryBlocks(skipHeight, skipHeight, uint32(skipHeight)).Return(resBlocks, nil).AnyTimes()

Check failure on line 114 in finality-provider/service/chain_poller_test.go

View workflow job for this annotation

GitHub Actions / lint_test / lint

commentFormatting: put a space between `//` and comment text (gocritic)

m := metrics.NewFpMetrics()
pollerCfg := fpcfg.DefaultChainPollerConfig()
Expand All @@ -116,14 +133,17 @@
var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
defer wg.Done()
// insert a skipToHeight request with height lower than the next
// height to retrieve, expecting an error
err = poller.SkipToHeight(poller.NextHeight() - 1)
require.Error(t, err)
// insert a skipToHeight request with a height higher than the
// next height to retrieve
err = poller.SkipToHeight(skipHeight)
if err != nil {
t.Log(err)
}
require.NoError(t, err)
}()

Expand All @@ -137,6 +157,7 @@
if info.Height == skipHeight {
skipped = true
} else {
t.Log(i, info.Height)
require.Equal(t, i, info.Height)
}
case <-time.After(10 * time.Second):
Expand All @@ -145,7 +166,7 @@
}

wg.Wait()

t.Log(skipHeight+1, poller.NextHeight())
require.Equal(t, skipHeight+1, poller.NextHeight())
})
}
2 changes: 1 addition & 1 deletion testutil/mocks/babylon.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading