diff --git a/Cargo.toml b/Cargo.toml index 50399cf..b5ab322 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bping" -version = "2.0.5" +version = "2.1.0" description = "A command line utility to ping a website from anywhere in the world!" authors = ["Firaenix "] edition = "2021" @@ -52,7 +52,8 @@ bpaf = { version = "0.9.15", features = [ "autocomplete", "batteries", ] } -tracing = "0.1.40" -tracing-subscriber = "0.3.18" +tracing = { version = "0.1", features = ["async-await"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } rand = "0.8.5" regress = "0.10.1" +tokio-retry = "0.3.0" diff --git a/src/display/mod.rs b/src/display/mod.rs index d64b184..aca4bfc 100644 --- a/src/display/mod.rs +++ b/src/display/mod.rs @@ -1,6 +1,5 @@ mod ping_display; mod print; -// mod progress_bar; - pub use print::*; -// pub use progress_bar::*; +mod progress; +pub use progress::*; diff --git a/src/display/ping_display.rs b/src/display/ping_display.rs index b56e69d..5506bd2 100644 --- a/src/display/ping_display.rs +++ b/src/display/ping_display.rs @@ -6,28 +6,35 @@ use crate::{ options::Opts, }; use colorful::{Color, Colorful}; +use indicatif::ProgressBar; use std::*; +use sync::Arc; -fn print_border(pb: &indicatif::ProgressBar, width: usize) { +fn print_border(pb: &ProgressBar, width: usize) { pb.println("┌".to_string() + &"─".repeat(width - 2) + "┐"); } -fn print_footer(pb: &indicatif::ProgressBar, width: usize) { +fn print_footer(pb: &ProgressBar, width: usize) { pb.println("└".to_string() + &"─".repeat(width - 2) + "┘"); } -pub fn display_success_ping( - pb: &indicatif::ProgressBar, - config: &Opts, +async fn sleep_if_enabled(config: &'static Opts, duration: u64) { + if !config.no_delay { + tokio::time::sleep(std::time::Duration::from_millis(duration)).await; + } +} + +pub async fn display_success_ping( + pb: &ProgressBar, + config: &'static Opts, endpoint: &str, jobres: &PerformIcmpResponseResultsItemResult, node_info: &PerformIcmpResponseNodeInfo, ) { - let width = 80; // Adjust this value as needed + let width = 80; print_border(pb, width); format_ping_header(pb, config, endpoint, &jobres.ip_address, node_info); - // Display individual ping results let trips = jobres.trips as usize; for i in 0..trips { let time = jobres.min + (jobres.max - jobres.min) * (i as f64 / (trips - 1) as f64); @@ -35,42 +42,38 @@ pub fn display_success_ping( "│ 64 bytes from {}: icmp_seq={} ttl=120 time={:.2} ms", jobres.ip_address, i, time )); - std::thread::sleep(std::time::Duration::from_millis(time as u64)); + sleep_if_enabled(config, time as u64).await; } - pb.println("│"); // Empty line for spacing - - // Construct and print statistics line + pb.println("│"); pb.println(format!("│ --- {endpoint} ping statistics ---")); - std::thread::sleep(std::time::Duration::from_millis(250)); + sleep_if_enabled(config, 250).await; - // Print packet loss information pb.println(format!( "│ {} packets transmitted, {} packets received, {:.1}% packet loss", jobres.packets_sent, jobres.packets_recv, jobres.packet_loss * 100.0 )); - std::thread::sleep(std::time::Duration::from_millis(250)); + sleep_if_enabled(config, 250).await; - // Print round-trip statistics pb.println(format!( "│ round-trip min/avg/max/stddev = {:.3}/{:.3}/{:.3}/{:.3} ms", jobres.min, jobres.avg, jobres.max, jobres.std_dev )); - std::thread::sleep(std::time::Duration::from_millis(250)); + sleep_if_enabled(config, 250).await; print_footer(pb, width); } -pub fn display_failed_ping( - pb: &indicatif::ProgressBar, - config: &Opts, +pub async fn display_failed_ping( + pb: &ProgressBar, + config: &'static Opts, jobres: &PerformIcmpResponseResultsItem, node_info: &PerformIcmpResponseNodeInfo, ) { - let width = 80; // Adjust this value as needed + let width = 80; print_border(pb, width); let ip_address = jobres .result @@ -78,19 +81,15 @@ pub fn display_failed_ping( .map_or("Unknown".to_string(), |r| r.ip_address.clone()); format_ping_header(pb, config, &jobres.endpoint, &ip_address, node_info); - // Request timeout for icmp_seq 0, 1, 2, 3 let attempts = jobres.result.as_ref().map_or(4, |r| r.attempts as usize); for index in 0..attempts { pb.println(format!("│ Request timeout for icmp_seq {}", index)); - std::thread::sleep(std::time::Duration::from_millis(500)); + sleep_if_enabled(config, 500).await; } - // --- asdasdasd.com ping statistics --- pb.println(format!("│ --- {} ping statistics ---", jobres.endpoint)); + sleep_if_enabled(config, 250).await; - std::thread::sleep(std::time::Duration::from_millis(250)); - - // 5 packets transmitted, 0 packets received, 100.0% packet loss let error_string = if let Some(result) = &jobres.result { format!( "│ {} packets transmitted, {} packets received, {:.1}% packet loss", @@ -104,16 +103,16 @@ pub fn display_failed_ping( attempts ) }; - std::thread::sleep(std::time::Duration::from_millis(250)); + sleep_if_enabled(config, 250).await; pb.println(format!("{}", error_string.color(Color::Red))); - std::thread::sleep(std::time::Duration::from_millis(250)); + sleep_if_enabled(config, 250).await; print_footer(pb, width); } pub fn format_ping_header( - pb: &indicatif::ProgressBar, + pb: &ProgressBar, config: &Opts, endpoint: &str, ip_address: &str, diff --git a/src/display/print.rs b/src/display/print.rs index 6015cdf..2e11b08 100644 --- a/src/display/print.rs +++ b/src/display/print.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use indicatif::ProgressBar; use tracing::error; @@ -5,7 +7,7 @@ use crate::{models::types::PerformIcmpResponse, options::Opts}; use super::ping_display; -pub async fn display_job(pb: &ProgressBar, config: &Opts, job_data: &PerformIcmpResponse) { +pub async fn display_job(pb: &ProgressBar, config: &'static Opts, job_data: PerformIcmpResponse) { for result in &job_data.results { if let Some(err) = &result.error { error!(?err, "Fatal job error."); @@ -14,17 +16,18 @@ pub async fn display_job(pb: &ProgressBar, config: &Opts, job_data: &PerformIcmp if let Some(job_result) = &result.result { if job_result.packet_loss == 1.0 { - ping_display::display_failed_ping(pb, config, result, &job_data.node_info); + ping_display::display_failed_ping(&pb, &config, result, &job_data.node_info).await; continue; } ping_display::display_success_ping( - pb, - config, + &pb, + &config, &result.endpoint, job_result, &job_data.node_info, - ); + ) + .await; } pb.println(""); diff --git a/src/display/progress.rs b/src/display/progress.rs new file mode 100644 index 0000000..492eae7 --- /dev/null +++ b/src/display/progress.rs @@ -0,0 +1,76 @@ +use color_eyre::eyre::Result; +// progress.rs +use indicatif::{ProgressBar, ProgressStyle}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::task::JoinHandle; + +use crate::display::display_job; +use crate::models::types::PerformIcmpResponse; +use crate::options::Opts; + +pub struct ProgressDisplay { + bar: ProgressBar, + config: &'static Opts, + + rx: Receiver, +} + +#[derive(Clone)] +pub struct ProgressUpdater { + bar: ProgressBar, + tx: Sender, +} + +impl Drop for ProgressUpdater { + fn drop(&mut self) { + self.bar.finish_with_message("Completed"); + } +} + +impl ProgressUpdater { + pub(crate) async fn display_job( + &self, + job: progenitor::progenitor_client::ResponseValue< + crate::models::types::PerformIcmpResponse, + >, + ) { + let _ = self.tx.send(job.into_inner()).await; + self.bar.inc(1); + } +} + +impl ProgressDisplay { + pub fn new(config: &'static Opts) -> Result<(Self, ProgressUpdater)> { + let mut spinner_style = ProgressStyle::default_spinner() + .tick_chars("-\\|/") + .template("{spinner:.green} {msg:.cyan/blue} [{elapsed_precise}] {pos}/{len}")?; + + let bar = ProgressBar::new((config.attempts * config.regions.len()) as u64); + + let world_ticker = format!("{}", console::Emoji("🌍🌍🌎🌎🌏🌏🌎🌎🌍🌍", "-\\|/")); + spinner_style = spinner_style.tick_chars(&world_ticker); + bar.enable_steady_tick(Duration::from_millis(350)); + + bar.set_style(spinner_style); + + let (tx, rx) = mpsc::channel(config.concurrency); + + Ok(( + Self { + bar: bar.clone(), + config, + rx, + }, + ProgressUpdater { bar, tx }, + )) + } + + pub async fn display_job_thread(&mut self) { + while let Some(x) = self.rx.recv().await { + display_job(&self.bar, self.config, x).await; + } + } +} diff --git a/src/display/progress_bar.rs b/src/display/progress_bar.rs deleted file mode 100644 index ef72d0f..0000000 --- a/src/display/progress_bar.rs +++ /dev/null @@ -1,40 +0,0 @@ -pub fn get_progress_bar_text(endpoint: &str, regions: &Vec) -> String { - let config = app_config::get_configuration(); - - let regions_display = match regions.len() { - 0 => { - let mut world_message = "Worldwide".to_string(); - if config.show_emojis == true { - if let Some(emoji) = emojis::get_emoji_for_country_code("WORLD".to_string()) { - world_message = - format!("{}", console::Emoji(&format!("{}", emoji), "Worldwide")) - } - } - world_message - } - _ => regions - .iter() - .map(|region| { - let mut region_msg = region.to_owned(); - - if config.show_emojis == true { - if let Some(emoji) = custom_validators::get_emoji_safe_region_code(region) - .and_then(emojis::get_emoji_for_country_code) - { - let formatted_emoji = match custom_validators::is_continent(region) { - true => format!("{}", emoji), - false => format!("{} ", emoji), - }; - - region_msg = format!("{}", console::Emoji(&formatted_emoji, region)) - } - } - region_msg - }) - .collect::>() - .join(","), - }; - - return format!("Collecting results for {} [{}]", endpoint, regions_display); -} - diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..b9e2516 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,148 @@ +use crate::{ + display::{ProgressDisplay, ProgressUpdater}, + models::*, + options::{EarthRegion, Opts}, +}; +use color_eyre::eyre::{self, Context, Result}; +use futures::stream::{self, StreamExt}; +use reqwest::header::{HeaderMap, HeaderValue}; +use std::{iter::repeat, str::FromStr, sync::Arc, time::Duration}; +use tokio_retry::{strategy::ExponentialBackoff, Retry}; +use tracing::{error, info}; +use types::{ + PerformIcmpBody, PerformIcmpBodyConfiguration, PerformIcmpBodyContinentCode, + PerformIcmpBodyCountryCode, PerformIcmpBodyMobile, PerformIcmpBodyProxy, + PerformIcmpBodyResidential, PerformIcmpResponse, +}; + +#[derive(Debug)] +pub struct IcmpJob { + config: &'static Opts, + region: EarthRegion, +} + +impl IcmpJob { + pub fn new(config: &'static Opts, region: EarthRegion) -> Self { + Self { config, region } + } + + pub async fn execute( + &self, + client: &Client, + ) -> eyre::Result> { + info!( + region = ?self.region, + attempts = self.config.count, + "Executing ICMP job" + ); + let (country_code, continent_code) = self.region.get_codes()?; + + let request = PerformIcmpBody { + configuration: Some(PerformIcmpBodyConfiguration { + payload_size: Some(56.0), + timeout_millis: None, + attempts: Some(self.config.count as f64), + }), + country_code, + continent_code, + hostnames: vec![self.config.endpoint.to_string()], + isp_regex: None, + city: None, + mobile: PerformIcmpBodyMobile::from_str(&self.config.mobile.to_string())?, + node_id: None, + proxy: PerformIcmpBodyProxy::from_str(&self.config.proxy.to_string())?, + residential: PerformIcmpBodyResidential::from_str( + &self.config.residential.to_string(), + )?, + }; + + self.execute_with_retry(client, request).await + } + + async fn execute_with_retry( + &self, + client: &Client, + request: PerformIcmpBody, + ) -> eyre::Result> { + let retry_strategy = ExponentialBackoff::from_millis(100) + .factor(2) + .max_delay(Duration::from_secs(60)) + .take(3); + + info!("Executing request with retry strategy"); + Retry::spawn(retry_strategy, || async { + client + .perform_icmp(&request) + .await + .context("Failed to send job") + }) + .await + } +} + +pub struct JobScheduler { + config: &'static Opts, + client: Client, +} + +impl JobScheduler { + pub fn new(config: &'static Opts) -> Result { + info!( + concurrency = config.concurrency, + attempts = config.attempts, + "Initializing job scheduler" + ); + + let mut headers = HeaderMap::new(); + headers.insert( + "x-api-key", + HeaderValue::try_from(&config.api_key) + .context("Unable to parse API Key into header")?, + ); + + let req_client = reqwest::Client::builder() + .default_headers(headers) + .build() + .context("Failed to build HTTP client")?; + let client = Client::new_with_client("https://api.bitping.com/v2", req_client); + + Ok(Self { config, client }) + } + + pub async fn execute_jobs(&self, progress: ProgressUpdater) -> Result<(), Error> { + info!( + regions = ?self.config.regions, + "Starting job execution" + ); + let jobs = self.jobs_iterator(); + + let progress = Arc::new(progress); + + stream::iter(jobs) + .for_each_concurrent(Some(self.config.concurrency), |job| { + let client = self.client.clone(); + + let progress = progress.clone(); + async move { + match job.execute(&client).await { + Ok(v) => progress.display_job(v).await, + Err(e) => { + error!("Job failed: {}", e); + } + }; + } + }) + .await; + + Ok(()) + } + + // Replace prepare_jobs with an iterator + fn jobs_iterator(&self) -> impl Iterator + '_ { + self.config.regions.iter().flat_map(move |region| { + repeat(region) + .take(self.config.attempts) + .map(move |r| IcmpJob::new(self.config, r.clone())) + }) + } +} diff --git a/src/main.rs b/src/main.rs index a4eb835..56a1b42 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,162 +1,40 @@ +use std::sync::LazyLock; + use color_eyre::eyre::{self, Context, ContextCompat}; -use colorful::Colorful; -use display::display_job; -use keshvar::Alpha2; -use models::{ - types::{ - PerformIcmpBody, PerformIcmpBodyConfiguration, PerformIcmpBodyContinentCode, - PerformIcmpBodyCountryCode, PerformIcmpBodyMobile, PerformIcmpBodyProxy, - PerformIcmpBodyResidential, PerformIcmpResponse, PerformIcmpResponseResultsItem, - }, - Client, -}; +use display::ProgressDisplay; +use job::{IcmpJob, JobScheduler}; use options::Opts; -use progenitor::progenitor_client::ResponseValue; -use rand::seq::SliceRandom; -use reqwest::header::{HeaderMap, HeaderValue}; -use std::{ - pin, - str::FromStr, - sync::{LazyLock, OnceLock}, - thread, - time::Duration, -}; -use tokio::{sync::mpsc, task::JoinSet}; - -use futures::{stream, StreamExt}; - -use indicatif::{ProgressBar, ProgressStyle}; -use tracing::{self, debug, info}; +use tokio::join; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; mod display; +mod job; mod models; mod options; static APP_CONFIG: LazyLock = LazyLock::new(|| Opts::parser().run()); -static API_CLIENT: LazyLock = LazyLock::new(|| { - let mut headers = HeaderMap::new(); - headers.insert( - "x-api-key", - HeaderValue::try_from(&APP_CONFIG.api_key).expect("Unable to parse API Key into header"), - ); - - let req_client = reqwest::Client::builder() - .default_headers(headers) - .build() - .expect("Failed to build HTTP client"); - Client::new_with_client("https://api.bitping.com/v2", req_client) -}); #[tokio::main] async fn main() -> eyre::Result<()> { color_eyre::install()?; + let filter = EnvFilter::from_default_env(); - tracing_subscriber::fmt::init(); - - let endpoint = &APP_CONFIG.endpoint; - tracing::debug!("Value for endpoint: {}", &endpoint); - - tracing::debug!( - "Number of jobs to send is {}", - APP_CONFIG.attempts * APP_CONFIG.regions.len() - ); - - let mut spinner_style = ProgressStyle::default_spinner() - .tick_chars("-\\|/") - .template("{spinner:.green} {msg:.cyan/blue} [{elapsed_precise}] {pos}/{len}")?; - - let pb = ProgressBar::new((APP_CONFIG.attempts * APP_CONFIG.regions.len()) as u64); - - let world_ticker = format!("{}", console::Emoji("🌍🌍🌎🌎🌏🌏🌎🌎🌍🌍", "-\\|/")); - spinner_style = spinner_style.tick_chars(&world_ticker); - pb.enable_steady_tick(Duration::from_millis(350)); - - pb.set_style(spinner_style); - - for region in &APP_CONFIG.regions { - for chunk in (0..APP_CONFIG.attempts) - .collect::>() - .chunks(APP_CONFIG.concurrency) - { - let mut chunk_set = JoinSet::new(); - - for _ in chunk { - let pb = pb.clone(); - let region = region.clone(); - let endpoint = endpoint.clone(); - - chunk_set.spawn(async move { - let (country_code, continent_code) = match region { - options::EarthRegion::Country(c) => ( - Some(PerformIcmpBodyCountryCode::from_str( - &c.to_country().alpha2().to_string(), - )?), - None, - ), - options::EarthRegion::Continent(con) => ( - None, - Some(PerformIcmpBodyContinentCode::from_str(match con { - keshvar::Continent::Africa => "AF", - keshvar::Continent::Antarctica => "AN", - keshvar::Continent::Asia => "AS", - keshvar::Continent::Australia => "OC", - keshvar::Continent::Europe => "EU", - keshvar::Continent::NorthAmerica => "NA", - keshvar::Continent::SouthAmerica => "SA", - })?), - ), - _ => (None, None), - }; - - debug!(?country_code, "Sending job to country"); - - let result = API_CLIENT - .perform_icmp(&PerformIcmpBody { - configuration: Some(PerformIcmpBodyConfiguration { - payload_size: Some(56.0), - timeout_millis: None, - attempts: Some(APP_CONFIG.count as f64), - }), - country_code, - continent_code, - hostnames: vec![endpoint.to_string()], - isp_regex: None, - city: None, - mobile: PerformIcmpBodyMobile::from_str( - &APP_CONFIG.mobile.to_string(), - )?, - node_id: None, - proxy: PerformIcmpBodyProxy::from_str(&APP_CONFIG.proxy.to_string())?, - residential: PerformIcmpBodyResidential::from_str( - &APP_CONFIG.residential.to_string(), - )?, - }) - .await - .context("Failed to send job"); + let fmt = tracing_subscriber::fmt::Layer::default() + .compact() + .pretty() + .with_thread_ids(true) + .with_target(false); + tracing_subscriber::registry().with(fmt).with(filter).init(); - pb.inc(1); + let (mut progress, updater) = ProgressDisplay::new(&APP_CONFIG)?; + let scheduler = JobScheduler::new(&APP_CONFIG)?; - result - }); - } + info!(config = ?&APP_CONFIG, "Starting job execution with config"); - while let Some(res) = chunk_set.join_next().await { - match res { - Ok(Ok(out)) => { - tracing::debug!("Response {:?}", out); - display_job(&pb, &APP_CONFIG, &out).await; - } - Ok(Err(e)) => { - tracing::error!("API request failed: {}", e); - } - Err(e) => { - tracing::error!("Task join error: {}", e); - } - } - } - } - } + let display_driver = progress.display_job_thread(); + let schedule_driver = scheduler.execute_jobs(updater); + let _ = join!(schedule_driver, display_driver); - pb.finish(); Ok(()) } diff --git a/src/options/opts.rs b/src/options/opts.rs index 3c8ba6c..65a8a8f 100644 --- a/src/options/opts.rs +++ b/src/options/opts.rs @@ -1,9 +1,11 @@ -use std::fmt::Display; +use std::{fmt::Display, str::FromStr}; use bpaf::{long, OptionParser, Parser}; -use color_eyre::eyre; +use color_eyre::eyre::{self, Result}; use keshvar::Continent; +use crate::models::types::{PerformIcmpBodyContinentCode, PerformIcmpBodyCountryCode}; + #[derive(Debug, Clone)] pub enum NetworkPolicy { Allowed, @@ -44,6 +46,7 @@ pub struct Opts { pub residential: NetworkPolicy, pub mobile: NetworkPolicy, pub proxy: NetworkPolicy, + pub no_delay: bool, } impl Opts { @@ -103,6 +106,10 @@ impl Opts { .optional() .map(NetworkPolicy::from); + let no_delay = bpaf::long("no-delay") + .help("Disable delays in output display") + .switch(); + bpaf::construct!(Opts { regions, count, @@ -112,6 +119,7 @@ impl Opts { residential, mobile, proxy, + no_delay, endpoint, }) .to_options() @@ -127,6 +135,37 @@ pub enum EarthRegion { Anywhere, } +impl EarthRegion { + pub fn get_codes( + &self, + ) -> Result<( + Option, + Option, + )> { + Ok(match self { + EarthRegion::Country(c) => ( + Some(PerformIcmpBodyCountryCode::from_str( + &c.to_country().alpha2().to_string(), + )?), + None, + ), + EarthRegion::Continent(con) => ( + None, + Some(PerformIcmpBodyContinentCode::from_str(match con { + keshvar::Continent::Africa => "AF", + keshvar::Continent::Antarctica => "AN", + keshvar::Continent::Asia => "AS", + keshvar::Continent::Australia => "OC", + keshvar::Continent::Europe => "EU", + keshvar::Continent::NorthAmerica => "NA", + keshvar::Continent::SouthAmerica => "SA", + })?), + ), + _ => (None, None), + }) + } +} + impl std::fmt::Display for EarthRegion { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -137,7 +176,7 @@ impl std::fmt::Display for EarthRegion { } } -pub fn parse_alpha_codes(regions: &str) -> eyre::Result> { +pub fn parse_alpha_codes(regions: &str) -> Result> { if regions.trim().to_lowercase() == "anywhere" { return Ok(vec![EarthRegion::Anywhere]); }