diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 1e31771ff..b9336c301 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -1,4 +1,4 @@ -on: [push, pull_request] +on: [ push, pull_request ] name: master @@ -22,73 +22,6 @@ jobs: with: command: check - check-protos: - name: Check protos - runs-on: ubuntu-latest - steps: - - name: Checkout sources - uses: actions/checkout@v2 - - - name: Install stable toolchain - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - override: true - - - name: Install protobuf - run: sudo apt update && sudo apt-get -y install protobuf-compiler - - - name: Generate Rust code from .proto files - run: cargo run -p gen-protos - - - name: Check for uncommitted changes - run: git diff --exit-code - - test: - name: Test Suite - runs-on: ubuntu-latest - steps: - - name: Checkout sources - uses: actions/checkout@v2 - - - name: Install stable toolchain - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - override: true - - - name: Run cargo test for nekoton - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features - - - name: Run cargo test for nekoton-abi - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --all-targets -p nekoton-abi - - - name: Run cargo test for nekoton-derive - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --all-targets -p nekoton-derive - - - name: Run cargo test for nekoton-abi - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --all-targets -p nekoton-abi - - - name: Run cargo test for nekoton-utils - uses: actions-rs/cargo@v1 - with: - command: test - args: --all-features --all-targets -p nekoton-utils - lints: name: Lints runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 82397331a..78349886a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,11 +20,13 @@ anyhow = "1.0" async-trait = "0.1" everscale-crypto = { version = "0.2", features = ["tl-proto", "serde"] } everscale-types = { version = "0.1.0-rc.6", features = ["tycho"] } +futures-util = "0.3" parking_lot = { version = "0.12.1" } rand = "0.8" reqwest = { version = "0.12", features = ["json", "gzip", "rustls-tls"], default-features = false } serde = "1.0" serde_json = "1.0" +thiserror = "1.0" tokio = { version = "1", default-features = false } # local deps diff --git a/transport/Cargo.toml b/transport/Cargo.toml index 9c2a4b626..402c4d4ee 100644 --- a/transport/Cargo.toml +++ b/transport/Cargo.toml @@ -12,11 +12,14 @@ license.workspace = true anyhow = { workspace = true } async-trait = { workspace = true } everscale-types = { workspace = true } +futures-util = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } reqwest = { workspace = true } +serde = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } # local deps nekoton-core = { workspace = true } -serde = { version = "1.0.213", features = ["derive"] } +nekoton-utils = { workspace = true } diff --git a/transport/src/endpoint/jrpc.rs b/transport/src/endpoint/jrpc.rs new file mode 100644 index 000000000..7d65ea1cb --- /dev/null +++ b/transport/src/endpoint/jrpc.rs @@ -0,0 +1,62 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use everscale_types::models::*; +use everscale_types::prelude::*; +use nekoton_core::transport::{ContractState, Transport}; +use parking_lot::Mutex; + +use crate::endpoint::Connection; +use crate::models::Timings; +use crate::LiveCheckResult; + +#[derive(Clone)] +pub struct JrpcClient { + client: reqwest::Client, + endpoint: Arc, + was_dead: Arc, + stats: Arc>>, +} + +#[async_trait::async_trait] +impl Connection for JrpcClient { + fn new(endpoint: String, client: reqwest::Client) -> Self { + JrpcClient { + client, + endpoint: Arc::new(endpoint), + was_dead: Arc::new(AtomicBool::new(false)), + stats: Arc::new(Default::default()), + } + } + + fn endpoint(&self) -> &str { + self.endpoint.as_str() + } + + fn get_stats(&self) -> Option { + self.stats.lock().clone() + } + + fn set_stats(&self, stats: Option) { + *self.stats.lock() = stats; + } + + fn update_was_dead(&self, is_dead: bool) { + self.was_dead.store(is_dead, Ordering::Release); + } + + async fn is_alive_inner(&self) -> LiveCheckResult { + todo!() + } +} + +#[async_trait::async_trait] +impl Transport for JrpcClient { + async fn broadcast_message(&self, message: &DynCell) -> anyhow::Result<()> { + todo!() + } + + async fn get_contract_state(&self, address: &StdAddr) -> anyhow::Result { + todo!() + } +} diff --git a/transport/src/endpoint/mod.rs b/transport/src/endpoint/mod.rs new file mode 100644 index 000000000..b7a22e547 --- /dev/null +++ b/transport/src/endpoint/mod.rs @@ -0,0 +1,132 @@ +use crate::models::Timings; +use crate::LiveCheckResult; +use everscale_types::cell::DynCell; +use everscale_types::models::StdAddr; +use nekoton_core::transport::{ContractState, Transport}; + +mod jrpc; + +#[derive(Clone)] +pub enum Endpoint { + Jrpc(jrpc::JrpcClient), +} + +#[async_trait::async_trait] +impl Transport for Endpoint { + async fn broadcast_message(&self, message: &DynCell) -> anyhow::Result<()> { + match &self { + Endpoint::Jrpc(client) => client.broadcast_message(message).await, + } + } + + async fn get_contract_state(&self, address: &StdAddr) -> anyhow::Result { + match &self { + Endpoint::Jrpc(client) => client.get_contract_state(address).await, + } + } +} + +impl Eq for Endpoint {} + +impl PartialEq for Endpoint { + fn eq(&self, other: &Self) -> bool { + self.endpoint() == other.endpoint() + } +} + +impl PartialOrd for Endpoint { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Endpoint { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + if self.eq(other) { + std::cmp::Ordering::Equal + } else { + let left_stats = self.get_stats(); + let right_stats = other.get_stats(); + + match (left_stats, right_stats) { + (Some(left_stats), Some(right_stats)) => left_stats.cmp(&right_stats), + (None, Some(_)) => std::cmp::Ordering::Less, + (Some(_), None) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + } + } + } +} + +#[async_trait::async_trait] +pub trait Connection: Send + Sync { + fn new(endpoint: String, client: reqwest::Client) -> Self; + + async fn is_alive(&self) -> bool { + let check_result = self.is_alive_inner().await; + let is_alive = check_result.as_bool(); + self.update_was_dead(!is_alive); + + match check_result { + LiveCheckResult::Live(stats) => self.set_stats(Some(stats)), + LiveCheckResult::Dummy => {} + LiveCheckResult::Dead => {} + } + + is_alive + } + + fn endpoint(&self) -> &str; + + fn get_stats(&self) -> Option; + + fn set_stats(&self, stats: Option); + + fn update_was_dead(&self, is_dead: bool); + + async fn is_alive_inner(&self) -> LiveCheckResult; +} + +#[async_trait::async_trait] +impl Connection for Endpoint { + fn new(endpoint: String, client: reqwest::Client) -> Self { + // TODO: parse url to determine type of connection + Self::Jrpc(jrpc::JrpcClient::new(endpoint, client)) + } + + async fn is_alive(&self) -> bool { + match &self { + Endpoint::Jrpc(client) => client.is_alive().await, + } + } + + fn endpoint(&self) -> &str { + match &self { + Endpoint::Jrpc(client) => client.endpoint(), + } + } + + fn get_stats(&self) -> Option { + match &self { + Endpoint::Jrpc(client) => client.get_stats(), + } + } + + fn set_stats(&self, stats: Option) { + match &self { + Endpoint::Jrpc(client) => client.set_stats(stats), + } + } + + fn update_was_dead(&self, is_dead: bool) { + match &self { + Endpoint::Jrpc(client) => client.update_was_dead(is_dead), + } + } + + async fn is_alive_inner(&self) -> LiveCheckResult { + match &self { + Endpoint::Jrpc(client) => client.is_alive_inner().await, + } + } +} diff --git a/transport/src/jrpc.rs b/transport/src/jrpc.rs deleted file mode 100644 index 06bef1fb7..000000000 --- a/transport/src/jrpc.rs +++ /dev/null @@ -1,19 +0,0 @@ -use everscale_types::models::*; -use everscale_types::prelude::*; -use nekoton_core::transport::{ContractState, Transport}; - -#[derive(Clone)] -pub struct JrpcClient { - client: reqwest::Client, -} - -#[async_trait::async_trait] -impl Transport for JrpcClient { - async fn broadcast_message(&self, message: &DynCell) -> anyhow::Result<()> { - todo!() - } - - async fn get_contract_state(&self, address: &StdAddr) -> anyhow::Result { - todo!() - } -} diff --git a/transport/src/lib.rs b/transport/src/lib.rs index 003e82760..a8ae232c4 100644 --- a/transport/src/lib.rs +++ b/transport/src/lib.rs @@ -1,34 +1,22 @@ -use crate::jrpc::JrpcClient; +use std::future::Future; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; + use everscale_types::cell::DynCell; use everscale_types::models::StdAddr; +use futures_util::StreamExt; use nekoton_core::transport::{ContractState, Transport}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use std::time::Duration; -mod jrpc; +use crate::endpoint::{Connection, Endpoint}; +use crate::models::Timings; + +mod endpoint; mod models; mod utils; -#[derive(Clone)] -pub enum Endpoint { - Jrpc(JrpcClient), -} - -#[async_trait::async_trait] -impl Transport for Endpoint { - async fn broadcast_message(&self, message: &DynCell) -> anyhow::Result<()> { - match &self { - Endpoint::Jrpc(client) => client.broadcast_message(message).await, - } - } - - async fn get_contract_state(&self, address: &StdAddr) -> anyhow::Result { - match &self { - Endpoint::Jrpc(client) => client.get_contract_state(address).await, - } - } -} +static ROUND_ROBIN_COUNTER: AtomicUsize = AtomicUsize::new(0); pub struct TransportImpl { endpoints: Vec, @@ -38,20 +26,89 @@ pub struct TransportImpl { impl TransportImpl { async fn get_client(&self) -> Option { - todo!() + for _ in 0..self.endpoints.len() { + let client = { + let live_endpoints = self.live_endpoints.read(); + self.options.choose_strategy.choose(&live_endpoints) + }; + + if client.is_some() { + return client; + } else { + tokio::time::sleep(self.options.aggressive_poll_interval).await; + } + } + + None + } + + async fn with_retries(&self, f: F) -> anyhow::Result + where + F: Fn(Endpoint) -> Fut, + Fut: Future>, + { + const NUM_RETRIES: usize = 10; + + for tries in 0..NUM_RETRIES { + let client = self + .get_client() + .await + .ok_or(TransportError::NoEndpointsAvailable)?; + + // TODO: lifetimes to avoid of cloning? + match f(client.clone()).await { + Ok(result) => return Ok(result), + Err(e) => { + if tries == NUM_RETRIES - 1 { + return Err(e); + } + + self.remove_endpoint(client.endpoint()); + + tokio::time::sleep(self.options.aggressive_poll_interval).await; + } + } + } + + unreachable!() + } + + async fn update_endpoints(&self) -> usize { + let mut futures = futures_util::stream::FuturesUnordered::new(); + for endpoint in &self.endpoints { + futures.push(async move { endpoint.is_alive().await.then(|| endpoint.clone()) }); + } + + let mut new_endpoints = Vec::with_capacity(self.endpoints.len()); + while let Some(endpoint) = futures.next().await { + new_endpoints.extend(endpoint); + } + + let mut old_endpoints = self.live_endpoints.write(); + + *old_endpoints = new_endpoints; + old_endpoints.len() + } + + fn remove_endpoint(&self, endpoint: &str) { + self.live_endpoints + .write() + .retain(|c| c.endpoint() != endpoint); } } #[async_trait::async_trait] impl Transport for TransportImpl { + // TODO: avoid of additional Future created by async move { ... } + async fn broadcast_message(&self, message: &DynCell) -> anyhow::Result<()> { - let client = self.get_client().await.unwrap(); - client.broadcast_message(message).await + self.with_retries(|client| async move { client.broadcast_message(message).await }) + .await } async fn get_contract_state(&self, address: &StdAddr) -> anyhow::Result { - let client = self.get_client().await.unwrap(); - client.get_contract_state(address).await + self.with_retries(|client| async move { client.get_contract_state(address).await }) + .await } } @@ -70,6 +127,8 @@ pub struct ClientOptions { /// /// Default: `1 sec` pub aggressive_poll_interval: Duration, + + pub choose_strategy: ChooseStrategy, } impl Default for ClientOptions { @@ -78,6 +137,58 @@ impl Default for ClientOptions { probe_interval: Duration::from_secs(1), request_timeout: Duration::from_secs(3), aggressive_poll_interval: Duration::from_secs(1), + choose_strategy: ChooseStrategy::Random, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Copy)] +pub enum ChooseStrategy { + Random, + RoundRobin, + /// Choose the endpoint with the lowest latency + TimeBased, +} + +impl ChooseStrategy { + fn choose(&self, endpoints: &[Endpoint]) -> Option { + use rand::prelude::SliceRandom; + + match self { + ChooseStrategy::Random => endpoints.choose(&mut rand::thread_rng()).cloned(), + ChooseStrategy::RoundRobin => { + let index = ROUND_ROBIN_COUNTER.fetch_add(1, Ordering::Release); + + endpoints.get(index % endpoints.len()).cloned() + } + ChooseStrategy::TimeBased => endpoints + .iter() + .min_by(|&left, &right| left.cmp(right)) + .cloned(), } } } + +pub enum LiveCheckResult { + /// GetTimings request was successful + Live(Timings), + /// Keyblock request was successful, but getTimings failed + Dummy, + Dead, +} + +impl LiveCheckResult { + fn as_bool(&self) -> bool { + match self { + LiveCheckResult::Live(metrics) => metrics.is_reliable(), + LiveCheckResult::Dummy => true, + LiveCheckResult::Dead => false, + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum TransportError { + #[error("No endpoint available")] + NoEndpointsAvailable, +} diff --git a/transport/src/models.rs b/transport/src/models.rs index 8b1378917..bec906073 100644 --- a/transport/src/models.rs +++ b/transport/src/models.rs @@ -1 +1,41 @@ +use nekoton_utils::time::now_sec_u64; +const MC_ACCEPTABLE_TIME_DIFF: u64 = 120; +const ACCEPTABLE_BLOCKS_DIFF: u32 = 500; +const ACCEPTABLE_NODE_BLOCK_INSERT_TIME: u64 = 240; + +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct Timings { + pub last_mc_block_seqno: u32, + pub last_mc_utime: u32, + pub mc_time_diff: i64, + pub smallest_known_lt: Option, +} + +impl Timings { + pub fn is_reliable(&self) -> bool { + // just booted up + if self == &Self::default() { + return false; + } + + let acceptable_time = (now_sec_u64() - ACCEPTABLE_NODE_BLOCK_INSERT_TIME) as u32; + + // TODO: clarify how is correct + self.mc_time_diff.unsigned_abs() < MC_ACCEPTABLE_TIME_DIFF + && self.last_mc_block_seqno < ACCEPTABLE_BLOCKS_DIFF + && self.last_mc_utime > acceptable_time + } +} + +impl PartialOrd for Timings { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Timings { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.mc_time_diff.cmp(&other.mc_time_diff) + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 4d40b33a6..787fb5e0e 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1 +1,3 @@ pub mod serde_helpers; +pub mod time; +pub mod traits; diff --git a/utils/src/time.rs b/utils/src/time.rs new file mode 100644 index 000000000..806b851d8 --- /dev/null +++ b/utils/src/time.rs @@ -0,0 +1,8 @@ +pub fn now_sec_u64() -> u64 { + use crate::traits::TrustMe; + use std::time::SystemTime; + + (SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)) + .trust_me() + .as_secs() +} diff --git a/utils/src/traits.rs b/utils/src/traits.rs new file mode 100644 index 000000000..90be10d55 --- /dev/null +++ b/utils/src/traits.rs @@ -0,0 +1,21 @@ +pub trait TrustMe: Sized { + #[track_caller] + fn trust_me(self) -> T; +} + +impl TrustMe for Result +where + E: std::fmt::Debug, +{ + #[track_caller] + fn trust_me(self) -> T { + self.expect("Shouldn't fail") + } +} + +impl TrustMe for Option { + #[track_caller] + fn trust_me(self) -> T { + self.expect("Shouldn't fail") + } +}