Skip to content

Commit

Permalink
Merge pull request #14 from broxus/feature/accounts_producer
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef authored Feb 12, 2024
2 parents 06eb6cf + 86eb606 commit fb26aa7
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 97 deletions.
475 changes: 382 additions & 93 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ everscale-rpc-models = { path = "../models" }
ed25519-dalek = "1.0.1"
hex = "0.4.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nekoton = { git = "https://github.com/broxus/nekoton.git", default-features = true }

[features]
default = ["log"]
Expand Down
2 changes: 1 addition & 1 deletion client/examples/multisig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn main() {
bounce: false,
destination: to,
amount: 1_000_000_000,
/// can be built with `nekoton_abi::MessageBuilder`
// can be built with `nekoton_abi::MessageBuilder`
body: None,
state_init: None,
},
Expand Down
1 change: 1 addition & 0 deletions node/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl ton_indexer::Subscriber for EngineSubscriber {
async fn process_block(&self, ctx: ProcessBlockContext<'_>) -> Result<()> {
self.rpc_state
.process_block(ctx.block_stuff(), ctx.shard_state_stuff())
.await
.context("Failed to update server state")
}

Expand Down
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
arc-swap = { version = "1.6", features = ["weak"] }
axum = { version = "0.6.0", features = ["http2"] }
axum = { version = "0.6.0", features = ["http2", "ws"] }
axum-jrpc = { version = "0.5.1", features = ["serde_json"] }
base64 = "0.13"
bincode = "1.3"
broxus-util = { version = "0.2", default-features = false }
bytes = "1.4.0"
fdlimit = "0.2.1"
futures-util = "0.3"
hex = "0.4"
humantime = "2.1"
num_cpus = "1.13.1"
Expand All @@ -25,6 +27,7 @@ tower = "0.4.12"
tower-http = { version = "0.4.0", features = ["cors", "timeout"] }
tracing = "0.1.37"
metrics = "0.22.0"
uuid = { version = "1.6", features = ["v4", "serde"] }
weedb = { version = "0.1", features = ["zstd", "lz4", "jemalloc"] }

nekoton-abi = { git = "https://github.com/broxus/nekoton.git", default-features = false }
Expand Down
12 changes: 10 additions & 2 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod proto;
mod server;
mod storage;
mod utils;
mod ws;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -90,6 +91,7 @@ impl ApiConfig {
pub struct RpcState {
config: Config,
engine: ArcSwapWeak<ton_indexer::Engine>,
ws_producer: ws::WsProducer,
runtime_storage: RuntimeStorage,
persistent_storage: Option<PersistentStorage>,
jrpc_counters: Counters,
Expand All @@ -107,6 +109,7 @@ impl RpcState {
config,
engine: Default::default(),
runtime_storage: Default::default(),
ws_producer: Default::default(),
persistent_storage,
jrpc_counters: Counters::named("jrpc"),
proto_counters: Counters::named("proto"),
Expand Down Expand Up @@ -206,7 +209,7 @@ impl RpcState {
Ok(())
}

pub fn process_block(
pub async fn process_block(
&self,
block_stuff: &BlockStuff,
shard_state: Option<&ShardStateStuff>,
Expand All @@ -218,9 +221,10 @@ impl RpcState {
block_info,
shard_state,
)
.await
}

pub fn process_block_parts(
pub async fn process_block_parts(
&self,
block_id: &ton_block::BlockIdExt,
block: &ton_block::Block,
Expand All @@ -240,6 +244,10 @@ impl RpcState {
storage.update(block_id, block, shard_state)?;
}

self.ws_producer
.handle_block(block_id.shard_id.workchain_id(), block)
.await?;

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use axum::RequestExt;

use crate::jrpc;
use crate::proto;
use crate::ws;
use crate::RpcState;

pub struct Server {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl Server {
let router = axum::Router::new()
.route("/", get(health_check))
.route("/", post(common_route))
.route("/ws", get(ws::ws_router))
.route("/rpc", post(jrpc::jrpc_router))
.route("/proto", post(proto::proto_router))
.layer(service)
Expand Down
132 changes: 132 additions & 0 deletions server/src/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::sync::Arc;

use anyhow::Result;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{Query, State, WebSocketUpgrade};
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex};
use ton_block::{Deserializable, HashmapAugType, MsgAddressInt};
use ton_types::{AccountId, HashmapType};

use crate::server::Server;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketUpgradeQuery {
client_id: uuid::Uuid,
}

pub async fn ws_router(
ws: WebSocketUpgrade,
State(ctx): State<Arc<Server>>,
Query(query): Query<WebSocketUpgradeQuery>,
) -> axum::response::Response {
ws.on_upgrade(move |socket| handle_socket(query.client_id, ctx, socket))
}

async fn handle_socket(client_id: uuid::Uuid, state: Arc<Server>, socket: WebSocket) {
const BUFFER_SIZE: usize = 100;
let (tx, rx) = mpsc::channel(BUFFER_SIZE);

let clients = &state.state().ws_producer.clients;
clients.lock().await.insert(client_id, tx);

let (sender, mut receiver) = socket.split();

start_listening_ws_events(client_id, sender, rx);

while let Some(msg) = receiver.next().await {
match msg {
Ok(Message::Close(_)) | Err(_) => {
tracing::warn!(%client_id, "websocket connection closed");
clients.lock().await.remove(&client_id);
break;
}
Ok(_) => {}
}
}
}

#[derive(Default)]
pub struct WsProducer {
clients: Mutex<FxHashMap<uuid::Uuid, mpsc::Sender<Message>>>,
}

impl WsProducer {
pub async fn handle_block(&self, workchain_id: i32, block: &ton_block::Block) -> Result<()> {
let extra = block.read_extra()?;
let account_blocks = extra.read_account_blocks()?;

let mut accounts = Vec::with_capacity(account_blocks.len()?);
account_blocks.iterate_with_keys(|account, value| {
let mut lt = 0;
value.transactions().iterate_slices(|_, mut value| {
let tx_cell = value.checked_drain_reference()?;
let tx = ton_block::Transaction::construct_from_cell(tx_cell)?;
if lt < tx.logical_time() {
lt = tx.logical_time();
}

Ok(true)
})?;

let address =
MsgAddressInt::with_standart(None, workchain_id as i8, AccountId::from(account))?;

accounts.push(AccountInfo {
account: nekoton_proto::utils::addr_to_bytes(&address).to_vec(),
account_lt: lt,
});

Ok(true)
})?;

let message = bincode::serialize(&accounts)?;

let mut clients = self.clients.lock().await;
clients.retain(|client_id, client_tx| {
let message = Message::Binary(message.clone());
match client_tx.try_send(message) {
Ok(_) => true,
Err(e) => {
tracing::error!(%client_id, "failed to send ws message to channel: {e}");
false
}
}
});

Ok(())
}
}

fn start_listening_ws_events(
client_id: uuid::Uuid,
mut ws_sender: SplitSink<WebSocket, Message>,
mut events_rx: mpsc::Receiver<Message>,
) {
tokio::spawn(async move {
while let Some(message) = events_rx.recv().await {
if let Err(e) = ws_sender.send(message).await {
tracing::error!(%client_id, "failed to send message to ws client: {e}");
}
}

if let Err(e) = ws_sender.close().await {
tracing::error!(%client_id, "failed to close ws connection: {e}")
}

tracing::warn!(%client_id, "stopped listening for ws events");

events_rx.close();
while events_rx.recv().await.is_some() {}
});
}

#[derive(Clone, Serialize, Deserialize)]
struct AccountInfo {
pub account: Vec<u8>,
pub account_lt: u64,
}

0 comments on commit fb26aa7

Please sign in to comment.