diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e1457b..5e7055a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,42 @@ All notable changes to this project will be documented in this file. +## v0.7.6 - 2024-10-31 + +### Highlights + +* migrate default broker API endpoint to `https://api.bgpkit.com/v3/broker` + * Full API docs is available at `https://api.bgpkit.com/docs` +* add `get_peers` to `BgpkitBroker` struct + * fetches the list of peers for a given collector + * can specify filters the same way as querying MRT files + * available filter functions include: + * `.peers_asn(ASN)` + * `.peers_ip(IP)` + * `.collector_id(COLLECTOR_ID)` + * `.peers_only_full_feed(TRUE/FALSE)` + * returns `Vec` + +```rust +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct BrokerPeer { + /// The date of the latest available data. + pub date: NaiveDate, + /// The IP address of the collector peer. + pub ip: IpAddr, + /// The ASN (Autonomous System Number) of the collector peer. + pub asn: u32, + /// The name of the collector. + pub collector: String, + /// The number of IPv4 prefixes. + pub num_v4_pfxs: u32, + /// The number of IPv6 prefixes. + pub num_v6_pfxs: u32, + /// The number of connected ASNs. + pub num_connected_asns: u32, +} +``` + ## v0.7.5 - 2024-08-23 ### [NEW] deploy at fly.io diff --git a/Dockerfile b/Dockerfile index 723cb76..2063c37 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,9 @@ FROM debian:bookworm-slim # copy the build artifact from the build stage COPY --from=build /my_project/target/release/bgpkit-broker /usr/local/bin/bgpkit-broker +RUN apt update && apt install -y curl tini WORKDIR /bgpkit-broker EXPOSE 40064 -ENTRYPOINT bash -c '/usr/local/bin/bgpkit-broker serve bgpkit-broker.sqlite3 --bootstrap --silent' +ENTRYPOINT ["/usr/bin/tini", "--", "/usr/local/bin/bgpkit-broker"] +CMD ["serve", "bgpkit-broker.sqlite3", "--bootstrap", "--silent"] diff --git a/examples/peers.rs b/examples/peers.rs new file mode 100644 index 0000000..41d06fc --- /dev/null +++ b/examples/peers.rs @@ -0,0 +1,36 @@ +//! This example retrieves a list of full-feed MRT collector peers from route-views.amsix and print +//! out the top 10 peers with the most connected ASNs. +//! +//! Example output +//! ```text +//! 2024-10-31,route-views.amsix,58511,80.249.212.104,2567,960791,0 +//! 2024-10-31,route-views.amsix,267613,80.249.213.223,2268,965321,0 +//! 2024-10-31,route-views.amsix,267613,2001:7f8:1:0:a500:26:7613:1,2011,0,206667 +//! 2024-10-31,route-views.amsix,12779,80.249.209.17,1932,951788,0 +//! 2024-10-31,route-views.amsix,9002,2001:7f8:1::a500:9002:1,1896,0,202069 +//! 2024-10-31,route-views.amsix,38880,80.249.212.75,1883,992214,0 +//! 2024-10-31,route-views.amsix,58511,2001:7f8:1::a505:8511:1,1853,0,216981 +//! 2024-10-31,route-views.amsix,9002,80.249.209.216,1318,956345,0 +//! 2024-10-31,route-views.amsix,42541,80.249.212.84,1302,952091,0 +//! 2024-10-31,route-views.amsix,12779,2001:7f8:1::a501:2779:1,1247,0,201726 +//! ``` + +fn main() { + let broker = bgpkit_broker::BgpkitBroker::new() + .collector_id("route-views.amsix") + .peers_only_full_feed(true); + let mut peers = broker.get_peers().unwrap(); + peers.sort_by(|a, b| b.num_connected_asns.cmp(&a.num_connected_asns)); + for peer in peers.iter().take(10) { + println!( + "{},{},{},{},{},{},{}", + peer.date, + peer.collector, + peer.asn, + peer.ip, + peer.num_connected_asns, + peer.num_v4_pfxs, + peer.num_v6_pfxs, + ); + } +} diff --git a/examples/query.rs b/examples/query.rs index 9d232cc..d05a2d5 100644 --- a/examples/query.rs +++ b/examples/query.rs @@ -2,6 +2,7 @@ use bgpkit_broker::{BgpkitBroker, BrokerItem}; pub fn main() { let broker = BgpkitBroker::new() + .broker_url("http://dev.api.bgpkit.com/v3/broker") .ts_start("1634693400") .ts_end("1634693400") .collector_id("rrc00,route-views2"); diff --git a/fly.toml b/fly.toml index 0e48a65..c9f5ad0 100644 --- a/fly.toml +++ b/fly.toml @@ -8,13 +8,13 @@ primary_region = 'lax' [http_service] internal_port = 40064 -force_https = true +force_https = false auto_stop_machines = 'off' processes = ['app'] [[http_service.checks]] -grace_period = "120s" -interval = "60s" +grace_period = "60s" +interval = "30s" method = "GET" timeout = "5s" path = "/health?max_delay_secs=3600" @@ -22,11 +22,9 @@ path = "/health?max_delay_secs=3600" [[vm]] memory = '1gb' -cpu_kind = 'shared' -cpus = 1 +size = 'shared-cpu-4x' [deploy] -strategy = "rolling" -max_unavailable = 1 -wait_timeout = "20m" \ No newline at end of file +strategy = "bluegreen" +wait_timeout = "20m" diff --git a/src/cli/main.rs b/src/cli/main.rs index 9aec859..576a598 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -16,6 +16,7 @@ use clap::{Parser, Subcommand}; use futures::StreamExt; use itertools::Itertools; use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; use std::path::Path; use std::process::exit; use tabled::settings::Style; @@ -185,6 +186,29 @@ enum Commands { json: bool, }, + /// List public BGP collector peers + Peers { + /// filter by collector ID + #[clap(short, long)] + collector: Option, + + /// filter by peer AS number + #[clap(short = 'a', long)] + peer_asn: Option, + + /// filter by peer IP address + #[clap(short = 'i', long)] + peer_ip: Option, + + /// show only full-feed peers + #[clap(short, long)] + full_feed_only: bool, + + /// Print out search results in JSON format instead of Markdown table + #[clap(short, long)] + json: bool, + }, + /// Streaming live from a broker NATS server Live { /// URL to NATS server, e.g. nats://localhost:4222. @@ -640,6 +664,41 @@ fn main() { println!("{}", Table::new(items).with(Style::markdown())); } } + + Commands::Peers { + collector, + peer_asn, + peer_ip, + full_feed_only, + json, + } => { + let mut broker = BgpkitBroker::new(); + // health check first + if broker.health_check().is_err() { + println!("broker instance at {} is not available", broker.broker_url); + return; + } + if let Some(collector_id) = collector { + broker = broker.collector_id(collector_id); + } + if let Some(asn) = peer_asn { + broker = broker.peers_asn(asn); + } + if let Some(ip) = peer_ip { + broker = broker.peers_ip(ip); + } + if full_feed_only { + broker = broker.peers_only_full_feed(true); + } + let items = broker.get_peers().unwrap(); + + if json { + println!("{}", serde_json::to_string_pretty(&items).unwrap()); + } else { + println!("{}", Table::new(items).with(Style::markdown())); + } + } + Commands::Live { url, subject, diff --git a/src/lib.rs b/src/lib.rs index c7ffc53..f759ae2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,13 +94,12 @@ mod error; mod item; #[cfg(feature = "nats")] pub mod notifier; +mod peer; mod query; -use crate::query::{CollectorLatestResult, QueryResult}; -use std::collections::HashMap; -use std::fmt::Display; - use crate::collector::DEFAULT_COLLECTORS_CONFIG; +use crate::peer::BrokerPeersResult; +use crate::query::{BrokerQueryResult, CollectorLatestResult}; pub use collector::{load_collectors, Collector}; #[cfg(feature = "cli")] pub use crawler::crawl_collector; @@ -108,7 +107,11 @@ pub use crawler::crawl_collector; pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE}; pub use error::BrokerError; pub use item::BrokerItem; +pub use peer::BrokerPeer; pub use query::{QueryParams, SortOrder}; +use std::collections::HashMap; +use std::fmt::Display; +use std::net::IpAddr; /// BgpkitBroker struct maintains the broker's URL and handles making API queries. /// @@ -126,7 +129,7 @@ impl Default for BgpkitBroker { dotenvy::dotenv().ok(); let url = match std::env::var("BGPKIT_BROKER_URL") { Ok(url) => url.trim_end_matches('/').to_string(), - Err(_) => "https://api.broker.bgpkit.com/v3".to_string(), + Err(_) => "https://api.bgpkit.com/v3/broker".to_string(), }; let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map(); @@ -375,6 +378,63 @@ impl BgpkitBroker { } } + /// Add a filter of peer IP address when listing peers. + /// + /// # Examples + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_ip("192.168.1.1".parse().unwrap()); + /// ``` + pub fn peers_ip(self, peer_ip: IpAddr) -> Self { + let mut query_params = self.query_params; + query_params.peers_ip = Some(peer_ip); + Self { + broker_url: self.broker_url, + client: self.client, + query_params, + collector_project_map: self.collector_project_map, + } + } + + /// Add a filter of peer ASN when listing peers. + /// + /// # Examples + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_asn(64496); + /// ``` + pub fn peers_asn(self, peer_asn: u32) -> Self { + let mut query_params = self.query_params; + query_params.peers_asn = Some(peer_asn); + Self { + broker_url: self.broker_url, + client: self.client, + query_params, + collector_project_map: self.collector_project_map, + } + } + + /// Add a filter of peer full feed status when listing peers. + /// + /// # Examples + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_only_full_feed(true); + /// ``` + pub fn peers_only_full_feed(self, peer_full_feed: bool) -> Self { + let mut query_params = self.query_params; + query_params.peers_only_full_feed = peer_full_feed; + Self { + broker_url: self.broker_url, + client: self.client, + query_params, + collector_project_map: self.collector_project_map, + } + } + /// Turn to specified page, page starting from 1. /// /// This works with [Self::query_single_page] function to manually paginate. @@ -405,7 +465,7 @@ impl BgpkitBroker { pub fn query_single_page(&self) -> Result, BrokerError> { let url = format!("{}/search{}", &self.broker_url, &self.query_params); log::info!("sending broker query to {}", &url); - match self.run_query(url.as_str()) { + match self.run_files_query(url.as_str()) { Ok(res) => Ok(res), Err(e) => Err(e), } @@ -464,7 +524,7 @@ impl BgpkitBroker { loop { let url = format!("{}/search{}", &self.broker_url, &p); - let res_items = match self.run_query(url.as_str()) { + let res_items = match self.run_files_query(url.as_str()) { Ok(res) => res, Err(e) => return Err(e), }; @@ -576,25 +636,133 @@ impl BgpkitBroker { Ok(items) } - fn run_query(&self, url: &str) -> Result, BrokerError> { + /// Get the most recent information for collector peers. + /// + /// The returning result is structured as a vector of [BrokerPeer] objects. + /// + /// # Examples + /// + /// ## Get all peers + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new(); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + /// + /// ## Get peers from a specific collector + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .collector_id("route-views2"); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + /// + /// ## Get peers from a specific ASN + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_asn(64496); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + /// + /// ## Get peers from a specific IP address + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_ip("192.168.1.1".parse().unwrap()); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + /// + /// ## Get peers with full feed + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .peers_only_full_feed(true); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + /// + /// ## Get peers from a specific collector with full feed + /// + /// ``` + /// let broker = bgpkit_broker::BgpkitBroker::new() + /// .collector_id("route-views2") + /// .peers_only_full_feed(true); + /// let peers = broker.get_peers().unwrap(); + /// for peer in &peers { + /// println!("{:?}", peer); + /// } + /// ``` + pub fn get_peers(&self) -> Result, BrokerError> { + let mut url = format!("{}/peers", self.broker_url); + let mut param_strings = vec![]; + if let Some(ip) = &self.query_params.peers_ip { + param_strings.push(format!("ip={}", ip)); + } + if let Some(asn) = &self.query_params.peers_asn { + param_strings.push(format!("asn={}", asn)); + } + if self.query_params.peers_only_full_feed { + param_strings.push("full_feed=true".to_string()); + } + if let Some(collector_id) = &self.query_params.collector_id { + param_strings.push(format!("collector={}", collector_id)); + } + if !param_strings.is_empty() { + let param_string = param_strings.join("&"); + url = format!("{}?{}", url, param_string); + } + + let peers = match self.client.get(url.as_str()).send() { + Ok(response) => match response.json::() { + Ok(result) => result.data, + Err(_) => { + return Err(BrokerError::BrokerError( + "Error parsing response".to_string(), + )) + } + }, + Err(e) => { + return Err(BrokerError::BrokerError(format!( + "Unable to connect to the URL ({}): {}", + url, e + ))) + } + }; + Ok(peers) + } + + fn run_files_query(&self, url: &str) -> Result, BrokerError> { log::info!("sending broker query to {}", &url); match self.client.get(url).send() { - Ok(res) => { - match res.json::() { - Ok(res) => { - if let Some(e) = res.error { - Err(BrokerError::BrokerError(e)) - } else { - Ok(res.data) - } - } - Err(e) => { - // json decoding error. most likely the service returns an error message without - // `data` field. - Err(BrokerError::BrokerError(e.to_string())) + Ok(res) => match res.json::() { + Ok(res) => { + if let Some(e) = res.error { + Err(BrokerError::BrokerError(e)) + } else { + Ok(res.data) } } - } + Err(e) => { + // json decoding error. most likely the service returns an error message without + // `data` field. + Err(BrokerError::BrokerError(e.to_string())) + } + }, Err(e) => Err(BrokerError::from(e)), } } @@ -814,7 +982,7 @@ mod tests { #[test] fn test_latest_no_ssl() { - let broker = BgpkitBroker::new().disable_ssl_check(); + let broker = BgpkitBroker::new().accept_invalid_certs(); let items = broker.latest().unwrap(); assert!(items.len() >= 125); } @@ -825,4 +993,41 @@ mod tests { let res = broker.health_check(); assert!(res.is_ok()); } + + #[test] + fn test_peers() { + let broker = BgpkitBroker::new(); + let all_peers = broker.get_peers().unwrap(); + assert!(!all_peers.is_empty()); + let first_peer = all_peers.first().unwrap(); + let first_ip = first_peer.ip; + let first_asn = first_peer.asn; + + let broker = BgpkitBroker::new().peers_ip(first_ip); + let peers = broker.get_peers().unwrap(); + assert!(!peers.is_empty()); + + let broker = BgpkitBroker::new().peers_asn(first_asn); + let peers = broker.get_peers().unwrap(); + assert!(!peers.is_empty()); + + let broker = BgpkitBroker::new().peers_only_full_feed(true); + let full_feed_peers = broker.get_peers().unwrap(); + assert!(!full_feed_peers.is_empty()); + assert!(full_feed_peers.len() < all_peers.len()); + + let broker = BgpkitBroker::new().collector_id("rrc00"); + let rrc_peers = broker.get_peers().unwrap(); + assert!(!rrc_peers.is_empty()); + assert!(rrc_peers.iter().all(|peer| peer.collector == "rrc00")); + + let broker = BgpkitBroker::new().collector_id("rrc00,route-views2"); + let rrc_rv_peers = broker.get_peers().unwrap(); + assert!(!rrc_rv_peers.is_empty()); + assert!(rrc_rv_peers + .iter() + .any(|peer| peer.collector == "rrc00" || peer.collector == "route-views2")); + + assert!(rrc_rv_peers.len() > rrc_peers.len()); + } } diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 0000000..32bfb71 --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,30 @@ +use chrono::NaiveDate; +use serde::{Deserialize, Serialize}; +use std::net::IpAddr; + +/// MRT collector peer information +/// +/// Represents the information of an MRT collector peer. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(tabled::Tabled))] +pub struct BrokerPeer { + /// The date of the latest available data. + pub date: NaiveDate, + /// The IP address of the collector peer. + pub ip: IpAddr, + /// The ASN (Autonomous System Number) of the collector peer. + pub asn: u32, + /// The name of the collector. + pub collector: String, + /// The number of IPv4 prefixes. + pub num_v4_pfxs: u32, + /// The number of IPv6 prefixes. + pub num_v6_pfxs: u32, + /// The number of connected ASNs. + pub num_connected_asns: u32, +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct BrokerPeersResult { + pub count: u32, + pub data: Vec, +} diff --git a/src/query.rs b/src/query.rs index c0a2f30..0a77a91 100644 --- a/src/query.rs +++ b/src/query.rs @@ -2,6 +2,7 @@ use crate::BrokerItem; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; +use std::net::IpAddr; /// QueryParams represents the query parameters to the backend API. /// @@ -41,6 +42,12 @@ pub struct QueryParams { pub page: i64, /// number of items each page contains, default to 10, max to 100000 pub page_size: i64, + /// collector peer IP address (for listing peers info) + pub peers_ip: Option, + /// collector peer ASN (for listing peers info) + pub peers_asn: Option, + /// collector peer full feed status (for listing peers info) + pub peers_only_full_feed: bool, } /// Sorting order enum @@ -63,6 +70,9 @@ impl Default for QueryParams { data_type: None, page: 1, page_size: 100, + peers_ip: None, + peers_asn: None, + peers_only_full_feed: false, } } } @@ -119,6 +129,7 @@ impl QueryParams { data_type: None, page: 1, page_size: 10, + ..Default::default() } } @@ -250,7 +261,7 @@ pub(crate) struct CollectorLatestResult { /// Query result struct that contains data or error message #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct QueryResult { +pub(crate) struct BrokerQueryResult { /// number of items returned in **current** call pub count: Option, /// the page number of the current call @@ -263,7 +274,7 @@ pub(crate) struct QueryResult { pub data: Vec, } -impl Display for QueryResult { +impl Display for BrokerQueryResult { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", serde_json::to_string(self).unwrap()) } @@ -283,6 +294,7 @@ mod tests { data_type: None, page: 1, page_size: 20, + ..Default::default() }; assert_eq!( @@ -298,6 +310,7 @@ mod tests { data_type: None, page: 1, page_size: 20, + ..Default::default() }; assert_eq!("?page=1&page_size=20".to_string(), param.to_string());