From ecec40702f5545917291b2f111ed8c2eddeb090f Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Wed, 13 Dec 2023 12:23:12 +0100 Subject: [PATCH] Add ws producer to stream updated accounts --- Cargo.lock | 73 ++++++++++++++++++++++++++++++++++++ node/src/subscriber.rs | 1 + server/Cargo.toml | 5 ++- server/src/lib.rs | 10 ++++- server/src/server.rs | 2 + server/src/ws.rs | 84 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 server/src/ws.rs diff --git a/Cargo.lock b/Cargo.lock index def6e44..7aec1bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,6 +157,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "base64 0.21.3", "bitflags 1.3.2", "bytes", "futures-util", @@ -174,8 +175,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -252,6 +255,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a32fd6af2b5827bce66c29053ba0e7c42b9dcab01835835058558c10851a46b" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.65.1" @@ -988,10 +1000,12 @@ dependencies = [ "axum", "axum-jrpc", "base64 0.13.1", + "bincode", "broxus-util", "bytes", "everscale-rpc-models", "fdlimit", + "futures-util", "hex", "humantime", "nekoton-abi", @@ -1010,6 +1024,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "uuid", "weedb", ] @@ -2813,6 +2828,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.9.9" @@ -3206,6 +3232,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.8" @@ -3540,6 +3578,25 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" @@ -3601,6 +3658,22 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] +name = "uuid" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" +dependencies = [ + "getrandom 0.2.10", + "serde", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/node/src/subscriber.rs b/node/src/subscriber.rs index 5b867b1..02c7b2d 100644 --- a/node/src/subscriber.rs +++ b/node/src/subscriber.rs @@ -20,6 +20,7 @@ impl ton_indexer::Subscriber for EngineSubscriber { async fn process_block(&self, ctx: ProcessBlockContext<'_>) -> Result<()> { self.rpc_state .process_block(ctx.block_stuff(), ctx.shard_state_stuff()) + .await .context("Failed to update server state") } diff --git a/server/Cargo.toml b/server/Cargo.toml index 25f4603..6658a7c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -6,12 +6,14 @@ edition = "2021" [dependencies] anyhow = "1.0" arc-swap = { version = "1.6", features = ["weak"] } -axum = { version = "0.6.0", features = ["http2"] } +axum = { version = "0.6.0", features = ["http2", "ws"] } axum-jrpc = { version = "0.5.1", features = ["serde_json"] } base64 = "0.13" +bincode = "1.3" broxus-util = { version = "0.2", default-features = false } bytes = "1.4.0" fdlimit = "0.2.1" +futures-util = "0.3" hex = "0.4" humantime = "2.1" num_cpus = "1.13.1" @@ -25,6 +27,7 @@ tokio = { version = "1", features = ["sync"] } tower = "0.4.12" tower-http = { version = "0.4.0", features = ["cors", "timeout"] } tracing = "0.1.37" +uuid = { version = "1.6", features = ["v4", "serde"] } weedb = { version = "0.1", features = ["zstd", "lz4", "jemalloc"] } nekoton-abi = { git = "https://github.com/broxus/nekoton.git", default-features = false } diff --git a/server/src/lib.rs b/server/src/lib.rs index 25c5786..fbd1cae 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -20,6 +20,7 @@ mod proto; mod server; mod storage; mod utils; +mod ws; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -91,6 +92,7 @@ impl ApiConfig { pub struct RpcState { config: Config, engine: ArcSwapWeak, + ws_producer: ws::WsProducer, runtime_storage: RuntimeStorage, persistent_storage: Option, jrpc_counters: Counters, @@ -108,6 +110,7 @@ impl RpcState { config, engine: Default::default(), runtime_storage: Default::default(), + ws_producer: Default::default(), persistent_storage, jrpc_counters: Default::default(), proto_counters: Default::default(), @@ -215,7 +218,7 @@ impl RpcState { Ok(()) } - pub fn process_block( + pub async fn process_block( &self, block_stuff: &BlockStuff, shard_state: Option<&ShardStateStuff>, @@ -227,9 +230,10 @@ impl RpcState { block_info, shard_state, ) + .await } - pub fn process_block_parts( + pub async fn process_block_parts( &self, block_id: &ton_block::BlockIdExt, block: &ton_block::Block, @@ -249,6 +253,8 @@ impl RpcState { storage.update(block_id, block, shard_state)?; } + self.ws_producer.handle_block(block).await?; + Ok(()) } diff --git a/server/src/server.rs b/server/src/server.rs index 62c6f1e..43d7d82 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -12,6 +12,7 @@ use axum::RequestExt; use crate::jrpc; use crate::proto; +use crate::ws; use crate::RpcState; pub struct Server { @@ -60,6 +61,7 @@ impl Server { let router = axum::Router::new() .route("/", get(health_check)) .route("/", post(common_route)) + .route("/ws", get(ws::ws_router)) .route("/rpc", post(jrpc::jrpc_router)) .route("/proto", post(proto::proto_router)) .layer(service) diff --git a/server/src/ws.rs b/server/src/ws.rs new file mode 100644 index 0000000..915889f --- /dev/null +++ b/server/src/ws.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; + +use anyhow::Result; +use axum::extract::ws::{Message, WebSocket}; +use axum::extract::{Query, State, WebSocketUpgrade}; +use futures_util::stream::SplitSink; +use futures_util::{SinkExt, StreamExt}; +use rustc_hash::FxHashMap; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use ton_block::{Deserializable, HashmapAugType}; +use ton_types::HashmapType; + +use crate::server::Server; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WebSocketUpgradeQuery { + client_id: uuid::Uuid, +} + +pub async fn ws_router( + ws: WebSocketUpgrade, + State(ctx): State>, + Query(query): Query, +) -> axum::response::Response { + ws.on_upgrade(move |socket| handle_socket(query.client_id, ctx, socket)) +} + +async fn handle_socket(client_id: uuid::Uuid, state: Arc, socket: WebSocket) { + let (sender, mut receiver) = socket.split(); + + let clients = &state.state().ws_producer.clients; + clients.lock().await.insert(client_id, sender); + + while let Some(msg) = receiver.next().await { + match msg { + Ok(Message::Close(_)) | Err(_) => { + tracing::warn!(%client_id, "websocket connection closed"); + clients.lock().await.remove(&client_id); + return; + } + Ok(_) => {} + } + } +} + +#[derive(Default)] +pub struct WsProducer { + clients: Mutex>>, +} + +impl WsProducer { + pub async fn handle_block(&self, block: &ton_block::Block) -> Result<()> { + let extra = block.read_extra()?; + let account_blocks = extra.read_account_blocks()?; + + let mut accounts = FxHashMap::default(); + account_blocks.iterate_with_keys(|account, value| { + let mut lt = 0; + value.transactions().iterate_slices(|_, mut value| { + let tx_cell = value.checked_drain_reference()?; + let tx = ton_block::Transaction::construct_from_cell(tx_cell)?; + if lt < tx.logical_time() { + lt = tx.logical_time(); + } + + Ok(true) + })?; + accounts.insert(account.into_vec(), lt); + + Ok(true) + })?; + + let message = bincode::serialize(&accounts)?; + let mut clients = self.clients.lock().await; + for (_, client) in clients.iter_mut() { + let message = axum::extract::ws::Message::Binary(message.clone()); + client.send(message).await?; + } + + Ok(()) + } +}