Skip to content

Commit

Permalink
we not consider block that are not available with a block number lowe…
Browse files Browse the repository at this point in the history
…r than last confirm block skipped be default
  • Loading branch information
billettc committed Aug 20, 2024
1 parent b197eef commit be85c9a
Showing 1 changed file with 23 additions and 28 deletions.
51 changes: 23 additions & 28 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
bin "github.com/streamingfast/binary"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
firecoreRPC "github.com/streamingfast/firehose-core/rpc"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
sfsol "github.com/streamingfast/solana-go"
Expand Down Expand Up @@ -61,8 +60,6 @@ func (f *RPCFetcher) IsBlockAvailable(requestedSlot uint64) bool {
}

func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, skip bool, err error) {
f.logger.Info("fetching block", zap.Uint64("block_num", requestedSlot))

//THIS IS A FKG Ugly hack!
if requestedSlot >= 13334464 && requestedSlot <= 13334475 {
return nil, true, nil
Expand Down Expand Up @@ -101,9 +98,9 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs
return nil, false, err
}

f.logger.Info("fetching block", zap.Uint64("block_num", requestedSlot), zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot))
f.logger.Info("fetcher fetching block", zap.Uint64("block_num", requestedSlot), zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot))

blockResult, skip, err := f.fetch(ctx, requestedSlot)
blockResult, skip, err := f.fetch(ctx, requestedSlot, f.latestConfirmedSlot)
if err != nil {
return nil, false, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
Expand All @@ -121,37 +118,38 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs
return nil, false, fmt.Errorf("decoding block %d: %w", requestedSlot, err)
}

f.logger.Info("fetched block", zap.Uint64("block_num", requestedSlot), zap.String("block_hash", blockResult.Blockhash.String()))
f.logger.Info("fetcher fetched block", zap.Uint64("block_num", requestedSlot), zap.String("block_hash", blockResult.Blockhash.String()))
return block, false, nil
}

func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (*rpc.GetBlockResult, bool, error) {
func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64, lastConfirmBlockNum uint64) (*rpc.GetBlockResult, bool, error) {
currentSlot := requestedSlot
var out *rpc.GetBlockResult
skipped := false

var lastErrorPrintedAt time.Time

err := derr.Retry(math.MaxUint64, func(ctx context.Context) error {
var err error
out, err = firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (*rpc.GetBlockResult, error) {
for {
out, err := firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (*rpc.GetBlockResult, error) {
f.logger.Info("calling GetBlockWithOptions", zap.String("endpoints", fmt.Sprintf("%s", client)))
blockResult, err := client.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)
blockResult, err := client.GetBlockWithOpts(ctx, currentSlot, GetBlockOpts)
return blockResult, err
})

if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {
if rpcErr.Code == -32004 {
f.logger.Warn("block not available. Retrying same block", zap.Uint64("block_num", currentSlot))
return err
}

if rpcErr.Code == -32009 || rpcErr.Code == -32007 {
f.logger.Info("block was skipped", zap.Uint64("block_num", currentSlot))
currentSlot += 1
skipped = true
return nil
f.logger.Info("fetcher block was skipped", zap.Uint64("block_num", currentSlot))
return nil, true, nil
}

if rpcErr.Code == -32004 {
if currentSlot < lastConfirmBlockNum {
f.logger.Info("fetcher block was supposedly skipped", zap.Uint64("block_num", currentSlot))
return nil, true, nil
}

f.logger.Warn("block not available. trying same block", zap.Uint64("block_num", currentSlot))
continue
}
}

Expand All @@ -160,15 +158,12 @@ func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (*rpc.GetB
lastErrorPrintedAt = time.Now()
}

return fmt.Errorf("getting block %d from rpcClient: %w", requestedSlot, err)
//we retry forever!
continue
}
return nil
})

if err != nil {
return nil, false, fmt.Errorf("after retrying fetch block %d: %w", requestedSlot, err)
return out, false, nil
}
return out, skipped, err
}

func blockFromBlockResult(slot uint64, finalizedSlot uint64, result *rpc.GetBlockResult, logger *zap.Logger) (*pbbstream.Block, error) {
Expand Down

0 comments on commit be85c9a

Please sign in to comment.