Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supported socks5 proxy for crypto-ws-client #18

Merged
merged 5 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crypto-ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reqwest = { version = "0.11.10", features = ["gzip"] }
serde_json = "1.0.79"
tokio = { version = "1.17.0", features = ["rt-multi-thread", "time", "sync", "macros"] }
tokio-tungstenite = { version = "0.17.1", features = ["rustls-tls-native-roots"] }
fast-socks5 = "0.7.0"

[dev-dependencies]
tokio = { version = "1.17.0", features = ["test-util"] }
102 changes: 101 additions & 1 deletion crypto-ws-client/src/common/connect_async.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env;
use futures_util::{SinkExt, StreamExt};
use log::*;
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -6,6 +7,8 @@ use tokio_tungstenite::tungstenite::{Error, Message};
use governor::{Quota, RateLimiter};
use nonzero_ext::*;
use std::num::NonZeroU32;
use fast_socks5::client::{Config, Socks5Stream};
use reqwest::Url;

/// Wraps a websocket client inside an event loop, returns a message_rx to receive messages and
/// a command_tx to send commands to the websocket server.
Expand All @@ -17,6 +20,24 @@ pub async fn connect_async(
url: &str,
uplink_limit: Option<(NonZeroU32, std::time::Duration)>,
) -> Result<(Receiver<Message>, Sender<Message>), Error> {

if let Ok(proxy_env) = env::var("https_proxy").or(env::var("http_proxy")) {
let proxy_url = Url::parse(&proxy_env).unwrap();
let proxy_addr = format!("{}:{}", proxy_url.host_str().unwrap(), proxy_url.port_or_known_default().unwrap());
match proxy_url.scheme().to_lowercase().as_str() {
"socks5" => connect_async_with_socks5_proxy(url, &proxy_addr, uplink_limit).await,
_ => panic!("proxy scheme not implement")
}

} else {
connect_async_direct(url, uplink_limit).await
}
}

pub async fn connect_async_direct(
url: &str,
uplink_limit: Option<(NonZeroU32, std::time::Duration)>,
) -> Result<(Receiver<Message>, Sender<Message>), Error> {
let (command_tx, mut command_rx) = tokio::sync::mpsc::channel::<Message>(1);
let (message_tx, message_rx) = tokio::sync::mpsc::channel::<Message>(32);

Expand Down Expand Up @@ -47,7 +68,86 @@ pub async fn connect_async(
_ => {
limiter.until_ready().await;
if let Err(err) =write.send(command).await {
error!("{}", err);
error!("{:#?}", err);
}
}
}
}
None => {
debug!("command_rx closed");
break;
}
}
}
msg = read.next() => match msg {
Some(Ok(msg)) => {
let _= message_tx.send(msg).await;
}
Some(Err(err)) => {
error!("{}", err);
break;
}
None => {
debug!("message_tx closed");
break;
}
}
};
}
write.send(Message::Close(None)).await;
});

Ok((message_rx, command_tx))
}

pub async fn connect_async_with_socks5_proxy(
url: &str,
proxy_addr: &str,
uplink_limit: Option<(NonZeroU32, std::time::Duration)>,
) -> Result<(Receiver<Message>, Sender<Message>), Error> {
let (command_tx, mut command_rx) = tokio::sync::mpsc::channel::<Message>(1);
let (message_tx, message_rx) = tokio::sync::mpsc::channel::<Message>(32);
// replace with socks5 stream
let connect_url = Url::parse(url).unwrap();
let proxy_stream = Socks5Stream::connect(
proxy_addr.to_string(),
connect_url.host_str().unwrap().to_string(),
connect_url.port().unwrap(),
Config::default()
).await.unwrap();
let ret = tokio_tungstenite::client_async_tls(
connect_url,
proxy_stream
).await;
// replaced
// let ret = tokio_tungstenite::connect_async(url).await;
if let Err(e) = ret {
return Err(e);
}
let (ws_stream, _) = ret.unwrap();
let (mut write, mut read) = ws_stream.split();

let limiter = if let Some((max_burst, duration)) = uplink_limit {
let quota = Quota::with_period(duration).unwrap().allow_burst(max_burst);
RateLimiter::direct(quota)
} else {
RateLimiter::direct(Quota::per_second(nonzero!(u32::max_value())))
};

tokio::task::spawn(async move {
loop {
tokio::select! {
command = command_rx.recv() => {
match command {
Some(command) => {
match command {
Message::Close(_) => {
break; // close the connection and break the loop
}
_ => {
limiter.until_ready().await;
if let Err(err) =write.send(command).await {
error!("{:#?}", err);
}
}
}
Expand Down