Skip to content

Commit

Permalink
Fallback to RPC if gRPC is down (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmantica11 authored Aug 7, 2024
1 parent 4c2910f commit 8ae0085
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rand::Rng;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tracing::error;
use yellowstone_grpc_client::{GeyserGrpcBuilderResult, GeyserGrpcClient, Interceptor};
use yellowstone_grpc_proto::convert_from::create_tx_error;
Expand Down Expand Up @@ -71,22 +71,36 @@ pub fn get_grpc_stream_with_rpc_fallback(
}
}
None => {
let block = grpc_stream.next().await.unwrap();
if block.metadata.slot == 0 {
continue;
}
if block.metadata.parent_slot == last_indexed_slot {
last_indexed_slot = block.metadata.slot;
yield block;
} else {
info!("Switching to RPC block fetching");
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Some(block.metadata.slot),
)));

match timeout(Duration::from_secs(60), grpc_stream.next()).await {
Ok(Some(block)) => {
if block.metadata.slot == 0 {
continue;
}
if block.metadata.parent_slot == last_indexed_slot {
last_indexed_slot = block.metadata.slot;
yield block;
} else {
info!("Switching to RPC block fetching");
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Some(block.metadata.slot),
)));
}
}
Ok(None) => {
panic!("gRPC stream ended unexpectedly");
}
Err(_elapsed) => {
info!("No block received in 60 seconds, switching to RPC block fetching");
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None,
)));
}
}

}
Expand Down

0 comments on commit 8ae0085

Please sign in to comment.