diff --git a/aquadoggo/src/api/api.rs b/aquadoggo/src/api/api.rs index 247c72bda..5e697ac2a 100644 --- a/aquadoggo/src/api/api.rs +++ b/aquadoggo/src/api/api.rs @@ -1,11 +1,18 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use anyhow::{bail, Result}; +use tokio::sync::mpsc::Receiver; use crate::api::{migrate, LockFile}; use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; +#[derive(Debug, Clone)] +pub enum NodeEvent { + PeerConnected, + PeerDisconnected, +} + /// Interface to interact with the node in a programmatic, "low-level" way. #[derive(Debug)] pub struct NodeInterface { @@ -42,4 +49,26 @@ impl NodeInterface { Ok(did_migration_happen) } + + pub async fn subscribe(&self) -> Receiver { + let mut rx = self.tx.subscribe(); + let (events_tx, events_rx) = tokio::sync::mpsc::channel::(256); + + tokio::task::spawn(async move { + loop { + match rx.recv().await { + Ok(ServiceMessage::PeerConnected(_)) => { + let _ = events_tx.send(NodeEvent::PeerConnected).await; + } + Ok(ServiceMessage::PeerDisconnected(_)) => { + let _ = events_tx.send(NodeEvent::PeerDisconnected).await; + } + Ok(_) => continue, + Err(_) => break, + } + } + }); + + events_rx + } } diff --git a/aquadoggo/src/api/mod.rs b/aquadoggo/src/api/mod.rs index 89c255bd3..0e968329c 100644 --- a/aquadoggo/src/api/mod.rs +++ b/aquadoggo/src/api/mod.rs @@ -6,7 +6,7 @@ mod config_file; mod lock_file; mod migration; -pub use api::NodeInterface; +pub use api::{NodeEvent, NodeInterface}; pub use config_file::ConfigFile; pub use lock_file::LockFile; pub use migration::migrate; diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index ac866c9b9..f36ffac27 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -2,8 +2,9 @@ use anyhow::Result; use p2panda_rs::identity::KeyPair; +use tokio::sync::mpsc::Receiver; -use crate::api::NodeInterface; +use crate::api::{NodeEvent, NodeInterface}; use crate::bus::ServiceMessage; use crate::config::Configuration; use crate::context::Context; @@ -131,4 +132,10 @@ impl Node { pub async fn migrate(&self, lock_file: LockFile) -> Result { self.api.migrate(lock_file).await } + + /// Subscribe to channel reporting on significant node events which can be interesting for + /// clients, for example when peers connect or disconnect. + pub async fn subscribe(&self) -> Receiver { + self.api.subscribe().await + } }