Skip to content

Commit

Permalink
Added --connect-timeout with default of one second
Browse files Browse the repository at this point in the history
  • Loading branch information
blakejakopovic committed Nov 25, 2022
1 parent 0e84776 commit 87053e9
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
notes.txt
*.rs.bk
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Changelog

## v0.3.1
## v0.3.1-Beta
* Added --stream flag
* Added --connect-timeout option
* Added multi-line stdin support
* Added support for piping server to server
* Added websocket PONG response
* Added default one second connection timeout
* Added default one second connection timeout (non-streaming)
* Updated app description
* Migrate from async to threads
* Refactored main.rs + lib.rs
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ $ echo '["REQ", "RAND", {"kinds": [1], "limit": 8}]' |
nostcat --unique wss://relay.damus.io wss://nostr.ono.re
```

With a websocket connection timeout in milliseconds
```shell
$ echo '["REQ", "RAND", {"kinds": [1], "limit": 2}]' |
nostcat --connect-timeout 250 wss://relay.damus.io
```


Stream websocket data (like tail -f)
```shell
$ echo '["REQ", "RAND", {"kinds": [1], "limit": 8}]' |
Expand Down
66 changes: 42 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use clap::{Arg, ArgAction, Command};
use clap::{Arg, ArgAction, ArgMatches, Command};
use std::io::{self, BufRead};
use std::sync::mpsc;
use std::time::Duration;
Expand All @@ -12,7 +12,7 @@ enum Response {
Notice(String),
Ok(String),
EOSE(String),
Unsupported(String)
Unsupported(String),
}

impl Response {
Expand All @@ -22,7 +22,7 @@ impl Response {
s if s.starts_with("[\"NOTICE\"") => Response::Notice(s),
s if s.starts_with("[\"OK\"") => Response::Ok(s),
s if s.starts_with("[\"EOSE\"") => Response::EOSE(s),
_ => Response::Unsupported(s)
_ => Response::Unsupported(s),
}
}
}
Expand Down Expand Up @@ -51,6 +51,15 @@ pub fn cli() -> Command {
.num_args(0)
.action(ArgAction::SetTrue),
)
.arg(
Arg::new("connect-timeout")
.help("Websocket connection timeout in milliseconds (non-streaming)")
.long("connect-timeout")
.required(false)
.num_args(1)
.value_parser(clap::value_parser!(u64))
.default_value("1"),
)
.arg(
Arg::new("servers")
.help("Websocket servers")
Expand All @@ -75,9 +84,12 @@ pub fn run(
tx: &mpsc::Sender<Result<String, String>>,
url_str: String,
input: Vec<String>,
stream: bool,
args: ArgMatches,
) {
// Connect to websocket

// Safe to unwrap as default arg value
let connect_timeout = args.get_one("connect-timeout").unwrap();

let url = match Url::parse(&url_str) {
Ok(url) => url,
Err(err) => {
Expand All @@ -87,27 +99,25 @@ pub fn run(
}
};

// TODO: Need to add connection timeout, as currently it hangs for bad/failed connections
// https://users.rust-lang.org/t/tls-websocket-how-to-make-tungstenite-works-with-mio-for-poll-and-secure-websocket-wss-via-native-tls-feature-of-tungstenite-crate/72533/4
let (mut socket, response) = match connect(url.clone()) {
Ok((socket, response)) => (socket, response),
Err(err) => {
tx.send(Err(format!(
"Unable to connect to websocket server: {}",
err
)))
.unwrap();
tx.send(Err(
format!("Unable to connect to websocket server: {}", err),
)).unwrap();
return;
}
};

let stream = args.get_flag("stream");
if !stream {
let timeout_ms = 1000;
let timeout = Duration::from_millis(timeout_ms);
let timeout_duration = Duration::from_millis(*connect_timeout);

let timeout_res = match socket.get_mut() {
MaybeTlsStream::NativeTls(ref mut s) => s.get_mut().set_read_timeout(Some(timeout)),
MaybeTlsStream::Plain(ref s) => s.set_read_timeout(Some(timeout)),
MaybeTlsStream::NativeTls(ref mut s) => {
s.get_mut().set_read_timeout(Some(timeout_duration))
}
MaybeTlsStream::Plain(ref s) => s.set_read_timeout(Some(timeout_duration)),

t => {
log::warn!("{:?} not handled, not setting read timeout", t);
Expand All @@ -116,15 +126,14 @@ pub fn run(
};

match timeout_res {
Err(err) => log::error!("error setting timeout: {}", err),
Ok(_) => log::info!("Setting timeout to {} ms", timeout_ms),
Err(err) => log::error!("Error setting timeout: {}", err),
Ok(_) => log::info!("Setting timeout to {} ms", connect_timeout),
}
}

log::info!("Connected to websocket server -- {}", url_str);
log::info!("Response HTTP code -- {}: {}", url_str, response.status());

// Send input (stdin)
for line in input {
match socket.write_message(Message::Text(line.to_owned())) {
Ok(_) => {
Expand All @@ -139,16 +148,24 @@ pub fn run(
}

'run_loop: loop {
// TODO: Review better error handing for this

let msg = socket.read_message();

if let Err(err) = msg.as_ref() {
let errmsg = format!("read error: {}", err);
let errmsg = format!("Websocket read message error: {}", err);

// Connection timeout detected
if errmsg.contains("Resource temporarily unavailable") {
let timeout_msg = format!("{} timed out when waiting for a response", url_str);
let timeout_msg = format!(
"Connection timed out after {}ms while waiting for a response -- {}",
connect_timeout,
url_str
);
log::info!("{}", timeout_msg);
tx.send(Err(timeout_msg)).unwrap();
return;
}

tx.send(Err(errmsg)).unwrap();
break;
}
Expand Down Expand Up @@ -194,11 +211,12 @@ pub fn run(

Response::Event(data) => {
tx.send(Ok(data)).unwrap();
},
}

// Handle unsupported nostr data
Response::Unsupported(data) => {
tx.send(Err(format!("Received unsupported nostr data: {:?}", data))).unwrap();
tx.send(Err(format!("Received unsupported nostr data: {:?}", data)))
.unwrap();
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ fn main() {
.map(|v| v.to_string())
.collect::<Vec<_>>();

let stream = cli_matches.get_flag("stream");

let input = read_input();

let (tx, rx) = mpsc::channel();
Expand All @@ -26,10 +24,11 @@ fn main() {
for server in servers {
let tx = tx.clone();
let input = input.clone();
let cli_matches = cli_matches.clone();

log::info!("Spawning thread for -- {}", server);

let jh = thread::spawn(move || run(&tx, server, input, stream));
let jh = thread::spawn(move || run(&tx, server, input, cli_matches));

v.push(jh);
}
Expand Down

0 comments on commit 87053e9

Please sign in to comment.