Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new-file-notification via NATS #44

Merged
merged 8 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ A library and command-line to provide indexing and searching functionalities for
keywords = ["bgp", "bgpkit", "api"]

[[bin]]
path= "src/cli/main.rs"
name="bgpkit-broker"
path = "src/cli/main.rs"
name = "bgpkit-broker"
required-features = ["cli"]

[dependencies]
Expand All @@ -23,9 +23,9 @@ required-features = ["cli"]
# Core Broker Rust SDK dependencies
#############################################
chrono = { version = "0.4", features = ["serde"] }
log="0.4"
reqwest = {version = "0.11.17", default-features = false, features = ["blocking", "json", "stream", "rustls-tls-native-roots"]}
serde={version="1", features = ["derive"]}
log = "0.4"
reqwest = { version = "0.11.17", default-features = false, features = ["blocking", "json", "stream", "rustls-tls-native-roots"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
tracing = "0.1"
Expand All @@ -35,45 +35,48 @@ tracing = "0.1"
#############################################

# command-line interface dependencies
clap = {version= "4.3", features=["derive"], optional=true}
dirs = {version="5", optional=true}
humantime = {version="2.1", optional = true}
num_cpus = {version="1.15", optional=true}
tabled = {version = "0.13", optional = true}
tracing-subscriber = {version="0.3", optional=true}
clap = { version = "4.3", features = ["derive"], optional = true }
dirs = { version = "5", optional = true }
humantime = { version = "2.1", optional = true }
num_cpus = { version = "1.15", optional = true }
tabled = { version = "0.13", optional = true }
tracing-subscriber = { version = "0.3", optional = true }
indicatif = { version = "0.17.7", optional = true }
futures-util = { version = "0.3.28", optional = true }
itertools = { version = "0.12.0" , optional = true }
dotenvy = { version = "0.15" , optional = true }
itertools = { version = "0.12.0", optional = true }
dotenvy = { version = "0.15", optional = true }
tempfile = { version = "3.8", optional = true }
which = { version = "5.0", optional = true }

# crawler dependencies
futures = {version="0.3", optional = true}
oneio = {version="0.16.0", features = ["s3"], optional = true}
futures = { version = "0.3", optional = true }
oneio = { version = "0.16.0", features = ["s3"], optional = true }
regex = { version = "1", optional = true }
scraper = { version = "0.17", optional = true }
tokio = {version="1", optional = true, features = ["full"] }
lazy_static = {version="1", optional = true}
tokio = { version = "1", optional = true, features = ["full"] }
lazy_static = { version = "1", optional = true }

# api dependencies
axum = {version = "0.7", optional = true}
tower-http = {version = "0.5", optional = true, features = ["cors"]}
http = {version="1.0", optional = true}
utoipa = {version = "4", optional = true, features = ["axum_extras", "chrono"]}
axum = { version = "0.7", optional = true }
tower-http = { version = "0.5", optional = true, features = ["cors"] }
http = { version = "1.0", optional = true }
utoipa = { version = "4", optional = true, features = ["axum_extras", "chrono"] }
utoipa-swagger-ui = { version = "5", optional = true, features = ["axum"] }
axum-prometheus = { version = "0.5.0", optional = true }

# database dependencies
sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite"], optional = true }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"], optional = true }
async-nats = { version = "0.34.0", optional = true }

[features]
default=[]
default = []
cli = [
# command-line interface
"clap", "dirs", "humantime", "num_cpus", "tracing-subscriber", "tabled", "itertools", "dotenvy", "tempfile", "which",
# crawler
"futures", "oneio", "regex", "scraper", "tokio", "lazy_static",
# notification
"nats",
# database
"backend",
# bootstrap
Expand All @@ -85,6 +88,9 @@ backend = [
"tokio", "sqlx",
]

# notification features
nats = ["async-nats"]

[dev-dependencies]
tracing-subscriber = "0.3.17"

Expand Down
12 changes: 8 additions & 4 deletions src/cli/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ async fn search(
.await
.unwrap()
.map(|data| Meta {
latest_update_ts: chrono::NaiveDateTime::from_timestamp_opt(data.update_ts, 0).unwrap(),
latest_update_ts: chrono::DateTime::from_timestamp(data.update_ts, 0)
.unwrap()
.naive_utc(),
latest_update_duration: data.update_duration,
});

Expand Down Expand Up @@ -226,7 +228,9 @@ async fn latest(State(state): State<Arc<AppState>>) -> impl IntoResponse {
.await
.unwrap()
.map(|data| Meta {
latest_update_ts: chrono::NaiveDateTime::from_timestamp_opt(data.update_ts, 0).unwrap(),
latest_update_ts: chrono::DateTime::from_timestamp(data.update_ts, 0)
.unwrap()
.naive_utc(),
latest_update_duration: data.update_duration,
});

Expand Down Expand Up @@ -267,7 +271,7 @@ async fn health(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// that does not require fresh data (e.g. historical analysis).
Json(
json!({"status": "OK", "message": "database is healthy", "meta": {
"latest_file_ts": ts.timestamp(),
"latest_file_ts": ts.and_utc().timestamp(),
}}),
)
.into_response()
Expand All @@ -289,7 +293,7 @@ async fn health(State(state): State<Arc<AppState>>) -> impl IntoResponse {
fn parse_time_str(ts_str: &str) -> Result<NaiveDateTime, String> {
let ts = if let Ok(ts_end) = ts_str.parse::<i64>() {
// it's unix timestamp
NaiveDateTime::from_timestamp_opt(ts_end, 0).unwrap()
DateTime::from_timestamp(ts_end, 0).unwrap().naive_utc()
} else {
let ts_str = ts_str.trim_end_matches('Z').to_string() + "+00:00";
match DateTime::parse_from_rfc3339(ts_str.as_str()) {
Expand Down
81 changes: 73 additions & 8 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod bootstrap;
use crate::api::{start_api_service, BrokerSearchQuery};
use crate::backup::backup_database;
use crate::bootstrap::download_file;
use bgpkit_broker::notifier::NatsNotifier;
use bgpkit_broker::{
crawl_collector, load_collectors, BgpkitBroker, Collector, LocalBrokerDb, DEFAULT_PAGE_SIZE,
};
Expand Down Expand Up @@ -179,6 +180,22 @@ enum Commands {
#[clap(short, long)]
json: bool,
},

/// Streaming live from a broker NATS server
Live {
/// URL to NATS server, e.g. nats://localhost:4222.
/// If not specified, will try to read from BGPKIT_BROKER_NATS_URL env variable.
#[clap(short, long)]
url: Option<String>,

/// Subject to subscribe to, default to public.broker.>
#[clap(short, long)]
subject: Option<String>,

/// Pretty print JSON output
#[clap(short, long)]
pretty: bool,
},
}

fn min_update_interval_check(s: &str) -> Result<u64, String> {
Expand All @@ -203,17 +220,27 @@ fn get_tokio_runtime() -> Runtime {
}

/// update the database with data crawled from the given collectors
async fn update_database(db: LocalBrokerDb, collectors: Vec<Collector>, days: Option<u32>) {
async fn update_database(
db: LocalBrokerDb,
collectors: Vec<Collector>,
days: Option<u32>,
notify: bool,
) {
let notifier = match notify {
true => NatsNotifier::new(None).await.ok(),
false => None,
};

let now = Utc::now();
let latest_date;
if let Some(d) = days {
// if days is specified, we crawl data from d days ago
latest_date = Some(Utc::now().date_naive() - chrono::Duration::days(d as i64));
latest_date = Some(Utc::now().date_naive() - Duration::days(d as i64));
} else {
// otherwise, we crawl data from the latest timestamp in the database
latest_date = match { db.get_latest_timestamp().await.unwrap().map(|t| t.date()) } {
latest_date = match db.get_latest_timestamp().await.unwrap().map(|t| t.date()) {
Some(t) => {
let start_date = t - chrono::Duration::days(1);
let start_date = t - Duration::days(1);
info!(
"update broker db from the latest date - 1 in db: {}",
start_date
Expand All @@ -222,7 +249,7 @@ async fn update_database(db: LocalBrokerDb, collectors: Vec<Collector>, days: Op
}
None => {
// if bootstrap is false and we have an empty database we crawl data from 30 days ago
let date = Utc::now().date_naive() - chrono::Duration::days(30);
let date = Utc::now().date_naive() - Duration::days(30);
info!(
"empty database, bootstrapping data from {} days ago ({})",
30, date
Expand Down Expand Up @@ -252,10 +279,17 @@ async fn update_database(db: LocalBrokerDb, collectors: Vec<Collector>, days: Op
match res {
Ok(items) => {
let inserted = db.insert_items(&items, true).await.unwrap();
if !inserted.is_empty() {
if let Some(n) = &notifier {
if let Err(e) = n.send(&inserted).await {
error!("{}", e);
}
}
}
total_inserted_count += inserted.len();
}
Err(e) => {
dbg!(e);
error!("{}", e);
continue;
}
}
Expand Down Expand Up @@ -350,7 +384,7 @@ fn main() {
loop {
interval.tick().await;
// updating from the latest data available
update_database(db.clone(), collectors.clone(), None).await;
update_database(db.clone(), collectors.clone(), None, true).await;
info!("wait for {} seconds before next update", update_interval);
}
});
Expand Down Expand Up @@ -464,7 +498,7 @@ fn main() {

rt.block_on(async {
let db = LocalBrokerDb::new(&db_path).await.unwrap();
update_database(db, collectors, days).await;
update_database(db, collectors, days, true).await;
});
}
Commands::Search { query, json, url } => {
Expand Down Expand Up @@ -555,5 +589,36 @@ fn main() {
println!("{}", Table::new(items).with(Style::markdown()));
}
}
Commands::Live {
url,
subject,
pretty,
} => {
dotenvy::dotenv().ok();
if do_log {
enable_logging();
}
let rt = get_tokio_runtime();
rt.block_on(async {
let mut notifier = match NatsNotifier::new(url).await {
Ok(n) => n,
Err(e) => {
error!("{}", e);
return;
}
};
if let Err(e) = notifier.start_subscription(subject).await {
error!("{}", e);
return;
}
while let Some(item) = notifier.next().await {
if pretty {
println!("{}", serde_json::to_string_pretty(&item).unwrap());
} else {
println!("{}", item);
}
}
});
}
}
}
56 changes: 28 additions & 28 deletions src/crawler/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ pub(crate) fn remove_trailing_slash(s: impl ToString) -> String {
s
}

pub(crate) async fn crawl_months_list(
collector_root_url: &str,
from_month: Option<NaiveDate>,
) -> Result<Vec<NaiveDate>, BrokerError> {
let rounded_month =
from_month.map(|d| NaiveDate::from_ymd_opt(d.year(), d.month(), 1).unwrap());

let month_link_pattern: Regex = Regex::new(r#"<a href="(....\...)/">.*"#).unwrap();
let body = reqwest::get(collector_root_url).await?.text().await?;
let mut res = vec![];
for cap in month_link_pattern.captures_iter(body.as_str()) {
let month = cap[1].to_owned();
let parsed_month =
NaiveDate::parse_from_str(format!("{}.01", month.as_str()).as_str(), "%Y.%m.%d")?;
if let Some(rounded) = rounded_month {
let new_month = NaiveDate::from_ymd_opt(rounded.year(), rounded.month(), 1).unwrap();
if parsed_month < new_month {
continue;
}
}
if parsed_month > Utc::now().naive_utc().date() {
continue;
}
res.push(parsed_month);
}
Ok(res)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -173,31 +201,3 @@ mod tests {
assert_eq!(res.len(), 4);
}
}

pub(crate) async fn crawl_months_list(
collector_root_url: &str,
from_month: Option<NaiveDate>,
) -> Result<Vec<NaiveDate>, BrokerError> {
let rounded_month =
from_month.map(|d| NaiveDate::from_ymd_opt(d.year(), d.month(), 1).unwrap());

let month_link_pattern: Regex = Regex::new(r#"<a href="(....\...)/">.*"#).unwrap();
let body = reqwest::get(collector_root_url).await?.text().await?;
let mut res = vec![];
for cap in month_link_pattern.captures_iter(body.as_str()) {
let month = cap[1].to_owned();
let parsed_month =
NaiveDate::parse_from_str(format!("{}.01", month.as_str()).as_str(), "%Y.%m.%d")?;
if let Some(rounded) = rounded_month {
let new_month = NaiveDate::from_ymd_opt(rounded.year(), rounded.month(), 1).unwrap();
if parsed_month < new_month {
continue;
}
}
if parsed_month > Utc::now().naive_utc().date() {
continue;
}
res.push(parsed_month);
}
Ok(res)
}
Loading
Loading