Skip to content

Commit

Permalink
Reproduce and fix streaming issue (#550)
Browse files Browse the repository at this point in the history
## Summary

- Reproduces the streaming issue seen in RN iOS in pure Rust.
- Adds a hacky version of an Authenticator so that we can send publish requests to the dev network without getting auth errors back
- Sets `http2_keep_alive_interval` to 10 seconds to fix the disconnect issue.

## Notes

Issue is only reproducible in the dev network and only with ~15 seconds of inactivity in the stream.
  • Loading branch information
neekolas authored Mar 13, 2024
1 parent 144d214 commit c9b5d6b
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 35 additions & 1 deletion bindings_ffi/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,9 @@ mod tests {
use futures::stream;
use xmtp_proto::api_client::{Envelope, Error as ApiError};

use crate::v2::FfiV2Subscription;
use crate::v2::{
create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest, FfiV2Subscription,
};

// Try a query on a test topic, and make sure we get a response
#[tokio::test]
Expand Down Expand Up @@ -577,4 +579,36 @@ mod tests {
let second = stream_handler.next().await;
assert!(second.is_err());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_e2e() {
let client = create_v2_client("http://localhost:5556".to_string(), false)
.await
.unwrap();
let content_topic = "/xmtp/0/foo";

let subscription = client
.subscribe(FfiV2SubscribeRequest {
content_topics: vec![content_topic.to_string()],
})
.await
.unwrap();

client
.publish(
FfiPublishRequest {
envelopes: vec![FfiEnvelope {
content_topic: content_topic.to_string(),
timestamp_ns: 3,
message: vec![1, 2, 3],
}],
},
"".to_string(),
)
.await
.unwrap();

let sub_result = subscription.next().await.unwrap();
assert_eq!(sub_result.content_topic, content_topic.to_string());
}
}
2 changes: 2 additions & 0 deletions xmtp_api_grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ version = "0.1.0"
async-stream = "0.3.5"
base64 = "0.21.0"
futures = "0.3.29"
hex.workspace = true
http-body = "0.4.5"
hyper = "0.14.26"
log = { version = "0.4", features = ["std"] }
Expand All @@ -24,6 +25,7 @@ tonic = { workspace = true, features = [
tower = "0.4.13"
webpki-roots = "0.23.0"
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full", "grpc"] }
xmtp_v2 = { path = "../xmtp_v2" }

[dev-dependencies]
uuid = { version = "1.3.1", features = ["v4"] }
101 changes: 101 additions & 0 deletions xmtp_api_grpc/src/auth_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use base64::Engine;
use prost::Message;
use xmtp_proto::xmtp::message_api::v1::{AuthData, Token};
use xmtp_proto::xmtp::message_contents::private_key_bundle::Version;
use xmtp_proto::xmtp::message_contents::signature::{EcdsaCompact, Union as SignatureUnion};
use xmtp_proto::xmtp::message_contents::{
private_key::Union as PrivateKeyUnion, PrivateKeyBundle, PrivateKeyBundleV1, PublicKey,
Signature,
};
use xmtp_v2::k256_helper::sign_keccak_256;

fn create_auth_data(wallet_address: String) -> AuthData {
AuthData {
wallet_addr: wallet_address,
created_ns: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_nanos() as u64,
}
}

pub struct Authenticator {
identity_key: PublicKey,
wallet_address: String,
private_key_bytes: Vec<u8>,
}

impl Authenticator {
pub fn new(
identity_key: PublicKey,
wallet_address: String,
private_key_bytes: Vec<u8>,
) -> Self {
Self {
identity_key,
wallet_address,
private_key_bytes,
}
}

pub fn create_token(&self) -> String {
let auth_data = create_auth_data(self.wallet_address.clone());
let mut serialized = Vec::new();
auth_data
.encode(&mut serialized)
.expect("serialization failed");

let signature = self.sign(serialized.as_slice());

let token = Token {
identity_key: Some(self.identity_key.clone()),
auth_data_bytes: serialized,
auth_data_signature: Some(signature),
};
let mut token_bytes = Vec::new();
let _ = token.encode(&mut token_bytes);

let token_base64 = base64::engine::general_purpose::STANDARD.encode(&token_bytes);
token_base64

Check warning on line 59 in xmtp_api_grpc/src/auth_token.rs

View workflow job for this annotation

GitHub Actions / workspace

returning the result of a `let` binding from a block

warning: returning the result of a `let` binding from a block --> xmtp_api_grpc/src/auth_token.rs:59:9 | 58 | let token_base64 = base64::engine::general_purpose::STANDARD.encode(&token_bytes); | ---------------------------------------------------------------------------------- unnecessary `let` binding 59 | token_base64 | ^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#let_and_return = note: `#[warn(clippy::let_and_return)]` on by default help: return the expression directly | 58 ~ 59 ~ base64::engine::general_purpose::STANDARD.encode(&token_bytes) |
}

fn sign(&self, bytes_to_sign: &[u8]) -> Signature {
let (sig, recovery) = sign_keccak_256(self.private_key_bytes.as_slice(), bytes_to_sign)
.expect("signature failed");

Signature {
union: Some(SignatureUnion::EcdsaCompact(EcdsaCompact {
bytes: sig,
recovery: recovery as u32,
})),
}
}

pub fn from_bytes(private_key_bundle_bytes: Vec<u8>, wallet_address: String) -> Self {
let bundle = PrivateKeyBundle::decode(&mut private_key_bundle_bytes.as_slice())
.expect("deserialization");
let identity_key = match bundle.version {
Some(Version::V1(PrivateKeyBundleV1 {
identity_key,
pre_keys: _,
})) => identity_key.unwrap(),
_ => panic!("missing identity key"),
};

let private_key_bytes = match identity_key.union {
Some(PrivateKeyUnion::Secp256k1(inner)) => inner.bytes.clone(),
_ => panic!("missing private key bytes"),
};

Self {
wallet_address,
identity_key: identity_key.public_key.unwrap(),
private_key_bytes,
}
}

pub fn from_hex(private_key_bundle_string: String, wallet_address: String) -> Self {
let decoded_bytes = hex::decode(private_key_bundle_string).unwrap();
Self::from_bytes(decoded_bytes, wallet_address)
}
}
4 changes: 2 additions & 2 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ async fn create_tls_channel(address: String) -> Result<Channel, Error> {
.map_err(|e| Error::new(ErrorKind::SetupError).with(e))?
.keep_alive_while_idle(true)
.connect_timeout(Duration::from_secs(5))
.http2_keep_alive_interval(Duration::from_secs(3))
.keep_alive_timeout(Duration::from_secs(5))
.http2_keep_alive_interval(Duration::from_secs(10))
.keep_alive_timeout(Duration::from_secs(25))
.tls_config(ClientTlsConfig::new())
.map_err(|e| Error::new(ErrorKind::SetupError).with(e))?
.connect()
Expand Down
90 changes: 90 additions & 0 deletions xmtp_api_grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod auth_token;
pub mod grpc_api_helper;

pub const LOCALHOST_ADDRESS: &str = "http://localhost:5556";
Expand All @@ -9,6 +10,8 @@ pub use grpc_api_helper::Client;
mod tests {
use std::time::{SystemTime, UNIX_EPOCH};

use self::auth_token::Authenticator;

use super::*;
use futures::StreamExt;
use xmtp_proto::{
Expand All @@ -18,6 +21,9 @@ mod tests {
},
};

const PRIVATE_KEY_BUNDLE_HEX: &str = "0a88030ac20108eec0888ae33112220a201cd19d1d6e129cb8f8ba4bd85aae10ffcc97a3de939d85f9bc378d47e6ba83711a940108eec0888ae33112460a440a40130cfb1cd667f48585f90372fe4b529da318e83221a3bfd1446ef6cf00d173543fed831d1517d310b05bd5ab138fde22af50a3ffce1aa72da8c7084e9bab0e4910011a430a4104c4eb77c3b2eaacaca12e2b55c6c42dc33f4518a5690bb49cd6ae0e0a652e59fbc9defd98242d30a0737a13c3461cac1edc0f8e3007d65b1637382088ac1cd3d712c00108a4c1888ae33112220a2062e553bceac5247e7bebfdcc8c31959965603e442f79c6346028060ab2129e931a920108a4c1888ae33112440a420a40d12c6ab6eb1874edd3044fdc753543516130bd4d1db11024bd81cd9c2c4bb6b6138e85ed313f387ea7707e09090659b580ee22f42f022c4521e4a11ab7abddfc1a430a4104175097c31bbe1700729f1f1ede87b8bd21a5bc62e4bb4c963e0de885080048bd31138b657fd9146aa8255f1c57c4fa1f8cb7b30bed8803eed48d6a3e67e71ccf";
const WALLET_ADDRESS: &str = "0xA38A1f04B29dea1de621E17447fB4efB11BFfBdf";

// Return the json serialization of an Envelope with bytes
pub fn test_envelope(topic: String) -> Envelope {
let time_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
Expand All @@ -29,6 +35,16 @@ mod tests {
}
}

fn get_auth_token() -> String {
// This is a private key bundle exported from the JS SDK and hex encoded
let authenticator = Authenticator::from_hex(
PRIVATE_KEY_BUNDLE_HEX.to_string(),
WALLET_ADDRESS.to_string(),
);

authenticator.create_token()
}

#[tokio::test]
async fn grpc_query_test() {
let mut client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
Expand Down Expand Up @@ -210,4 +226,78 @@ mod tests {
let value_2 = stream.next().await.unwrap().unwrap();
assert_eq!(value_2.content_topic, topic_2.to_string());
}

#[tokio::test]
async fn test_dev_publish() {
let auth_token = get_auth_token();
let dev_client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
dev_client
.publish(
auth_token,
PublishRequest {
envelopes: vec![Envelope {
content_topic: "/xmtp/0/foo/2".to_string(),
timestamp_ns: 3,
message: vec![1, 2, 3],
}],
},
)
.await
.unwrap();
}

#[tokio::test]
async fn long_lived_subscribe_test() {
let auth_token = get_auth_token();
tokio::time::timeout(std::time::Duration::from_secs(30), async move {
let client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();

let topic = uuid::Uuid::new_v4();
let mut subscription = client
.subscribe2(SubscribeRequest {
content_topics: vec![topic.to_string()],
})
.await
.unwrap();

client
.publish(
auth_token.to_string(),
PublishRequest {
envelopes: vec![test_envelope(topic.to_string())],
},
)
.await
.unwrap();

// Sleep to give the response time to come back
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let mut next_message = subscription.next().await.unwrap();
if let Err(err) = next_message {
panic!("Message 1 Error: {}", err);
}

tokio::time::sleep(std::time::Duration::from_secs(15)).await;
client
.publish(
auth_token.to_string(),
PublishRequest {
envelopes: vec![test_envelope(topic.to_string())],
},
)
.await
.unwrap();

// Sleep to give the response time to come back
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

next_message = subscription.next().await.unwrap();
if let Err(err) = next_message {
panic!("Message 2 Error: {}", err);
}
})
.await
.expect("Timed out");
}
}
9 changes: 9 additions & 0 deletions xmtp_v2/src/k256_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ pub fn sign_sha256(secret_key: &[u8], message: &[u8]) -> Result<(Vec<u8>, u8), S
Ok((signature.to_vec(), recovery_id.to_byte()))
}

pub fn sign_keccak_256(secret_key: &[u8], message: &[u8]) -> Result<(Vec<u8>, u8), String> {
let signing_key = SigningKey::from_bytes(secret_key).map_err(|e| e.to_string())?;
let hash = Keccak256::new().chain(message);
let (signature, recovery_id) = signing_key
.sign_digest_recoverable::<Keccak256>(hash)
.map_err(|e| e.to_string())?;
Ok((signature.to_vec(), recovery_id.to_byte()))
}

/// Verify given a compact signature, recovery_id, digest, and public key in uncompressed format
/// NOTE: the recovery_id situation is not necessary, but it is a good sanity check
pub fn verify_sha256(
Expand Down

0 comments on commit c9b5d6b

Please sign in to comment.