diff --git a/Cargo.lock b/Cargo.lock index 48e14d1..c931005 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1492,6 +1492,7 @@ dependencies = [ "hex", "hex-literal", "log", + "pubsub-client-fixed", "rayon", "regex", "serde", @@ -1833,6 +1834,12 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +[[package]] +name = "gjson" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43503cc176394dd30a6525f5f36e838339b8b5619be33ed9a7783841580a97b6" + [[package]] name = "goblin" version = "0.5.4" @@ -2811,6 +2818,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "pubsub-client-fixed" +version = "0.1.0" +dependencies = [ + "env_logger 0.11.5", + "futures-util", + "gjson", + "log", + "serde", + "serde_json", + "solana-account-decoder", + "solana-client", + "solana-rpc-client-api", + "solana-sdk", + "solana-transaction-status", + "thiserror", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "url", +] + [[package]] name = "qstring" version = "0.7.2" diff --git a/Cargo.toml b/Cargo.toml index a2bc728..ca1e695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ thiserror = "1" tokio = { version = "1.40", features = ["full"] } type-layout = "0.2.0" +drift-pubsub-client = { package = "pubsub-client-fixed", path = "crates/pubsub-client" } + [dev-dependencies] bytes = "1" hex = "0.4" diff --git a/crates/pubsub-client/Cargo.toml b/crates/pubsub-client/Cargo.toml new file mode 100644 index 0000000..ac771d0 --- /dev/null +++ b/crates/pubsub-client/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "pubsub-client-fixed" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +readme = "README.md" +repository = "https://github.com/drift-labs/drift-rs" + +[dependencies] +env_logger = "0.11" +futures-util = "0.3" +gjson = "0.8.1" +log = "0.4" +serde = { version = "1", features = ["derive"] } +serde_json = { version = "1", features = ["raw_value"] } +solana-account-decoder = "2" +solana-client = "2" +solana-sdk = "2" +solana-rpc-client-api = "2" +solana-transaction-status = "2" +thiserror = "1" +tokio = { version = "1.40", features = ["full"] } +tokio-stream = "*" +tokio-tungstenite = "*" +url = "*" diff --git a/crates/pubsub-client/README.md b/crates/pubsub-client/README.md new file mode 100644 index 0000000..5bef727 --- /dev/null +++ b/crates/pubsub-client/README.md @@ -0,0 +1 @@ +Drop-in replacement for the Solana/Agave PubSubClient with better disconnection handling \ No newline at end of file diff --git a/crates/pubsub-client/src/lib.rs b/crates/pubsub-client/src/lib.rs new file mode 100644 index 0000000..54ad5bd --- /dev/null +++ b/crates/pubsub-client/src/lib.rs @@ -0,0 +1,526 @@ +use std::{collections::BTreeMap, time::Duration}; + +use futures_util::{ + future::{ready, BoxFuture, FutureExt}, + sink::SinkExt, + stream::{BoxStream, StreamExt}, +}; +use log::*; +use serde::de::DeserializeOwned; +use serde_json::{json, Value}; +use solana_account_decoder::UiAccount; +use solana_rpc_client_api::{ + config::{ + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + error_object::RpcErrorObject, + response::{ + Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, + RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, + }, +}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; +use thiserror::Error; +use tokio::{ + sync::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, + task::JoinHandle, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_tungstenite::{ + connect_async, + tungstenite::{ + protocol::frame::{coding::CloseCode, CloseFrame}, + Message, + }, +}; +use url::Url; + +pub type PubsubClientResult = Result; + +#[derive(Debug, Error)] +pub enum PubsubClientError { + #[error("url parse error")] + UrlParseError(#[from] url::ParseError), + + #[error("unable to connect to server")] + ConnectionError(tokio_tungstenite::tungstenite::Error), + + #[error("websocket error")] + WsError(#[from] tokio_tungstenite::tungstenite::Error), + + #[error("connection closed (({0})")] + ConnectionClosed(String), + + #[error("json parse error")] + JsonParseError(#[from] serde_json::error::Error), + + #[error("subscribe failed: {reason}")] + SubscribeFailed { reason: String, message: String }, + + #[error("unexpected message format: {0}")] + UnexpectedMessageError(String), + + #[error("request failed: {reason}")] + RequestFailed { reason: String, message: String }, + + #[error("request error: {0}")] + RequestError(String), + + #[error("could not find subscription id: {0}")] + UnexpectedSubscriptionResponse(String), + + #[error("could not find node version: {0}")] + UnexpectedGetVersionResponse(String), +} + +type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; +type SubscribeResponseMsg = + Result<(mpsc::UnboundedReceiver, UnsubscribeFn), PubsubClientError>; +type SubscribeRequestMsg = (String, Value, oneshot::Sender); +type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>; +type RequestMsg = ( + String, + Value, + oneshot::Sender>, +); + +/// A client for subscribing to messages from the RPC server. +/// +/// See the [module documentation][self]. +#[derive(Debug)] +pub struct PubsubClient { + subscribe_sender: mpsc::UnboundedSender, + _request_sender: mpsc::UnboundedSender, + shutdown_sender: oneshot::Sender<()>, + ws: JoinHandle>, + url: Url, +} + +impl PubsubClient { + pub async fn new(url: &str) -> PubsubClientResult { + let url = Url::parse(url)?; + + let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); + let (_request_sender, request_receiver) = mpsc::unbounded_channel(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + // spawn Ws manager task + let ws_handle = tokio::spawn(PubsubClient::run_ws( + url.clone(), + subscribe_receiver, + request_receiver, + shutdown_receiver, + )); + + #[allow(clippy::used_underscore_binding)] + Ok(Self { + subscribe_sender, + _request_sender, + shutdown_sender, + ws: ws_handle, + url, + }) + } + + /// Returns the URL of the underlying Ws + pub fn url(&self) -> Url { + self.url.clone() + } + + /// Returns true if the underlying Ws connection task is running + /// + /// NB: the actual Ws may be either connected or reconnecting + pub fn is_running(&self) -> bool { + !self.ws.is_finished() + } + + pub async fn shutdown(self) -> PubsubClientResult { + let _ = self.shutdown_sender.send(()); + self.ws.await.unwrap() // WS future should not be cancelled or panicked + } + + // TODO: if the underlying Ws is ready, it should tell users somehow?? + async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T> + where + T: DeserializeOwned + Send + 'a, + { + let (response_sender, response_receiver) = oneshot::channel(); + self.subscribe_sender + .send((operation.to_string(), params.clone(), response_sender)) + .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?; + + let (notifications, unsubscribe) = response_receiver + .await + .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??; + + Ok(( + UnboundedReceiverStream::new(notifications) + .filter_map(|value| ready(serde_json::from_value::(value).ok())) + .boxed(), + unsubscribe, + )) + } + + /// Subscribe to account events. + /// + /// Receives messages of type [`UiAccount`] when an account's lamports or data changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`accountSubscribe`] RPC method. + /// + /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe + pub async fn account_subscribe( + &self, + pubkey: &Pubkey, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([pubkey.to_string(), config]); + self.subscribe("account", params).await + } + + /// Subscribe to block events. + /// + /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`blockSubscribe`] RPC method. + /// + /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe + pub async fn block_subscribe( + &self, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + self.subscribe("block", json!([filter, config])).await + } + + /// Subscribe to transaction log events. + /// + /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`logsSubscribe`] RPC method. + /// + /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe + pub async fn logs_subscribe( + &self, + filter: RpcTransactionLogsFilter, + config: RpcTransactionLogsConfig, + ) -> SubscribeResult<'_, RpcResponse> { + self.subscribe("logs", json!([filter, config])).await + } + + /// Subscribe to program account events. + /// + /// Receives messages of type [`RpcKeyedAccount`] when an account owned + /// by the given program changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`programSubscribe`] RPC method. + /// + /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe + pub async fn program_subscribe( + &self, + pubkey: &Pubkey, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([pubkey.to_string(), config]); + self.subscribe("program", params).await + } + + /// Subscribe to vote events. + /// + /// Receives messages of type [`RpcVote`] when a new vote is observed. These + /// votes are observed prior to confirmation and may never be confirmed. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`voteSubscribe`] RPC method. + /// + /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe + pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> { + self.subscribe("vote", json!([])).await + } + + /// Subscribe to root events. + /// + /// Receives messages of type [`Slot`] when a new [root] is set by the + /// validator. + /// + /// [root]: https://solana.com/docs/terminology#root + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`rootSubscribe`] RPC method. + /// + /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe + pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> { + self.subscribe("root", json!([])).await + } + + /// Subscribe to transaction confirmation events. + /// + /// Receives messages of type [`RpcSignatureResult`] when a transaction + /// with the given signature is committed. + /// + /// This is a subscription to a single notification. It is automatically + /// cancelled by the server once the notification is sent. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`signatureSubscribe`] RPC method. + /// + /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe + pub async fn signature_subscribe( + &self, + signature: &Signature, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([signature.to_string(), config]); + self.subscribe("signature", params).await + } + + /// Subscribe to slot events. + /// + /// Receives messages of type [`SlotInfo`] when a slot is processed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotSubscribe`] RPC method. + /// + /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe + pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> { + self.subscribe("slot", json!([])).await + } + + /// Subscribe to slot update events. + /// + /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur. + /// + /// Note that this method operates differently than other subscriptions: + /// instead of sending the message to a receiver on a channel, it accepts a + /// `handler` callback that processes the message directly. This processing + /// occurs on another thread. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method. + /// + /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe + pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> { + self.subscribe("slotsUpdates", json!([])).await + } + + async fn run_ws( + url: Url, + mut subscribe_receiver: mpsc::UnboundedReceiver, + mut request_receiver: mpsc::UnboundedReceiver, + mut shutdown_receiver: oneshot::Receiver<()>, + ) -> PubsubClientResult { + // manage Ws requests and forward subscription messages to subscribers + // this loop will retry indefinitely unless the consumer invokes `shutdown` + let max_retry_count = 3; + let mut retry_count = 0; + 'reconnect: loop { + let (mut ws, _response) = match connect_async(url.as_str()).await { + Ok(res) => { + retry_count = 0; + res + } + Err(err) => { + log::warn!(target: "ws", "couldn't reconnect: {err:?}"); + if retry_count >= max_retry_count { + log::warn!(target: "ws", "reached max reconneciton attempts: {err:?}"); + break 'reconnect Err(PubsubClientError::ConnectionError(err)); + } + retry_count += 1; + let delay = 2_u64.pow(2 + retry_count); + info!("PubSubClient try reconnect after {delay}s, attempt: {retry_count}/{max_retry_count}"); + tokio::time::sleep(Duration::from_secs(delay)).await; + continue 'reconnect; + } + }; + + let mut request_id: u64 = 0; + + let mut requests_subscribe = + BTreeMap::)>::new(); + let mut requests_unsubscribe = BTreeMap::>::new(); + let mut other_requests = BTreeMap::>::new(); + let mut subscriptions = BTreeMap::>::new(); + let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel(); + + 'manager: loop { + tokio::select! { + biased; + // Send close on shutdown signal + _ = (&mut shutdown_receiver) => { + log::info!("PubsubClient received shutdown"); + let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() }; + ws.send(Message::Close(Some(frame))).await?; + ws.flush().await?; + break 'reconnect Ok(()); + }, + // Read incoming WebSocket message + next_msg = ws.next() => { + let msg = match next_msg { + Some(msg) => msg?, + None => break 'manager, + }; + trace!("ws.next(): {:?}", &msg); + + // Get text from the message + let text = match msg { + Message::Text(ref text) => text, + Message::Close(_frame) => break 'manager, + Message::Ping(_) | Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => continue 'manager, + }; + + // Notification, example: + // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}` + let params = gjson::get(text, "params"); + if params.exists() { + let sid = params.get("subscription").u64(); + let mut unsubscribe_required = false; + + if let Some(notifications_sender) = subscriptions.get(&sid) { + let result = params.get("result"); + if result.exists() && notifications_sender.send(serde_json::from_str(result.json()).expect("valid json")).is_err() { + unsubscribe_required = true; + } + } else { + unsubscribe_required = true; + } + + if unsubscribe_required { + let method = gjson::get(text, "method"); + if let Some(operation) = method.str().strip_suffix("Notification") { + let (response_sender, _response_receiver) = oneshot::channel(); + let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender)); + } + } + // done processing notification + continue 'manager; + } + + // Subscribe/Unsubscribe response, example: + // `{"jsonrpc":"2.0","result":5308752,"id":1}` + let id = gjson::get(text, "id"); + if id.exists() { + let err = gjson::get(text, "error"); + let err = if err.exists() { + match serde_json::from_str::(err.json()) { + Ok(rpc_error_object) => { + Some(format!("{} ({})", rpc_error_object.message, rpc_error_object.code)) + } + Err(e) => Some(format!( + "Failed to deserialize RPC error response: {} [{e}]", err.str(), + )) + } + } else { + None + }; + + let id = id.u64(); + if let Some(response_sender) = other_requests.remove(&id) { + match err { + Some(reason) => { + let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()})); + }, + None => { + let json_result = gjson::get(text, "result"); + let json_result_value = if json_result.exists() { + Ok(serde_json::from_str::(json_result.json()).unwrap()) + } else { + Err(PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.clone() }) + }; + + if let Err(err) = response_sender.send(json_result_value) { + log::warn!("Ws request failed: {err:?}"); + break 'manager; + } + } + } + } else if let Some(response_sender) = requests_unsubscribe.remove(&id) { + let _ = response_sender.send(()); // do not care if receiver is closed + } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) { + match err { + Some(reason) => { + let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()})); + }, + None => { + // Subscribe Id + let sid = gjson::get(text, "result"); + if !sid.exists() { + return Err(PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() }); + } + let sid = sid.u64(); + + // Create notifications channel and unsubscribe function + let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel(); + let unsubscribe_sender = unsubscribe_sender.clone(); + let unsubscribe = Box::new(move || async move { + let (response_sender, response_receiver) = oneshot::channel(); + // do nothing if ws already closed + if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() { + let _ = response_receiver.await; // channel can be closed only if ws is closed + } + }.boxed()); + + if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() { + break 'manager; + } + subscriptions.insert(sid, notifications_sender); + } + } + } else { + error!("Unknown request id: {id}"); + break 'manager; + } + continue 'manager; + } + } + // Read message for subscribe + Some((operation, params, response_sender)) = subscribe_receiver.recv() => { + request_id += 1; + let method = format!("{operation}Subscribe"); + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); + ws.send(Message::Text(text)).await?; + requests_subscribe.insert(request_id, (operation, response_sender)); + }, + // Read message for unsubscribe + Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => { + subscriptions.remove(&sid); + request_id += 1; + let method = format!("{operation}Unsubscribe"); + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string(); + ws.send(Message::Text(text)).await?; + requests_unsubscribe.insert(request_id, response_sender); + }, + // Read message for other requests + Some((method, params, response_sender)) = request_receiver.recv() => { + request_id += 1; + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); + ws.send(Message::Text(text)).await?; + other_requests.insert(request_id, response_sender); + } + } + } + } + } +} diff --git a/crates/src/account_map.rs b/crates/src/account_map.rs index 6226cca..027e658 100644 --- a/crates/src/account_map.rs +++ b/crates/src/account_map.rs @@ -2,12 +2,13 @@ use std::sync::{Arc, Mutex, RwLock}; use anchor_lang::AccountDeserialize; use dashmap::DashMap; +use drift_pubsub_client::PubsubClient; use log::debug; use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey}; use crate::{ - types::DataAndSlot, utils::get_ws_url, - websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, UnsubHandle, + types::DataAndSlot, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, + UnsubHandle, }; const LOG_TARGET: &str = "accountmap"; @@ -20,15 +21,15 @@ pub struct AccountSlot { /// Set of subscriptions to a dynamic subset of network accounts pub struct AccountMap { - endpoint: String, + pubsub: Arc, commitment: CommitmentConfig, inner: DashMap, ahash::RandomState>, } impl AccountMap { - pub fn new(endpoint: String, commitment: CommitmentConfig) -> Self { + pub fn new(pubsub: Arc, commitment: CommitmentConfig) -> Self { Self { - endpoint, + pubsub, commitment, inner: Default::default(), } @@ -40,7 +41,7 @@ impl AccountMap { } debug!(target: LOG_TARGET, "subscribing: {account:?}"); - let user = AccountSub::new(&self.endpoint, self.commitment, *account); + let user = AccountSub::new(Arc::clone(&self.pubsub), self.commitment, *account); let user = user.subscribe().await?; self.inner.insert(*account, user); @@ -88,12 +89,8 @@ pub struct AccountSub { impl AccountSub { pub const SUBSCRIPTION_ID: &'static str = "account"; - pub fn new(endpoint: &str, commitment: CommitmentConfig, pubkey: Pubkey) -> Self { - let subscription = WebsocketAccountSubscriber::new( - get_ws_url(endpoint).expect("valid url"), - pubkey, - commitment, - ); + pub fn new(pubsub: Arc, commitment: CommitmentConfig, pubkey: Pubkey) -> Self { + let subscription = WebsocketAccountSubscriber::new(pubsub, pubkey, commitment); Self { pubkey, @@ -165,13 +162,21 @@ mod tests { use super::*; use crate::{ - accounts::User, constants::DEFAULT_PUBKEY, utils::test_envs::mainnet_endpoint, Wallet, + accounts::User, + constants::DEFAULT_PUBKEY, + utils::{get_ws_url, test_envs::mainnet_endpoint}, + Wallet, }; #[tokio::test] async fn test_user_subscribe() { let _ = env_logger::try_init(); - let account_map = AccountMap::new(mainnet_endpoint().into(), CommitmentConfig::confirmed()); + let pubsub = Arc::new( + PubsubClient::new(&get_ws_url(&mainnet_endpoint()).unwrap()) + .await + .expect("ws connects"), + ); + let account_map = AccountMap::new(pubsub, CommitmentConfig::confirmed()); let user_1 = Wallet::derive_user_account( &pubkey!("DxoRJ4f5XRMvXU9SGuM4ZziBFUxbhB3ubur5sVZEvue2"), 0, diff --git a/crates/src/drift_idl.rs b/crates/src/drift_idl.rs index ae29ea4..024c170 100644 --- a/crates/src/drift_idl.rs +++ b/crates/src/drift_idl.rs @@ -2,7 +2,6 @@ #![doc = r""] #![doc = r" Auto-generated IDL types, manual edits do not persist (see `crates/drift-idl-gen`)"] #![doc = r""] -use self::traits::ToAccountMetas; use anchor_lang::{ prelude::{ account, @@ -13,6 +12,8 @@ use anchor_lang::{ }; use serde::{Deserialize, Serialize}; use solana_sdk::{instruction::AccountMeta, pubkey::Pubkey}; + +use self::traits::ToAccountMetas; pub mod traits { use solana_sdk::instruction::AccountMeta; #[doc = r" This is distinct from the anchor_lang version of the trait"] @@ -1918,8 +1919,9 @@ pub mod instructions { } pub mod types { #![doc = r" IDL types"] - use super::*; use std::ops::Mul; + + use super::*; #[doc = ""] #[doc = " backwards compatible u128 deserializing data from rust <=1.76.0 when u/i128 was 8-byte aligned"] #[doc = " https://solana.stackexchange.com/questions/7720/using-u128-without-sacrificing-alignment-8"] diff --git a/crates/src/lib.rs b/crates/src/lib.rs index cabc34e..a75a885 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -3,6 +3,7 @@ use std::{borrow::Cow, sync::Arc, time::Duration}; use anchor_lang::{AccountDeserialize, InstructionData}; +use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; use log::debug; use solana_client::{nonblocking::rpc_client::RpcClient, rpc_response::Response}; @@ -18,6 +19,7 @@ use solana_sdk::{ transaction::VersionedTransaction, }; pub use solana_sdk::{address_lookup_table::AddressLookupTableAccount, pubkey::Pubkey}; +use utils::get_ws_url; use crate::{ account_map::AccountMap, @@ -521,6 +523,7 @@ impl DriftClient { /// It is intended to be a singleton pub struct DriftClientBackend { rpc_client: Arc, + pubsub_client: Arc, program_data: ProgramData, blockhash_subscriber: BlockhashSubscriber, account_map: AccountMap, @@ -532,8 +535,13 @@ pub struct DriftClientBackend { impl DriftClientBackend { /// Initialize a new `DriftClientBackend` async fn new(context: Context, rpc_client: Arc) -> SdkResult { - let perp_market_map = MarketMap::::new(Arc::clone(&rpc_client)); - let spot_market_map = MarketMap::::new(Arc::clone(&rpc_client)); + let pubsub_client = + Arc::new(PubsubClient::new(&get_ws_url(rpc_client.url().as_str())?).await?); + + let perp_market_map = + MarketMap::::new(Arc::clone(&rpc_client), Arc::clone(&pubsub_client)); + let spot_market_map = + MarketMap::::new(Arc::clone(&rpc_client), Arc::clone(&pubsub_client)); let lookup_table_address = context.lut(); @@ -557,16 +565,18 @@ impl DriftClientBackend { all_oracles.push(*market_oracle_info); } - let oracle_map = OracleMap::new(Arc::clone(&rpc_client), all_oracles.as_slice()); - let account_map = AccountMap::new(rpc_client.url(), rpc_client.commitment()); + let oracle_map = OracleMap::new( + Arc::clone(&rpc_client), + Arc::clone(&pubsub_client), + all_oracles.as_slice(), + ); + let account_map = AccountMap::new(Arc::clone(&pubsub_client), rpc_client.commitment()); account_map.subscribe_account(state_account()).await?; Ok(Self { rpc_client: Arc::clone(&rpc_client), - blockhash_subscriber: BlockhashSubscriber::new( - Duration::from_secs(2), - Arc::clone(&rpc_client), - ), + pubsub_client, + blockhash_subscriber: BlockhashSubscriber::new(Duration::from_secs(2), rpc_client), program_data: ProgramData::new( spot_market_map.values(), perp_market_map.values(), @@ -1694,23 +1704,29 @@ mod tests { rpc_mocks, )); - let perp_market_map = MarketMap::::new(Arc::clone(&rpc_client)); - let spot_market_map = MarketMap::::new(Arc::clone(&rpc_client)); + let pubsub_client = Arc::new( + PubsubClient::new(&get_ws_url(DEVNET_ENDPOINT).unwrap()) + .await + .expect("ws connects"), + ); + + let perp_market_map = + MarketMap::::new(Arc::clone(&rpc_client), Arc::clone(&pubsub_client)); + let spot_market_map = + MarketMap::::new(Arc::clone(&rpc_client), Arc::clone(&pubsub_client)); let backend = DriftClientBackend { rpc_client: Arc::clone(&rpc_client), + pubsub_client: Arc::clone(&pubsub_client), program_data: ProgramData::uninitialized(), perp_market_map, spot_market_map, - oracle_map: OracleMap::new(Arc::clone(&rpc_client), &[]), + oracle_map: OracleMap::new(Arc::clone(&rpc_client), Arc::clone(&pubsub_client), &[]), blockhash_subscriber: BlockhashSubscriber::new( Duration::from_secs(2), Arc::clone(&rpc_client), ), - account_map: AccountMap::new( - DEVNET_ENDPOINT.to_string(), - CommitmentConfig::processed(), - ), + account_map: AccountMap::new(Arc::clone(&pubsub_client), CommitmentConfig::processed()), }; DriftClient { diff --git a/crates/src/marketmap.rs b/crates/src/marketmap.rs index aa148fe..0d1e83d 100644 --- a/crates/src/marketmap.rs +++ b/crates/src/marketmap.rs @@ -8,6 +8,7 @@ use std::{ use anchor_lang::{AccountDeserialize, AnchorDeserialize}; use dashmap::DashMap; +use drift_pubsub_client::PubsubClient; use futures_util::{stream::FuturesUnordered, StreamExt}; use serde_json::json; use solana_account_decoder::UiAccountEncoding; @@ -24,7 +25,6 @@ use crate::{ constants::{self, derive_perp_market_account, derive_spot_market_account, state_account}, drift_idl::types::OracleSource, memcmp::get_market_filter, - utils::get_ws_url, websocket_account_subscriber::WebsocketAccountSubscriber, DataAndSlot, MarketId, MarketType, PerpMarket, SdkError, SdkResult, SpotMarket, UnsubHandle, }; @@ -78,6 +78,7 @@ pub struct MarketMap { subscriptions: DashMap, latest_slot: Arc, rpc: Arc, + pubsub: Arc, } impl MarketMap @@ -86,11 +87,12 @@ where { pub const SUBSCRIPTION_ID: &'static str = "marketmap"; - pub fn new(rpc: Arc) -> Self { + pub fn new(rpc: Arc, pubsub: Arc) -> Self { Self { subscriptions: Default::default(), marketmap: Arc::default(), latest_slot: Arc::new(AtomicU64::new(0)), + pubsub, rpc, } } @@ -98,7 +100,6 @@ where /// Subscribe to market account updates pub async fn subscribe(&self, markets: &[MarketId]) -> SdkResult<()> { log::debug!(target: LOG_TARGET, "subscribing: {:?}", T::MARKET_TYPE); - let url = get_ws_url(&self.rpc.url()).expect("valid url"); let markets = HashSet::::from_iter(markets.iter().copied()); let mut pending_subscriptions = @@ -113,8 +114,11 @@ where MarketType::Spot => derive_spot_market_account(market.index()), }; - let market_subscriber = - WebsocketAccountSubscriber::new(url.clone(), market_pubkey, self.rpc.commitment()); + let market_subscriber = WebsocketAccountSubscriber::new( + Arc::clone(&self.pubsub), + market_pubkey, + self.rpc.commitment(), + ); pending_subscriptions.push((market.index(), market_subscriber)); } @@ -337,14 +341,29 @@ pub async fn get_market_accounts_with_fallback( mod tests { use std::sync::Arc; + use drift_pubsub_client::PubsubClient; use solana_client::nonblocking::rpc_client::RpcClient; use super::{get_market_accounts_with_fallback, MarketMap}; - use crate::{accounts::PerpMarket, utils::test_envs::devnet_endpoint, MarketId}; + use crate::{ + accounts::{PerpMarket, SpotMarket}, + utils::{ + get_ws_url, + test_envs::{devnet_endpoint, mainnet_endpoint}, + }, + MarketId, + }; #[tokio::test] async fn marketmap_subscribe() { - let map = MarketMap::::new(Arc::new(RpcClient::new(devnet_endpoint()))); + let map = MarketMap::::new( + Arc::new(RpcClient::new(devnet_endpoint())), + Arc::new( + PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap()) + .await + .expect("ws connects"), + ), + ); assert!(map .subscribe(&[MarketId::perp(0), MarketId::perp(1), MarketId::perp(1)]) @@ -362,11 +381,17 @@ mod tests { #[tokio::test] async fn get_market_accounts_with_fallback_works() { - let result = + let result: Result<(Vec, _), _> = get_market_accounts_with_fallback::(&RpcClient::new(devnet_endpoint())) .await; assert!(result.is_ok_and(|r| r.0.len() > 0 && r.1 > 0)); + + let result = + get_market_accounts_with_fallback::(&RpcClient::new(devnet_endpoint())) + .await; + + assert!(result.is_ok_and(|r| r.0.len() > 0 && r.1 > 0)); } } diff --git a/crates/src/math/liquidation.rs b/crates/src/math/liquidation.rs index a4bf3d5..044ec3f 100644 --- a/crates/src/math/liquidation.rs +++ b/crates/src/math/liquidation.rs @@ -47,7 +47,7 @@ pub async fn calculate_liquidation_price_and_unrealized_pnl( let position = user .get_perp_position(market_index) - .map_err(|_| SdkError::NoPosiiton(market_index))?; + .map_err(|_| SdkError::NoPosition(market_index))?; // build a list of all user positions for margin calculations let mut builder = AccountsListBuilder::default(); @@ -100,7 +100,7 @@ pub async fn calculate_unrealized_pnl( calculate_unrealized_pnl_inner(&position, oracle_price) } else { - Err(SdkError::NoPosiiton(market_index)) + Err(SdkError::NoPosition(market_index)) } } @@ -173,7 +173,7 @@ pub fn calculate_liquidation_price_inner( // calculate perp free collateral delta let perp_position = user .get_perp_position(perp_market.market_index) - .map_err(|_| SdkError::NoPosiiton(perp_market.market_index))?; + .map_err(|_| SdkError::NoPosition(perp_market.market_index))?; let perp_position_with_lp = perp_position.simulate_settled_lp_position(perp_market, oracle_price)?; diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 3dd9124..81fc11b 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -5,6 +5,7 @@ use std::sync::{ use ahash::HashSet; use dashmap::{DashMap, ReadOnlyView}; +use drift_pubsub_client::PubsubClient; use futures_util::{stream::FuturesUnordered, StreamExt}; use log::warn; use solana_client::nonblocking::rpc_client::RpcClient; @@ -13,7 +14,6 @@ use solana_sdk::{account::Account, clock::Slot, pubkey::Pubkey}; use crate::{ drift_idl::types::OracleSource, ffi::{get_oracle_price, OraclePriceData}, - utils::get_ws_url, websocket_account_subscriber::{AccountUpdate, WebsocketAccountSubscriber}, MarketId, SdkError, SdkResult, UnsubHandle, }; @@ -43,6 +43,7 @@ pub struct OracleMap { oracle_by_market: ReadOnlyView, latest_slot: Arc, rpc: Arc, + pubsub: Arc, } impl OracleMap { @@ -50,9 +51,13 @@ impl OracleMap { /// Create a new `OracleMap` /// + /// * `rpc_client` - Shared RPC client instance + /// * `pubsub_client` - Shared Pubsub client instance /// * `all_oracles` - Exhaustive list of all Drift oracle pubkeys and source by market + /// pub fn new( rpc_client: Arc, + pubsub_client: Arc, all_oracles: &[(MarketId, Pubkey, OracleSource)], ) -> Self { log::debug!(target: LOG_TARGET, "all oracles: {:?}", all_oracles); @@ -83,6 +88,7 @@ impl OracleMap { subcriptions: Default::default(), latest_slot: Arc::new(AtomicU64::new(0)), rpc: rpc_client, + pubsub: pubsub_client, } } @@ -97,7 +103,6 @@ impl OracleMap { let markets = HashSet::from_iter(markets); log::debug!(target: LOG_TARGET, "subscribe market oracles: {markets:?}"); - let url = get_ws_url(&self.rpc.url()).expect("valid url"); let mut pending_subscriptions = Vec::<(WebsocketAccountSubscriber, Oracle)>::with_capacity(markets.len()); @@ -115,8 +120,11 @@ impl OracleMap { continue; } - let oracle_subscriber = - WebsocketAccountSubscriber::new(url.clone(), *oracle_pubkey, self.rpc.commitment()); + let oracle_subscriber = WebsocketAccountSubscriber::new( + Arc::clone(&self.pubsub), + *oracle_pubkey, + self.rpc.commitment(), + ); pending_subscriptions.push((oracle_subscriber, oracle_info.clone())); } @@ -385,7 +393,7 @@ async fn get_multi_account_data_with_fallback( #[cfg(test)] mod tests { use super::*; - use crate::utils::test_envs::devnet_endpoint; + use crate::utils::{get_ws_url, test_envs::devnet_endpoint}; const SOL_PERP_ORACLE: Pubkey = solana_sdk::pubkey!("BAtFj4kQttZRVep3UZS2aZRDixkGYgWsbqTBVDbnSsPF"); @@ -407,7 +415,12 @@ mod tests { (MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull), ]; let rpc = Arc::new(RpcClient::new(devnet_endpoint().into())); - let map = OracleMap::new(rpc, &all_oracles); + let pubsub = Arc::new( + PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap()) + .await + .expect("ws connects"), + ); + let map = OracleMap::new(rpc, pubsub, &all_oracles); // - dups ignored // - makerts with same oracle pubkey, make at most 1 sub @@ -437,7 +450,12 @@ mod tests { (MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull), ]; let rpc = Arc::new(RpcClient::new(devnet_endpoint().into())); - let map = OracleMap::new(rpc, &all_oracles); + let pubsub = Arc::new( + PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap()) + .await + .expect("ws connects"), + ); + let map = OracleMap::new(rpc, pubsub, &all_oracles); // - dups ignored // - makerts with same oracle pubkey, make at most 1 sub @@ -477,6 +495,11 @@ mod tests { ]; let map = OracleMap::new( Arc::new(RpcClient::new(devnet_endpoint().into())), + Arc::new( + PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap()) + .await + .expect("ws connects"), + ), &all_oracles, ); map.subscribe(&[MarketId::spot(0), MarketId::perp(1)]) diff --git a/crates/src/slot_subscriber.rs b/crates/src/slot_subscriber.rs index ee9d73d..f48d4ae 100644 --- a/crates/src/slot_subscriber.rs +++ b/crates/src/slot_subscriber.rs @@ -3,9 +3,9 @@ use std::{ time::Duration, }; +use drift_pubsub_client::PubsubClient; use futures_util::StreamExt; use log::{debug, error, warn}; -use solana_client::nonblocking::pubsub_client::PubsubClient; use solana_sdk::clock::Slot; use tokio::sync::{ mpsc::{self}, @@ -35,8 +35,8 @@ const LOG_TARGET: &str = "slotsub"; /// ``` /// pub struct SlotSubscriber { + pubsub: Arc, current_slot: Arc, - url: String, unsub: Mutex>>, } @@ -59,10 +59,15 @@ impl SlotSubscriber { guard.is_some() } - pub fn new(url: String) -> Self { + /// Create a new `SlotSubscriber` + /// + /// * `pubsub` - a `PubsubClient` instance for the subscription to utilize (maybe shared) + /// + /// Consumer must call `.subscribe()` to start receiving updates + pub fn new(pubsub: Arc) -> Self { Self { + pubsub, current_slot: Arc::default(), - url, unsub: Mutex::new(None), } } @@ -72,6 +77,7 @@ impl SlotSubscriber { self.current_slot.load(std::sync::atomic::Ordering::Relaxed) } + /// Start the slot subscription task pub fn subscribe(&mut self, handler_fn: F) -> SdkResult<()> where F: 'static + Send + Fn(SlotUpdate), @@ -87,13 +93,13 @@ impl SlotSubscriber { F: 'static + Send + Fn(SlotUpdate), { let (slot_tx, mut slot_rx) = mpsc::channel(8); - let url = self.url.clone(); + let pubsub = Arc::clone(&self.pubsub); let join_handle = spawn_retry_task( move || { let task = SlotSubscriberTask { - endpoint: url.clone(), slot_tx: slot_tx.clone(), + pubsub: Arc::clone(&pubsub), }; task.slot_subscribe() }, @@ -148,21 +154,14 @@ impl SlotSubscriber { } struct SlotSubscriberTask { - endpoint: String, slot_tx: mpsc::Sender, + pubsub: Arc, } impl SlotSubscriberTask { async fn slot_subscribe(self) { debug!(target: LOG_TARGET, "start task"); - let pubsub = match PubsubClient::new(&self.endpoint).await { - Ok(p) => p, - Err(err) => { - debug!(target: LOG_TARGET, "connect failed: {err:?}"); - return; - } - }; - let (mut slot_updates, unsubscriber) = match pubsub.slot_subscribe().await { + let (mut slot_updates, unsubscriber) = match self.pubsub.slot_subscribe().await { Ok(s) => s, Err(err) => { debug!(target: LOG_TARGET, "subscribe failed: {err:?}"); diff --git a/crates/src/types.rs b/crates/src/types.rs index c4257ac..0cb243a 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -271,7 +271,7 @@ pub enum SdkError { #[error("{0}")] Rpc(#[from] solana_client::client_error::ClientError), #[error("{0}")] - Ws(#[from] solana_client::nonblocking::pubsub_client::PubsubClientError), + Ws(#[from] drift_pubsub_client::PubsubClientError), #[error("{0}")] Anchor(#[from] Box), #[error("error while deserializing")] @@ -285,7 +285,7 @@ pub enum SdkError { #[error("invalid base58 value")] InvalidBase58, #[error("user does not have position: {0}")] - NoPosiiton(u16), + NoPosition(u16), #[error("insufficient SOL balance for fees")] OutOfSOL, #[error("{0}")] diff --git a/crates/src/utils.rs b/crates/src/utils.rs index 812f8d6..4a8d3ce 100644 --- a/crates/src/utils.rs +++ b/crates/src/utils.rs @@ -141,7 +141,7 @@ pub mod test_envs { /// solana mainnet endpoint pub fn mainnet_endpoint() -> String { - std::env::var("TEST_MAINNET_RPC_ENDPOINT").expect("TEST_MAINNET_ENDPOINT set") + std::env::var("TEST_MAINNET_RPC_ENDPOINT").expect("TEST_MAINNET_RPC_ENDPOINT set") } /// solana devnet endpoint pub fn devnet_endpoint() -> String { diff --git a/crates/src/websocket_account_subscriber.rs b/crates/src/websocket_account_subscriber.rs index aca5496..07d4446 100644 --- a/crates/src/websocket_account_subscriber.rs +++ b/crates/src/websocket_account_subscriber.rs @@ -1,12 +1,10 @@ -use std::str::FromStr; +use std::{str::FromStr, sync::Arc}; +use drift_pubsub_client::PubsubClient; use futures_util::StreamExt; use log::warn; use solana_account_decoder::UiAccountEncoding; -use solana_client::{ - nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient}, - rpc_config::RpcAccountInfoConfig, -}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcAccountInfoConfig}; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; use tokio::sync::oneshot; @@ -29,15 +27,15 @@ pub struct AccountUpdate { #[derive(Clone)] pub struct WebsocketAccountSubscriber { - url: String, + pubsub: Arc, pub(crate) pubkey: Pubkey, pub(crate) commitment: CommitmentConfig, } impl WebsocketAccountSubscriber { - pub fn new(url: String, pubkey: Pubkey, commitment: CommitmentConfig) -> Self { + pub fn new(pubsub: Arc, pubkey: Pubkey, commitment: CommitmentConfig) -> Self { WebsocketAccountSubscriber { - url, + pubsub, pubkey, commitment, } @@ -63,7 +61,7 @@ impl WebsocketAccountSubscriber { // seed initial account state log::debug!(target: LOG_TARGET, "seeding account: {subscription_name}-{:?}", self.pubkey); let owner: Pubkey; - let rpc = RpcClient::new(get_http_url(&self.url)?); + let rpc = RpcClient::new(get_http_url(self.pubsub.url().as_str())?); match rpc .get_account_with_commitment(&self.pubkey, self.commitment) .await @@ -96,114 +94,63 @@ impl WebsocketAccountSubscriber { ..RpcAccountInfoConfig::default() }; - let mut attempt = 0; - let max_reconnection_attempts = 20; - let base_delay = tokio::time::Duration::from_secs(2); - - let url = self.url.clone(); let (unsub_tx, mut unsub_rx) = oneshot::channel::<()>(); tokio::spawn({ let mut latest_slot = 0; let pubkey = self.pubkey; + let pubsub = Arc::clone(&self.pubsub); async move { log::debug!(target: LOG_TARGET, "spawn account subscriber: {subscription_name}-{pubkey:?}"); - let exit_status = 'outer: loop { - let pubsub = match PubsubClient::new(&url).await { - Ok(client) => { - attempt = 0; - client - } + 'outer: loop { + let (mut account_updates, account_unsubscribe) = match pubsub + .account_subscribe(&pubkey, Some(account_config.clone())) + .await + { + Ok(res) => res, Err(err) => { - warn!(target: LOG_TARGET, "couldn't subscribe {pubkey:?}: {err:?}, retrying..."); - attempt += 1; - if attempt >= max_reconnection_attempts { - log::error!( - "{}: Max reconnection attempts reached", - subscription_name - ); - break 'outer Err(crate::SdkError::MaxReconnectionAttemptsReached); - } - tokio::time::sleep(base_delay).await; + log::error!("{subscription_name}: Failed to subscribe to account stream: {err:?}, retrying"); continue; } }; log::debug!(target: LOG_TARGET, "account subscribed: {subscription_name}-{pubkey:?}"); - - match pubsub - .account_subscribe(&pubkey, Some(account_config.clone())) - .await - { - Ok((mut account_updates, account_unsubscribe)) => loop { - attempt = 0; - tokio::select! { - biased; - message = account_updates.next() => { - match message { - Some(message) => { - let slot = message.context.slot; - if slot >= latest_slot { - latest_slot = slot; - if let Some(data) = message.value.data.decode() { - let account_update = AccountUpdate { - owner: Pubkey::from_str(&message.value.owner).unwrap(), - lamports: message.value.lamports, - pubkey, - data, - slot, - }; - handler_fn(&account_update); - } + loop { + tokio::select! { + biased; + message = account_updates.next() => { + match message { + Some(message) => { + let slot = message.context.slot; + if slot >= latest_slot { + latest_slot = slot; + if let Some(data) = message.value.data.decode() { + let account_update = AccountUpdate { + owner: Pubkey::from_str(&message.value.owner).unwrap(), + lamports: message.value.lamports, + pubkey, + data, + slot, + }; + handler_fn(&account_update); } } - None => { - log::warn!("{}: Account stream interrupted", subscription_name); - account_unsubscribe().await; - break; - } } - } - _ = &mut unsub_rx => { - log::debug!(target: LOG_TARGET, "{}: Unsubscribing from account stream: {pubkey:?}", subscription_name); - account_unsubscribe().await; - break 'outer Ok(()); + None => { + log::warn!("{}: Account stream interrupted", subscription_name); + account_unsubscribe().await; + break; + } } } - }, - Err(_) => { - log::error!( - "{}: Failed to subscribe to account stream, retrying", - subscription_name - ); - attempt += 1; - if attempt >= max_reconnection_attempts { - log::error!("Max reconnection attempts reached."); - break 'outer Err(crate::SdkError::MaxReconnectionAttemptsReached); + _ = &mut unsub_rx => { + log::debug!(target: LOG_TARGET, "{}: Unsubscribing from account stream: {pubkey:?}", subscription_name); + account_unsubscribe().await; + break 'outer; } } } - - if attempt >= max_reconnection_attempts { - log::error!("{}: Max reconnection attempts reached", subscription_name); - break 'outer Err(crate::SdkError::MaxReconnectionAttemptsReached); - } - - let delay_duration = base_delay * 2_u32.pow(attempt); - log::warn!( - "{}: reconnecting in {:?}", - subscription_name, - delay_duration - ); - tokio::time::sleep(delay_duration).await; - attempt += 1; - }; - - if let Err(err) = exit_status { - log::warn!( - "account subscriber failed ({subscription_name}-{pubkey:?}): {err:?}" - ); } } });