Skip to content

Commit

Permalink
Add ws producer to stream updated accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Dec 13, 2023
1 parent 3f2321c commit ecec407
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 3 deletions.
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl ton_indexer::Subscriber for EngineSubscriber {
async fn process_block(&self, ctx: ProcessBlockContext<'_>) -> Result<()> {
self.rpc_state
.process_block(ctx.block_stuff(), ctx.shard_state_stuff())
.await
.context("Failed to update server state")
}

Expand Down
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
arc-swap = { version = "1.6", features = ["weak"] }
axum = { version = "0.6.0", features = ["http2"] }
axum = { version = "0.6.0", features = ["http2", "ws"] }
axum-jrpc = { version = "0.5.1", features = ["serde_json"] }
base64 = "0.13"
bincode = "1.3"
broxus-util = { version = "0.2", default-features = false }
bytes = "1.4.0"
fdlimit = "0.2.1"
futures-util = "0.3"
hex = "0.4"
humantime = "2.1"
num_cpus = "1.13.1"
Expand All @@ -25,6 +27,7 @@ tokio = { version = "1", features = ["sync"] }
tower = "0.4.12"
tower-http = { version = "0.4.0", features = ["cors", "timeout"] }
tracing = "0.1.37"
uuid = { version = "1.6", features = ["v4", "serde"] }
weedb = { version = "0.1", features = ["zstd", "lz4", "jemalloc"] }

nekoton-abi = { git = "https://github.com/broxus/nekoton.git", default-features = false }
Expand Down
10 changes: 8 additions & 2 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod proto;
mod server;
mod storage;
mod utils;
mod ws;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -91,6 +92,7 @@ impl ApiConfig {
pub struct RpcState {
config: Config,
engine: ArcSwapWeak<ton_indexer::Engine>,
ws_producer: ws::WsProducer,
runtime_storage: RuntimeStorage,
persistent_storage: Option<PersistentStorage>,
jrpc_counters: Counters,
Expand All @@ -108,6 +110,7 @@ impl RpcState {
config,
engine: Default::default(),
runtime_storage: Default::default(),
ws_producer: Default::default(),
persistent_storage,
jrpc_counters: Default::default(),
proto_counters: Default::default(),
Expand Down Expand Up @@ -215,7 +218,7 @@ impl RpcState {
Ok(())
}

pub fn process_block(
pub async fn process_block(
&self,
block_stuff: &BlockStuff,
shard_state: Option<&ShardStateStuff>,
Expand All @@ -227,9 +230,10 @@ impl RpcState {
block_info,
shard_state,
)
.await
}

pub fn process_block_parts(
pub async fn process_block_parts(
&self,
block_id: &ton_block::BlockIdExt,
block: &ton_block::Block,
Expand All @@ -249,6 +253,8 @@ impl RpcState {
storage.update(block_id, block, shard_state)?;
}

self.ws_producer.handle_block(block).await?;

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use axum::RequestExt;

use crate::jrpc;
use crate::proto;
use crate::ws;
use crate::RpcState;

pub struct Server {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl Server {
let router = axum::Router::new()
.route("/", get(health_check))
.route("/", post(common_route))
.route("/ws", get(ws::ws_router))
.route("/rpc", post(jrpc::jrpc_router))
.route("/proto", post(proto::proto_router))
.layer(service)
Expand Down
84 changes: 84 additions & 0 deletions server/src/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::sync::Arc;

use anyhow::Result;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{Query, State, WebSocketUpgrade};
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use ton_block::{Deserializable, HashmapAugType};
use ton_types::HashmapType;

use crate::server::Server;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketUpgradeQuery {
client_id: uuid::Uuid,
}

pub async fn ws_router(
ws: WebSocketUpgrade,
State(ctx): State<Arc<Server>>,
Query(query): Query<WebSocketUpgradeQuery>,
) -> axum::response::Response {
ws.on_upgrade(move |socket| handle_socket(query.client_id, ctx, socket))
}

async fn handle_socket(client_id: uuid::Uuid, state: Arc<Server>, socket: WebSocket) {
let (sender, mut receiver) = socket.split();

let clients = &state.state().ws_producer.clients;
clients.lock().await.insert(client_id, sender);

while let Some(msg) = receiver.next().await {
match msg {
Ok(Message::Close(_)) | Err(_) => {
tracing::warn!(%client_id, "websocket connection closed");
clients.lock().await.remove(&client_id);
return;
}
Ok(_) => {}
}
}
}

#[derive(Default)]
pub struct WsProducer {
clients: Mutex<FxHashMap<uuid::Uuid, SplitSink<WebSocket, axum::extract::ws::Message>>>,
}

impl WsProducer {
pub async fn handle_block(&self, block: &ton_block::Block) -> Result<()> {
let extra = block.read_extra()?;
let account_blocks = extra.read_account_blocks()?;

let mut accounts = FxHashMap::default();
account_blocks.iterate_with_keys(|account, value| {
let mut lt = 0;
value.transactions().iterate_slices(|_, mut value| {
let tx_cell = value.checked_drain_reference()?;
let tx = ton_block::Transaction::construct_from_cell(tx_cell)?;
if lt < tx.logical_time() {
lt = tx.logical_time();
}

Ok(true)
})?;
accounts.insert(account.into_vec(), lt);

Ok(true)
})?;

let message = bincode::serialize(&accounts)?;
let mut clients = self.clients.lock().await;
for (_, client) in clients.iter_mut() {
let message = axum::extract::ws::Message::Binary(message.clone());
client.send(message).await?;
}

Ok(())
}
}

0 comments on commit ecec407

Please sign in to comment.