Skip to content

Commit

Permalink
feat: added ability to specify backup rpc for connecting to the netwo…
Browse files Browse the repository at this point in the history
…rk (#28)

* feat: added ability to specify backup rpc for connecting to the network

* fmt
  • Loading branch information
akorchyn authored Dec 6, 2024
1 parent 3982a3f commit 1ad1db1
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 173 deletions.
17 changes: 17 additions & 0 deletions examples/specify_backup_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use near_api::{prelude::*, types::reference::Reference};

#[tokio::main]
async fn main() {
let mut network = NetworkConfig::mainnet();
network.rpc_endpoints.push(
RPCEndpoint::new("https://rpc.mainnet.pagoda.co/".parse().unwrap())
.with_api_key("potential api key".parse().unwrap())
.with_retries(5),
);
// Query latest block
let _block = Chain::block()
.at(Reference::Optimistic)
.fetch_from_mainnet()
.await
.unwrap();
}
97 changes: 28 additions & 69 deletions src/common/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use serde::de::DeserializeOwned;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
common::utils::{retry, RetryResponse},
config::NetworkConfig,
config::{retry, NetworkConfig, RetryResponse},
errors::QueryError,
types::Data,
};
Expand Down Expand Up @@ -155,9 +154,6 @@ where
reference: Reference,
requests: Vec<Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>>,
handler: ResponseHandler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> MultiRpcBuilder<Handler, Method, Reference>
Expand All @@ -173,10 +169,6 @@ where
reference,
requests: vec![],
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand Down Expand Up @@ -205,8 +197,6 @@ where
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Method> {
let json_rpc_client = network.json_rpc_client();

debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
let requests: Vec<_> = self
.requests
Expand All @@ -219,32 +209,27 @@ where
.collect::<Result<_, _>>()?;

info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
let requests = requests.into_iter().map(|(query, request)| {
let json_rpc_client = json_rpc_client.clone();
async move {
retry(
|| async {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if request.is_critical_error(&err) => {
RetryResponse::Critical(err)
}
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
);
let requests = requests.into_iter().map(|(query, request)| async move {
retry(network.clone(), |json_rpc_client| {
let query = &query;
let request = &request;

async move {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if request.is_critical_error(&err) => RetryResponse::Critical(err),
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
},
self.retries,
self.sleep_duration,
self.exponential_backoff,
)
.await
}
);
result
}
})
.await
});

let requests: Vec<_> = join_all(requests)
Expand Down Expand Up @@ -275,9 +260,6 @@ pub struct RpcBuilder<Handler, Method, Reference> {
reference: Reference,
request: Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>,
handler: Handler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> RpcBuilder<Handler, Method, Reference>
Expand All @@ -297,10 +279,6 @@ where
reference,
request: Arc::new(request),
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand All @@ -311,37 +289,21 @@ where
}
}

pub const fn with_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}

pub const fn with_sleep_duration(mut self, sleep_duration: std::time::Duration) -> Self {
self.sleep_duration = sleep_duration;
self
}

pub const fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

#[instrument(skip(self, network))]
pub async fn fetch_from(
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Method> {
debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
let json_rpc_client = network.json_rpc_client();
let query = self.request.create_query(network, self.reference)?;

let query_response = retry(
|| async {
let query_response = retry(network.clone(), |json_rpc_client| {
let query = &query;
let request = &self.request;
async move {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if self.request.is_critical_error(&err) => {
RetryResponse::Critical(err)
}
Err(err) if request.is_critical_error(&err) => RetryResponse::Critical(err),
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
Expand All @@ -351,11 +313,8 @@ where
result
);
result
},
3,
std::time::Duration::from_secs(1),
false,
)
}
})
.await?;

debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
Expand Down
49 changes: 8 additions & 41 deletions src/common/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use reqwest::Response;
use tracing::{debug, info};

use crate::{
common::utils::{is_critical_transaction_error, RetryResponse},
config::NetworkConfig,
common::utils::is_critical_transaction_error,
config::{retry, NetworkConfig, RetryResponse},
errors::{
ExecuteMetaTransactionsError, ExecuteTransactionError, MetaSignError, SignerError,
ValidationError,
Expand All @@ -22,8 +22,7 @@ use crate::{
};

use super::{
signed_delegate_action::SignedDelegateActionAsBase64, utils::retry,
META_TRANSACTION_VALID_FOR_DEFAULT,
signed_delegate_action::SignedDelegateActionAsBase64, META_TRANSACTION_VALID_FOR_DEFAULT,
};

const TX_EXECUTOR_TARGET: &str = "near_api::tx::executor";
Expand Down Expand Up @@ -80,42 +79,20 @@ impl From<SignedTransaction> for PrepopulateTransaction {
pub struct ExecuteSignedTransaction {
pub tr: TransactionableOrSigned<SignedTransaction>,
pub signer: Arc<Signer>,
pub retries: u8,
pub sleep_duration: std::time::Duration,
pub exponential_backoff: bool,
}

impl ExecuteSignedTransaction {
pub fn new<T: Transactionable + 'static>(tr: T, signer: Arc<Signer>) -> Self {
Self {
tr: TransactionableOrSigned::Transactionable(Box::new(tr)),
signer,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

pub fn meta(self) -> ExecuteMetaTransaction {
ExecuteMetaTransaction::from_box(self.tr.transactionable(), self.signer)
}

pub const fn with_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}

pub const fn with_sleep_duration(mut self, sleep_duration: std::time::Duration) -> Self {
self.sleep_duration = sleep_duration;
self
}

pub const fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

pub async fn presign_offline(
mut self,
public_key: PublicKey,
Expand Down Expand Up @@ -169,9 +146,6 @@ impl ExecuteSignedTransaction {
mut self,
network: &NetworkConfig,
) -> Result<FinalExecutionOutcomeView, ExecuteTransactionError> {
let sleep_duration = self.sleep_duration;
let retries = self.retries;

let (signed, transactionable) = match &mut self.tr {
TransactionableOrSigned::Transactionable(tr) => {
debug!(target: TX_EXECUTOR_TARGET, "Preparing unsigned transaction");
Expand Down Expand Up @@ -212,7 +186,7 @@ impl ExecuteSignedTransaction {
signed.transaction.nonce(),
);

Self::send_impl(network, signed, retries, sleep_duration).await
Self::send_impl(network, signed).await
}

pub async fn send_to_mainnet(
Expand All @@ -232,15 +206,11 @@ impl ExecuteSignedTransaction {
async fn send_impl(
network: &NetworkConfig,
signed_tr: SignedTransaction,
retries: u8,
sleep_duration: std::time::Duration,
) -> Result<FinalExecutionOutcomeView, ExecuteTransactionError> {
retry(
|| {
let signed_tr = signed_tr.clone();
async move {
let result = match network
.json_rpc_client()
retry(network.clone(), |json_rpc_client| {
let signed_tr = signed_tr.clone();
async move {
let result = match json_rpc_client
.call(
near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest {
signed_transaction: signed_tr.clone(),
Expand All @@ -263,9 +233,6 @@ impl ExecuteSignedTransaction {
result
}
},
retries,
sleep_duration,
false,
)
.await
.map_err(ExecuteTransactionError::TransactionError)
Expand Down
51 changes: 1 addition & 50 deletions src/common/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// https://github.com/near/near-token-rs/blob/3feafec624e7d1028ed00695f2acf87e1d823fa7/src/utils.rs#L1-L49

use crate::errors::{DecimalNumberParsingError, RetryError};
use crate::errors::DecimalNumberParsingError;

/// Parsing decimal numbers from `&str` type in `u128`.
/// Function also takes a value of metric prefix in u128 type.
Expand Down Expand Up @@ -49,55 +49,6 @@ pub fn parse_decimal_number(s: &str, pref_const: u128) -> Result<u128, DecimalNu
Ok(result)
}

#[derive(Debug)]
pub enum RetryResponse<R, E> {
Ok(R),
Retry(E),
Critical(E),
}

impl<R, E> From<Result<R, E>> for RetryResponse<R, E> {
fn from(value: Result<R, E>) -> Self {
match value {
Ok(value) => Self::Ok(value),
Err(value) => Self::Retry(value),
}
}
}

pub async fn retry<R, E, T, F>(
mut task: F,
retries: u8,
initial_sleep: std::time::Duration,
exponential_backoff: bool,
) -> Result<R, RetryError<E>>
where
F: FnMut() -> T + Send,
T: core::future::Future<Output = RetryResponse<R, E>> + Send,
T::Output: Send,
{
let mut retries = (1..=retries).rev();
let mut sleep_duration = initial_sleep;
loop {
let result = task().await;
match result {
RetryResponse::Ok(result) => return Ok(result),
RetryResponse::Retry(_) if retries.next().is_some() => {
tokio::time::sleep(sleep_duration).await;
sleep_duration = if exponential_backoff {
sleep_duration * 2
} else {
sleep_duration
};
}
RetryResponse::Retry(err) => {
return Err(RetryError::RetriesExhausted(err));
}
RetryResponse::Critical(err) => return Err(RetryError::Critical(err)),
}
}
}

pub fn is_critical_blocks_error(
err: &near_jsonrpc_client::errors::JsonRpcError<
near_jsonrpc_primitives::types::blocks::RpcBlockError,
Expand Down
Loading

0 comments on commit 1ad1db1

Please sign in to comment.