From 17892b56c7dfd97043f4c7be8588c406e8cf765b Mon Sep 17 00:00:00 2001 From: Marco Kirchner Date: Tue, 9 Jan 2024 18:09:32 +0100 Subject: [PATCH] improve connection handling --- CHANGELOG | 4 +- src/kuma/client.rs | 311 ++++++++++++++++++++++++++------------------- src/kuma/error.rs | 11 +- src/kuma/models.rs | 11 ++ src/main.rs | 4 +- src/sync.rs | 16 ++- 6 files changed, 219 insertions(+), 138 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 3ad2fd6..c057ac0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -17,10 +17,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- Increase default timeout for calls and connecting to 5s +- Increase default timeout for calls and connecting to 30s ### Fixed +- Support for authentication using credentials - Fix typo in env variable: AUTOKUMA__DOCKER__LABEL_PREFOX -> AUTOKUMA__DOCKER__LABEL_PREFIX +- Websocket connections were not being closed correctly so they would just accumulate over time ## [0.1.1] - 2024-01-07 diff --git a/src/kuma/client.rs b/src/kuma/client.rs index a5ac503..8d30f29 100644 --- a/src/kuma/client.rs +++ b/src/kuma/client.rs @@ -1,9 +1,11 @@ -use super::{Error, Event, Monitor, MonitorList, MonitorType, Result, Tag, TagDefinition}; +use super::{ + Error, Event, LoginResponse, Monitor, MonitorList, MonitorType, Result, Tag, TagDefinition, +}; use crate::config::Config; use crate::util::ResultLogger; use futures_util::FutureExt; use itertools::Itertools; -use log::{debug, warn}; +use log::{debug, trace, warn}; use rust_socketio::Payload; use rust_socketio::{ asynchronous::{Client as SocketIO, ClientBuilder}, @@ -16,33 +18,32 @@ use std::mem; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, Mutex}; - -type EventArgs = (Event, Value); -type Sender = mpsc::Sender; +use tokio::{runtime::Handle, sync::Mutex}; struct Worker { config: Arc, socket_io: Arc>>, - event_sender: Arc, tags: Arc>>, monitors: Arc>, + is_connected: Arc>, is_ready: Arc>, + is_logged_in: Arc>, } impl Worker { - fn new(config: Arc, event_sender: Sender) -> Self { - Worker { + fn new(config: Arc) -> Arc { + Arc::new(Worker { config: config, socket_io: Arc::new(Mutex::new(None)), - event_sender: (Arc::new(event_sender)), tags: Default::default(), monitors: Default::default(), + is_connected: Arc::new(Mutex::new(false)), is_ready: Arc::new(Mutex::new(false)), - } + is_logged_in: Arc::new(Mutex::new(false)), + }) } - async fn on_monitor_list(&self, monitor_list: MonitorList) -> Result<()> { + async fn on_monitor_list(self: &Arc, monitor_list: MonitorList) -> Result<()> { *self.monitors.lock().await = monitor_list; let tags = self.get_tags().await; @@ -52,10 +53,13 @@ impl Worker { Ok(()) } - async fn on_connect(&self) -> Result<()> { - if let (Some(username), Some(password)) = - (&self.config.kuma.username, &self.config.kuma.password) - { + async fn on_info(self: &Arc) -> Result<()> { + *self.is_connected.lock().await = true; + if let (Some(username), Some(password), true) = ( + &self.config.kuma.username, + &self.config.kuma.password, + !*self.is_logged_in.lock().await, + ) { self.login(username, password, self.config.kuma.mfa_token.clone()) .await?; } @@ -63,32 +67,41 @@ impl Worker { Ok(()) } - async fn on_event(&self, event: Event, payload: Value) -> Result<()> { + async fn on_auto_login(self: &Arc) -> Result<()> { + debug!("Logged in using AutoLogin!"); + *self.is_logged_in.lock().await = true; + Ok(()) + } + + async fn on_event(self: &Arc, event: Event, payload: Value) -> Result<()> { match event { Event::MonitorList => { self.on_monitor_list(serde_json::from_value(payload).unwrap()) .await? } - Event::Connect => self.on_connect().await?, + Event::Info => self.on_info().await?, + Event::AutoLogin => self.on_auto_login().await?, _ => {} } Ok(()) } - fn verify_response( + fn extract_response( response: Vec, result_ptr: impl AsRef, + verify: bool, ) -> Result { let json = json!(response); - if !json - .pointer("/0/0/ok") - .ok_or_else(|| { - Error::InvalidResponse(response.clone(), result_ptr.as_ref().to_owned()) - })? - .as_bool() - .unwrap_or_default() + if verify + && !json + .pointer("/0/0/ok") + .ok_or_else(|| { + Error::InvalidResponse(response.clone(), result_ptr.as_ref().to_owned()) + })? + .as_bool() + .unwrap_or_default() { let error_msg = json .pointer("/0/0/msg") @@ -104,15 +117,16 @@ impl Worker { .ok_or_else(|| Error::InvalidResponse(response, result_ptr.as_ref().to_owned())) } - async fn get_tags(&self) -> Result> { - self.call("getTags", vec![], "/tags").await + async fn get_tags(self: &Arc) -> Result> { + self.call("getTags", vec![], "/tags", true).await } async fn call( - &self, + self: &Arc, method: impl Into, args: A, result_ptr: impl Into, + verify: bool, ) -> Result where A: IntoIterator + Send + Clone, @@ -121,8 +135,8 @@ impl Worker { let method = method.into(); let result_ptr: String = result_ptr.into(); - let method = method.clone(); - let args = args.clone(); + let method_ref = method.clone(); + let args: A = args.clone(); let result_ptr = result_ptr.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel::>(1); @@ -138,12 +152,14 @@ impl Worker { Payload::Text(args.into_iter().collect_vec()), Duration::from_secs_f64(self.config.kuma.call_timeout), move |message: Payload, _: SocketIO| { + debug!("call {} -> {:?}", method_ref, &message); let tx = tx.clone(); let result_ptr = result_ptr.clone(); async move { _ = match message { Payload::Text(response) => { - tx.send(Self::verify_response(response, result_ptr)).await + tx.send(Self::extract_response(response, result_ptr, verify)) + .await } _ => tx.send(Err(Error::UnsupportedResponse)).await, } @@ -154,45 +170,75 @@ impl Worker { .await .map_err(|e| Error::CommunicationError(e.to_string()))?; - tokio::time::timeout( - Duration::from_secs_f64(self.config.kuma.connect_timeout), + let result = tokio::time::timeout( + Duration::from_secs_f64(self.config.kuma.call_timeout), rx.recv(), ) .await .map_err(|_| Error::CallTimeout(method.clone()))? - .ok_or_else(|| Error::CallTimeout(method))? + .ok_or_else(|| Error::CallTimeout(method))?; + + result } pub async fn login( - &self, + self: &Arc, username: impl AsRef, password: impl AsRef, token: Option, - ) -> Result { - self.call( - "login", - vec![serde_json::to_value(HashMap::from([ - ("username", json!(username.as_ref())), - ("password", json!(password.as_ref())), - ("token", json!(token)), - ])) - .unwrap()], - "/ok", - ) - .await + ) -> Result<()> { + let result: Result = self + .call( + "login", + vec![serde_json::to_value(HashMap::from([ + ("username", json!(username.as_ref())), + ("password", json!(password.as_ref())), + ("token", json!(token)), + ])) + .unwrap()], + "", + false, + ) + .await; + + match result { + Ok(LoginResponse { ok: true, .. }) => { + debug!("Logged in as {}!", username.as_ref()); + *self.is_logged_in.lock().await = true; + Ok(()) + } + Ok(LoginResponse { + ok: false, + msg: Some(msg), + .. + }) => Err(Error::LoginError(msg)), + Err(e) => { + *self.is_logged_in.lock().await = false; + Err(e) + } + _ => { + *self.is_logged_in.lock().await = false; + Err(Error::LoginError("Unexpect login response".to_owned())) + } + } + .log_warn(|e| e.to_string()) } - pub async fn add_tag(&self, tag: TagDefinition) -> Result { - self.call( - "addTag", - vec![serde_json::to_value(tag.clone()).unwrap()], - "/tag", - ) - .await + pub async fn add_tag(self: &Arc, tag: &mut TagDefinition) -> Result<()> { + *tag = self + .call( + "addTag", + vec![serde_json::to_value(tag.clone()).unwrap()], + "/tag", + true, + ) + .await?; + + Ok(()) } pub async fn add_monitor_tag( - &self, + self: &Arc, monitor_id: i32, tag_id: i32, value: Option, @@ -206,6 +252,7 @@ impl Worker { json!(value.unwrap_or_default()), ], "/ok", + true, ) .await?; @@ -213,7 +260,7 @@ impl Worker { } pub async fn edit_monitor_tag( - &self, + self: &Arc, monitor_id: i32, tag_id: i32, value: Option, @@ -227,6 +274,7 @@ impl Worker { json!(value.unwrap_or_default()), ], "/ok", + true, ) .await?; @@ -234,7 +282,7 @@ impl Worker { } pub async fn delete_monitor_tag( - &self, + self: &Arc, monitor_id: i32, tag_id: i32, value: Option, @@ -248,21 +296,22 @@ impl Worker { json!(value.unwrap_or_default()), ], "/ok", + true, ) .await?; Ok(()) } - pub async fn delete_monitor(&self, monitor_id: i32) -> Result<()> { + pub async fn delete_monitor(self: &Arc, monitor_id: i32) -> Result<()> { let _: bool = self - .call("deleteMonitor", vec![json!(monitor_id)], "/ok") + .call("deleteMonitor", vec![json!(monitor_id)], "/ok", true) .await?; Ok(()) } - async fn resolve_group(&self, monitor: &mut Monitor) -> Result { + async fn resolve_group(self: &Arc, monitor: &mut Monitor) -> Result<()> { if let Some(group_name) = monitor.common().parent_name.clone() { monitor.common_mut().parent_name = None; @@ -285,15 +334,15 @@ impl Worker { { monitor.common_mut().parent = Some(group_id); } else { - return Ok(false); + return Err(Error::GroupNotFound(group_name)); } } else { monitor.common_mut().parent = None; } - return Ok(true); + return Ok(()); } - async fn update_monitor_tags(&self, monitor_id: i32, tags: &Vec) -> Result<()> { + async fn update_monitor_tags(self: &Arc, monitor_id: i32, tags: &Vec) -> Result<()> { let new_tags = tags .iter() .filter_map(|tag| tag.tag_id.and_then(|id| Some((id, tag)))) @@ -366,52 +415,47 @@ impl Worker { Ok(()) } - pub async fn add_monitor(&self, mut monitor: Monitor) -> Result { - let mut tags = vec![]; - mem::swap(&mut tags, &mut monitor.common_mut().tags); + pub async fn add_monitor(self: &Arc, monitor: &mut Monitor) -> Result<()> { + self.resolve_group(monitor).await?; - if !self.resolve_group(&mut monitor).await? { - return Ok(monitor); - } - - if let Some(notifications) = &mut monitor.common_mut().notification_id_list { - notifications.clear(); - } + let tags = mem::take(&mut monitor.common_mut().tags); + let notifications = mem::take(&mut monitor.common_mut().notification_id_list); let id: i32 = self + .clone() .call( "add", vec![serde_json::to_value(&monitor).unwrap()], "/monitorID", + true, ) .await?; - self.update_monitor_tags(id, &tags).await?; - monitor.common_mut().id = Some(id); + monitor.common_mut().notification_id_list = notifications; monitor.common_mut().tags = tags; + self.edit_monitor(monitor).await?; + self.monitors .lock() .await .insert(id.to_string(), monitor.clone()); - Ok(monitor) + Ok(()) } - pub async fn edit_monitor(&self, mut monitor: Monitor) -> Result { - if !self.resolve_group(&mut monitor).await? { - return Ok(monitor); - } + pub async fn edit_monitor(self: &Arc, monitor: &mut Monitor) -> Result<()> { + self.resolve_group(monitor).await?; - let mut tags = vec![]; - mem::swap(&mut tags, &mut monitor.common_mut().tags); + let tags = mem::take(&mut monitor.common_mut().tags); let id: i32 = self .call( "editMonitor", vec![serde_json::to_value(&monitor).unwrap()], "/monitorID", + true, ) .await?; @@ -419,14 +463,14 @@ impl Worker { monitor.common_mut().tags = tags; - Ok(monitor) + Ok(()) } - pub async fn connect(&self) -> Result<()> { + pub async fn connect(self: &Arc) -> Result<()> { *self.is_ready.lock().await = false; + *self.is_logged_in.lock().await = false; *self.socket_io.lock().await = None; - let callback_tx = self.event_sender.clone(); let mut builder = ClientBuilder::new(self.config.kuma.url.clone()) .transport_type(rust_socketio::TransportType::Websocket); @@ -440,9 +484,13 @@ impl Worker { builder = builder.opening_header(key, value); } + let handle = Handle::current(); + let self_ref = self.to_owned(); let client = builder .on_any(move |event, payload, _| { - let callback_tx = callback_tx.clone(); + let handle = handle.clone(); + let self_ref: Arc = self_ref.clone(); + trace!("Client::on_any({:?}, {:?})", &event, &payload); async move { match (event, payload) { (SocketIOEvent::Message, Payload::Text(params)) => { @@ -452,20 +500,30 @@ impl Worker { .log_warn(|| "Error while deserializing Event...") .unwrap_or(""), ) { - callback_tx - .send((e, json!(null))) - .await - .log_warn(|_| "Error while sending Message event") - .unwrap(); + // tx.send((e, json!(null))).unwrap(); + handle.clone().spawn(async move { + _ = self_ref.clone().on_event(e, json!(null)).await.log_warn( + |e| { + format!( + "Error while sending message event: {}", + e.to_string() + ) + }, + ); + }); } } (event, Payload::Text(params)) => { if let Ok(e) = Event::from_str(&String::from(event)) { - callback_tx - .send((e, params.into_iter().next().unwrap())) - .await - .log_warn(|_| "Error while sending event") - .unwrap(); + handle.clone().spawn(async move { + _ = self_ref + .clone() + .on_event(e, params.into_iter().next().unwrap()) + .await + .log_warn(|e| { + format!("Error while sending event: {}", e.to_string()) + }); + }); } } _ => {} @@ -478,10 +536,7 @@ impl Worker { .log_error(|_| "Error during connect") .ok(); - self.event_sender - .send((Event::Connect, json!(null))) - .await - .ok(); + debug!("Waiting for connection"); *self.socket_io.lock().await = client; @@ -490,23 +545,28 @@ impl Worker { debug!("Connected!"); return Ok(()); } + debug!("Waiting for Kuma to get ready..."); tokio::time::sleep(Duration::from_millis(200 * i)).await; } warn!("Timeout while waiting for Kuma to get ready..."); - Err(Error::ConnectionTimeout) - } - pub async fn disconnect(&self) -> Result<()> { - let mut lock = self.socket_io.lock().await; - if let Some(socket_io) = &*lock { - socket_io - .disconnect() - .await - .map_err(|e| Error::CommunicationError(e.to_string()))?; + match *self.is_connected.lock().await { + true => Err(Error::NotAuthenticated), + false => Err(Error::ConnectionTimeout), } + } - *lock = None; + pub async fn disconnect(self: &Arc) -> Result<()> { + let self_ref = self.to_owned(); + tokio::spawn(async move { + let socket_io = self_ref.socket_io.lock().await; + if let Some(socket_io) = &*socket_io { + _ = socket_io.disconnect().await; + } + drop(socket_io); + debug!("Connection closed!"); + }); Ok(()) } @@ -518,19 +578,7 @@ pub struct Client { impl Client { pub async fn connect(config: Arc) -> Result { - let (tx, mut rx) = mpsc::channel::(100); - - let worker = Arc::new(Worker::new(config, tx)); - - let worker_ref = worker.clone(); - tokio::spawn(async move { - while let Some((event, payload)) = rx.recv().await { - if let Err(err) = worker_ref.on_event(event, payload).await { - println!("{:?}", err); - }; - } - }); - + let worker = Worker::new(config); worker.connect().await?; Ok(Self { worker }) @@ -550,16 +598,19 @@ impl Client { } } - pub async fn add_tag(&self, tag: TagDefinition) -> Result { - self.worker.add_tag(tag).await + pub async fn add_tag(&self, mut tag: TagDefinition) -> Result { + self.worker.add_tag(&mut tag).await?; + Ok(tag) } - pub async fn add_monitor(&self, monitor: Monitor) -> Result { - self.worker.add_monitor(monitor).await + pub async fn add_monitor(&self, mut monitor: Monitor) -> Result { + self.worker.add_monitor(&mut monitor).await?; + Ok(monitor) } - pub async fn edit_monitor(&self, monitor: Monitor) -> Result { - self.worker.edit_monitor(monitor).await + pub async fn edit_monitor(&self, mut monitor: Monitor) -> Result { + self.worker.edit_monitor(&mut monitor).await?; + Ok(monitor) } pub async fn delete_monitor(&self, monitor_id: i32) -> Result<()> { diff --git a/src/kuma/error.rs b/src/kuma/error.rs index 7055546..6fed479 100644 --- a/src/kuma/error.rs +++ b/src/kuma/error.rs @@ -5,12 +5,18 @@ pub enum Error { #[error("Timeout while trying to connect to Uptime Kuma server")] ConnectionTimeout, - #[error("Timeout while trying to call '{0}'")] + #[error("Timeout while trying to call '{0}'.")] CallTimeout(String), #[error("Tried to access Uptime Kuma state before it was ready...")] NotReady, + #[error("The server rejected the login: {0}")] + LoginError(String), + + #[error("It looks like the server is expecting a username/password, but none was provided")] + NotAuthenticated, + #[error("Connection to Uptime Kuma was lost")] Disconnected, @@ -28,6 +34,9 @@ pub enum Error { #[error("Encountered Errors trying to validate '{0}': {1:?}")] ValidationError(String, Vec), + + #[error("No group named {0} could be found")] + GroupNotFound(String), } pub type Result = std::result::Result; diff --git a/src/kuma/models.rs b/src/kuma/models.rs index 6bbf521..c289d02 100644 --- a/src/kuma/models.rs +++ b/src/kuma/models.rs @@ -968,4 +968,15 @@ impl Monitor { } } +#[skip_serializing_none] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct LoginResponse { + #[serde(rename = "ok")] + pub ok: bool, + #[serde(rename = "msg")] + pub msg: Option, + #[serde(rename = "token")] + pub token: Option, +} + pub type MonitorList = HashMap; diff --git a/src/main.rs b/src/main.rs index 14a7150..f1d8567 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,7 @@ +use crate::util::ResultOrDie; use std::sync::Arc; - use util::ResultLogger; -use crate::util::ResultOrDie; - mod config; mod error; mod kuma; diff --git a/src/sync.rs b/src/sync.rs index 3af5573..96df52d 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,7 +1,7 @@ use crate::{ config::Config, error::{Error, Result}, - kuma::{Client, Monitor, MonitorType, Tag, TagDefinition}, + kuma::{self, Client, Monitor, MonitorType, Tag, TagDefinition}, util::{self, group_by_prefix, ResultLogger}, }; use bollard::{ @@ -385,7 +385,17 @@ impl Sync { tag.value = Some(id.clone()); monitor.common_mut().tags.push(tag); - kuma.add_monitor(monitor).await?; + match kuma.add_monitor(monitor).await { + Ok(_) => Ok(()), + Err(kuma::Error::GroupNotFound(group)) => { + warn!( + "Cannot create monitor {} because group {} does not exist", + id, group + ); + Ok(()) + } + Err(err) => Err(err), + }?; } for (id, current, new) in to_update { @@ -422,7 +432,7 @@ impl Sync { pub async fn run(&self) { loop { if let Err(err) = self.do_sync().await { - warn!("Encountered error during sync:\n{}", err); + warn!("Encountered error during sync: {}", err); } tokio::time::sleep(Duration::from_secs(5)).await; }