Skip to content

Commit

Permalink
feat: add sequence number to ws messages
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Jul 9, 2024
1 parent ca38865 commit b9c4ef7
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions server/src/ws.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::Result;
Expand Down Expand Up @@ -32,7 +33,10 @@ async fn handle_socket(client_id: uuid::Uuid, state: Arc<Server>, socket: WebSoc
let (tx, rx) = mpsc::channel(BUFFER_SIZE);

let clients = &state.state().ws_producer.clients;
clients.lock().await.insert(client_id, tx);
clients
.lock()
.await
.insert(client_id, (tx, AtomicUsize::default()));

let (sender, mut receiver) = socket.split();

Expand All @@ -52,7 +56,7 @@ async fn handle_socket(client_id: uuid::Uuid, state: Arc<Server>, socket: WebSoc

#[derive(Default)]
pub struct WsProducer {
clients: Mutex<FxHashMap<uuid::Uuid, mpsc::Sender<Message>>>,
clients: Mutex<FxHashMap<uuid::Uuid, (mpsc::Sender<Message>, AtomicUsize)>>,
}

impl WsProducer {
Expand Down Expand Up @@ -84,10 +88,22 @@ impl WsProducer {
Ok(true)
})?;

let message = bincode::serialize(&accounts)?;

let mut clients = self.clients.lock().await;
clients.retain(|client_id, client_tx| {
clients.retain(|client_id, (client_tx, msg_seqno)| {
let seqno = msg_seqno.load(Ordering::Acquire);
let message = match bincode::serialize(&WsMessage {
seqno,
payload: accounts.clone(),
}) {
Ok(message) => message,
Err(e) => {
tracing::error!(%client_id, "failed to serialize ws message: {e}");
return false;
}
};

msg_seqno.store(seqno + 1, Ordering::Release);

let message = Message::Binary(message.clone());
match client_tx.try_send(message) {
Ok(_) => true,
Expand Down Expand Up @@ -130,3 +146,9 @@ struct AccountInfo {
pub account: Vec<u8>,
pub account_lt: u64,
}

#[derive(Clone, Serialize, Deserialize)]
struct WsMessage {
seqno: usize,
payload: Vec<AccountInfo>,
}

0 comments on commit b9c4ef7

Please sign in to comment.