Skip to content

Commit

Permalink
Parallel transactions into jito
Browse files Browse the repository at this point in the history
  • Loading branch information
LevBeta committed Jul 1, 2024
1 parent 9093612 commit 6d76cb7
Showing 1 changed file with 40 additions and 27 deletions.
67 changes: 40 additions & 27 deletions src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use jito_protos::searcher::{
};
use jito_searcher_client::{get_searcher_client_no_auth, send_bundle_with_confirmation};
use log::{error, info};
use rayon::iter::IntoParallelRefIterator;
use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as NonBlockRpc,
Expand All @@ -22,7 +23,13 @@ use solana_sdk::{
system_instruction::transfer,
transaction::VersionedTransaction,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use std::{error::Error, str::FromStr};
use tonic::transport::Channel;

Expand All @@ -38,7 +45,7 @@ const SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(500
pub struct TransactionManager {
rx: Receiver<BatchTransactions>,
keypair: Keypair,
rpc: RpcClient,
rpc: Arc<RpcClient>,
non_block_rpc: NonBlockRpc,
/// The searcher client for the jito block engine
searcher_client: SearcherServiceClient<Channel>,
Expand All @@ -63,8 +70,10 @@ impl TransactionManager {
.await
.unwrap();

let rpc =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::confirmed());
let rpc = Arc::new(RpcClient::new_with_commitment(
config.rpc_url.clone(),
CommitmentConfig::confirmed(),
));

let non_block_rpc = NonBlockRpc::new(config.rpc_url.clone());

Expand Down Expand Up @@ -96,49 +105,53 @@ impl TransactionManager {

/// Starts the transaction manager
pub async fn start(&mut self) {
for instructions in self.rx.iter() {
for instructions in self.rx.clone().iter() {
let transactions = self.configure_instructions(instructions).await.unwrap();
for transaction in transactions {
if let Err(e) = self
.send_transaction(transaction, self.searcher_client.clone())
loop {
let next_leader = self.searcher_client
.get_next_scheduled_leader(NextScheduledLeaderRequest {})
.await
{
error!("Failed to send transaction: {:?}", e);
.unwrap()
.into_inner();

let num_slots = next_leader.next_leader_slot - next_leader.current_slot;

if num_slots <= LEADERSHIP_THRESHOLD {
break;
}

tokio::time::sleep(SLEEP_DURATION).await;
}
transactions.iter().for_each(|tx| {
let mut transaction = Self::send_transaction(
tx.clone(),
self.searcher_client.clone(),
self.rpc.clone(),
);
tokio::spawn(async move {
if let Err(e) = transaction.await {
error!("Failed to send transaction: {:?}", e);
}
});
});
}
}

/// Sends a transaction/bundle of transactions to the jito
/// block engine and waits for confirmation
async fn send_transaction(
&self,
transaction: VersionedTransaction,
mut searcher_client: SearcherServiceClient<Channel>,
rpc: Arc<RpcClient>,
) -> anyhow::Result<()> {
loop {
let next_leader = searcher_client
.get_next_scheduled_leader(NextScheduledLeaderRequest {})
.await?
.into_inner();

let num_slots = next_leader.next_leader_slot - next_leader.current_slot;

if num_slots <= LEADERSHIP_THRESHOLD {
break;
}

tokio::time::sleep(SLEEP_DURATION).await;
}

let mut bundle_results_subscription = searcher_client
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await?
.into_inner();

if let Err(e) = send_bundle_with_confirmation(
&[transaction],
&self.rpc,
&rpc,
&mut searcher_client,
&mut bundle_results_subscription,
)
Expand Down

0 comments on commit 6d76cb7

Please sign in to comment.