Skip to content

Commit

Permalink
Add loop into geyser / Change to Versioned Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
LevBeta committed Jul 1, 2024
1 parent 48f6869 commit 604c7d3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 74 deletions.
136 changes: 72 additions & 64 deletions src/geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,63 +51,47 @@ impl GeyserService {
liquidator_sender: Sender<GeyserUpdate>,
rebalancer_sender: Sender<GeyserUpdate>,
) -> anyhow::Result<()> {
info!("Connecting to geyser");

let mut client = GeyserGrpcClient::build_from_shared(config.endpoint)?
.x_token(config.x_token)?
.connect()
.await?;

info!("Connected to geyser");

let tracked_accounts_vec: Vec<Pubkey> = tracked_accounts.keys().cloned().collect();

let sub_req =
Self::build_geyser_subscribe_request(&tracked_accounts_vec, &marginfi_program_id);

let (_, mut stream) = client.subscribe_with_request(Some(sub_req)).await?;

while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => {
if let Some(update_oneof) = msg.update_oneof {
if let subscribe_update::UpdateOneof::Account(account) = update_oneof {
if let Some(update_account) = &account.account {
if let Ok(address) = Pubkey::try_from(update_account.pubkey.clone())
{
if let Ok(account) = account_update_to_account(update_account) {
if let Ok(account_owner_pk) =
Pubkey::try_from(account.owner)
loop {
info!("Connecting to geyser");

let mut client = GeyserGrpcClient::build_from_shared(config.endpoint.clone())?
.x_token(config.x_token.clone())?
.connect()
.await?;

info!("Connected to geyser");

let tracked_accounts_vec: Vec<Pubkey> = tracked_accounts.keys().cloned().collect();

let sub_req =
Self::build_geyser_subscribe_request(&tracked_accounts_vec, &marginfi_program_id);

let (_, mut stream) = client.subscribe_with_request(Some(sub_req)).await?;

while let Some(msg) = stream.next().await {
match msg {
Ok(msg) => {
if let Some(update_oneof) = msg.update_oneof {
if let subscribe_update::UpdateOneof::Account(account) = update_oneof {
if let Some(update_account) = &account.account {
if let Ok(address) =
Pubkey::try_from(update_account.pubkey.clone())
{
if let Ok(account) =
account_update_to_account(update_account)
{
if account_owner_pk == marginfi_program_id
&& update_account.data.len() == MARGIN_ACCOUNT_SIZE
if let Ok(account_owner_pk) =
Pubkey::try_from(account.owner)
{
let update = GeyserUpdate {
account_type: AccountType::MarginfiAccount,
address,
account: account.clone(),
};
if let Err(e) =
liquidator_sender.send(update.clone())
{
error!("Error sending update to the liquidator sender: {:?}", e);
}
if let Err(e) =
rebalancer_sender.send(update.clone())
if account_owner_pk == marginfi_program_id
&& update_account.data.len()
== MARGIN_ACCOUNT_SIZE
{
error!("Error sending update to the rebalancer sender: {:?}", e);
}
}
}
if let Some(account_type) = tracked_accounts.get(&address) {
let update = GeyserUpdate {
account_type: account_type.clone(),
address,
account: account.clone(),
};

match account_type {
AccountType::OracleAccount => {
let update = GeyserUpdate {
account_type: AccountType::MarginfiAccount,
address,
account: account.clone(),
};
if let Err(e) =
liquidator_sender.send(update.clone())
{
Expand All @@ -119,25 +103,49 @@ impl GeyserService {
error!("Error sending update to the rebalancer sender: {:?}", e);
}
}
AccountType::TokenAccount => {
if let Err(e) =
rebalancer_sender.send(update.clone())
{
error!("Error sending update to the rebalancer sender: {:?}", e);
}
if let Some(account_type) =
tracked_accounts.get(&address)
{
let update = GeyserUpdate {
account_type: account_type.clone(),
address,
account: account.clone(),
};

match account_type {
AccountType::OracleAccount => {
if let Err(e) =
liquidator_sender.send(update.clone())
{
error!("Error sending update to the liquidator sender: {:?}", e);
}
if let Err(e) =
rebalancer_sender.send(update.clone())
{
error!("Error sending update to the rebalancer sender: {:?}", e);
}
}
AccountType::TokenAccount => {
if let Err(e) =
rebalancer_sender.send(update.clone())
{
error!("Error sending update to the rebalancer sender: {:?}", e);
}
}
_ => {}
}
_ => {}
}
}
}
}
}
}
}
}
Err(e) => {
error!("Error receiving message from geyser {:?}", e);
break;
Err(e) => {
error!("Error receiving message from geyser {:?}", e);
break;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/liquidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl Liquidator {
let asset_amount_to_liquidate =
min(max_liquidation_amount, liquidation_asset_amount_capacity);

let slippage_adjusted_asset_amount = asset_amount_to_liquidate * I80F48!(0.98);
let slippage_adjusted_asset_amount = asset_amount_to_liquidate * I80F48!(0.95);

Some(PreparedLiquidatableAccount {
liquidate_account: account.clone(),
Expand Down
19 changes: 10 additions & 9 deletions src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,15 @@ impl TransactionManager {

ixs.push(ComputeBudgetInstruction::set_compute_unit_limit(500_000));

let transaction = Transaction::new_signed_with_payer(
&ixs,
Some(&self.keypair.pubkey()),
let transaction = VersionedTransaction::try_new(
VersionedMessage::V0(v0::Message::try_compile(
&self.keypair.pubkey(),
&ixs,
&self.lookup_tables,
recent_blockhash,
)?),
&[&self.keypair],
recent_blockhash,
);
)?;

let signature = *transaction.get_signature();

Expand All @@ -162,13 +165,11 @@ impl TransactionManager {

let blockhash = transaction.get_recent_blockhash();

if let Err(err) = self.non_block_rpc.confirm_transaction_with_spinner(
self.non_block_rpc.confirm_transaction_with_spinner(
&signature,
blockhash,
CommitmentConfig::confirmed(),
) {
error!("Error confirming transaction: {:?}", err);
}
)?;

Ok(signature)
}
Expand Down

0 comments on commit 604c7d3

Please sign in to comment.