From d053349131f3a7bab11a2d52a5f84b084acb38ec Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Wed, 13 Sep 2023 18:06:25 -0700 Subject: [PATCH 1/4] feat: Send push notifications for expired and expiring positions Push notifications are sent in the following time windows: 1. Expiring positions - [EXPIRING_THRESHOLD, now] 2. Expired positions: - [now, EXPIRED_THRESHOLD] Here both of the thresholds are set to 1h. - Add unit tests for the logic determining which notification to send. - periodically load all relevant positions with their fcm_tokens from the DB and attempt to deliver notifications Notification task runs every 30 mins, so that the user would get 2 of each. This way they'll likely get it at least 30 mins before the expiry. --- coordinator/src/bin/coordinator.rs | 44 ++- coordinator/src/db/positions.rs | 18 + coordinator/src/lib.rs | 4 +- coordinator/src/logger.rs | 13 + .../src/{notification/mod.rs => message.rs} | 26 +- coordinator/src/node/rollover.rs | 9 +- coordinator/src/notification_service.rs | 132 ------- coordinator/src/notifications.rs | 348 ++++++++++++++++++ coordinator/src/orderbook/async_match.rs | 10 +- coordinator/src/orderbook/tests/mod.rs | 12 - .../src/orderbook/tests/registration_test.rs | 4 +- .../src/orderbook/tests/sample_test.rs | 6 +- coordinator/src/orderbook/trading.rs | 8 +- coordinator/src/orderbook/websocket.rs | 2 +- coordinator/src/position/models.rs | 6 +- coordinator/src/routes.rs | 2 +- 16 files changed, 449 insertions(+), 195 deletions(-) rename coordinator/src/{notification/mod.rs => message.rs} (71%) delete mode 100644 coordinator/src/notification_service.rs create mode 100644 coordinator/src/notifications.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 706ad0436..76641b0fb 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -2,6 +2,8 @@ use anyhow::Context; use anyhow::Result; use coordinator::cli::Opts; use coordinator::logger; +use coordinator::message::spawn_delivering_messages_to_authenticated_users; +use coordinator::message::NewUserMessage; use coordinator::metrics; use coordinator::metrics::init_meter; use coordinator::node; @@ -12,9 +14,8 @@ use coordinator::node::rollover; use coordinator::node::storage::NodeStorage; use coordinator::node::unrealized_pnl; use coordinator::node::Node; -use coordinator::notification; -use coordinator::notification::NewUserMessage; -use coordinator::notification_service::NotificationService; +use coordinator::notifications::query_and_send_position_notifications; +use coordinator::notifications::NotificationService; use coordinator::orderbook::async_match; use coordinator::orderbook::trading; use coordinator::routes::router; @@ -42,10 +43,11 @@ use tracing::metadata::LevelFilter; const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10); const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200); -const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300); +const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60); const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30); -const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(600); +const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60); const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30); +const EXPIRED_POSITION_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(30 * 60); const NODE_ALIAS: &str = "10101.finance"; @@ -201,7 +203,10 @@ async fn main() -> Result<()> { let (tx_user_feed, _rx) = broadcast::channel::(100); let (tx_price_feed, _rx) = broadcast::channel(100); - let (_handle, notifier) = notification::start(tx_user_feed.clone()); + let (_handle, auth_users_notifier) = + spawn_delivering_messages_to_authenticated_users(tx_user_feed.clone()); + + let notification_service = NotificationService::new(opts.fcm_api_key.clone()); let (_handle, trading_sender) = trading::start( pool.clone(), @@ -216,7 +221,7 @@ async fn main() -> Result<()> { notifier.clone(), network, ); - let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier, network); + let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -251,7 +256,7 @@ async fn main() -> Result<()> { let app = router( node, - pool, + pool.clone(), settings, exporter, opts.p2p_announcement_addresses(), @@ -263,7 +268,28 @@ async fn main() -> Result<()> { let notification_service = NotificationService::new(opts.fcm_api_key); - let _sender = notification_service.get_sender(); + tokio::spawn({ + let sender = notification_service.get_sender(); + let pool = pool.clone(); + async move { + loop { + match pool.get() { + Ok(mut conn) => { + if let Err(e) = + query_and_send_position_notifications(&mut conn, &sender).await + { + tracing::error!("Failed to send notifications: {e:#}"); + } + } + Err(e) => { + tracing::error!("Failed to get pool connection. Error: {e:?}"); + } + } + + tokio::time::sleep(EXPIRED_POSITION_NOTIFICATION_INTERVAL).await; + } + } + }); // Start the metrics exporter autometrics::prometheus_exporter::init(); diff --git a/coordinator/src/db/positions.rs b/coordinator/src/db/positions.rs index 7b4fadcdc..47ef64efa 100644 --- a/coordinator/src/db/positions.rs +++ b/coordinator/src/db/positions.rs @@ -67,6 +67,24 @@ impl Position { Ok(positions) } + pub fn get_all_positions_with_expiry_within( + conn: &mut PgConnection, + start: OffsetDateTime, + end: OffsetDateTime, + ) -> QueryResult> { + let positions = positions::table + .filter(positions::expiry_timestamp.gt(start)) + .filter(positions::expiry_timestamp.lt(end)) + .load::(conn)?; + + let positions = positions + .into_iter() + .map(crate::position::models::Position::from) + .collect(); + + Ok(positions) + } + pub fn get_all_open_or_closing_positions( conn: &mut PgConnection, ) -> QueryResult> { diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index 1fad8a83d..b5304fb9e 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -12,10 +12,10 @@ pub mod admin; pub mod cli; pub mod db; pub mod logger; +pub mod message; pub mod metrics; pub mod node; -pub mod notification; -pub mod notification_service; +pub mod notifications; pub mod orderbook; pub mod position; pub mod routes; diff --git a/coordinator/src/logger.rs b/coordinator/src/logger.rs index c4c785584..074a077f7 100644 --- a/coordinator/src/logger.rs +++ b/coordinator/src/logger.rs @@ -87,3 +87,16 @@ pub fn init_tracing(level: LevelFilter, json_format: bool, tokio_console: bool) Ok(()) } + +/// Initialise tracing for tests +#[cfg(test)] +pub(crate) fn init_tracing_for_test() { + static TRACING_TEST_SUBSCRIBER: std::sync::Once = std::sync::Once::new(); + + TRACING_TEST_SUBSCRIBER.call_once(|| { + tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .init() + }) +} diff --git a/coordinator/src/notification/mod.rs b/coordinator/src/message.rs similarity index 71% rename from coordinator/src/notification/mod.rs rename to coordinator/src/message.rs index 2e5092b4d..8bfb59d74 100644 --- a/coordinator/src/notification/mod.rs +++ b/coordinator/src/message.rs @@ -3,19 +3,19 @@ use bitcoin::secp256k1::PublicKey; use futures::future::RemoteHandle; use futures::FutureExt; use orderbook_commons::Message; +use parking_lot::RwLock; use std::collections::HashMap; use std::sync::Arc; -use std::sync::RwLock; use tokio::sync::broadcast; use tokio::sync::mpsc; -/// This value is arbitrarily set to 100 and defines the message accepted in the notification +/// This value is arbitrarily set to 100 and defines the message accepted in the message /// channel buffer. const NOTIFICATION_BUFFER_SIZE: usize = 100; -// TODO(holzeis): This enum should be extended to allow for sending push notifications. -pub enum Notification { - Message { +/// Message sent to users via the websocket. +pub enum OrderbookMessage { + TraderMessage { trader_id: PublicKey, message: Message, }, @@ -27,10 +27,10 @@ pub struct NewUserMessage { pub sender: mpsc::Sender, } -pub fn start( +pub fn spawn_delivering_messages_to_authenticated_users( tx_user_feed: broadcast::Sender, -) -> (RemoteHandle>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); let authenticated_users = Arc::new(RwLock::new(HashMap::new())); @@ -41,7 +41,6 @@ pub fn start( while let Ok(new_user_msg) = user_feed.recv().await { traders .write() - .expect("RwLock to not be poisoned") .insert(new_user_msg.new_user, new_user_msg.sender); } } @@ -51,15 +50,10 @@ pub fn start( async move { while let Some(notification) = receiver.recv().await { match notification { - Notification::Message { trader_id, message } => { + OrderbookMessage::TraderMessage { trader_id, message } => { tracing::info!(%trader_id, "Sending message: {message:?}"); - let trader = { - let traders = authenticated_users - .read() - .expect("RwLock to not be poisoned"); - traders.get(&trader_id).cloned() - }; + let trader = authenticated_users.read().get(&trader_id).cloned(); match trader { Some(sender) => { diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index 84ef211b3..ab1267980 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,8 +1,7 @@ use crate::db; use crate::db::positions; +use crate::message::NewUserMessage; use crate::node::Node; -use crate::notification::NewUserMessage; -use crate::notification::Notification; use anyhow::bail; use anyhow::Context; use anyhow::Result; @@ -44,7 +43,7 @@ struct Rollover { pub fn monitor( pool: Pool>, tx_user_feed: broadcast::Sender, - notifier: mpsc::Sender, + notifier: mpsc::Sender, network: Network, ) -> RemoteHandle> { let mut user_feed = tx_user_feed.subscribe(); @@ -78,7 +77,7 @@ pub fn monitor( async fn check_if_eligible_for_rollover( conn: &mut PgConnection, - notifier: mpsc::Sender, + notifier: mpsc::Sender, trader_id: PublicKey, network: Network, ) -> Result<()> { @@ -98,7 +97,7 @@ async fn check_if_eligible_for_rollover( tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); - let message = Notification::Message { + let message = OrderbookMessage::TraderMessage { trader_id, message: Message::Rollover, }; diff --git a/coordinator/src/notification_service.rs b/coordinator/src/notification_service.rs deleted file mode 100644 index 17d7c7ff1..000000000 --- a/coordinator/src/notification_service.rs +++ /dev/null @@ -1,132 +0,0 @@ -use anyhow::Context; -use anyhow::Result; -use std::fmt::Display; -use tokio::sync::mpsc; - -/// Types of notification that can be sent to 10101 app users - -#[derive(Debug, Clone)] -pub enum NotificationKind { - /// Coordinator would like to settle the channel - ChannelClose, - PositionSoonToExpire, - PositionExpired, -} - -impl Display for NotificationKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - NotificationKind::ChannelClose => write!(f, "ChannelClose"), - NotificationKind::PositionSoonToExpire => write!(f, "PositionSoonToExpire"), - NotificationKind::PositionExpired => write!(f, "PositionExpired"), - } - } -} - -#[derive(Debug, Clone)] -pub struct Notification { - pub user_fcm_token: String, - pub notification_kind: NotificationKind, -} - -impl Notification { - pub fn new(user_fcm_token: String, notification_kind: NotificationKind) -> Self { - Self { - notification_kind, - user_fcm_token, - } - } -} - -/// Actor managing the notifications -pub struct NotificationService { - notification_sender: mpsc::Sender, -} - -impl NotificationService { - /// Start the notification service - /// - /// If an empty string is passed in the constructor, the service will not send any notification. - /// It will only log the notification that it would have sent. - pub fn new(fcm_api_key: String) -> Self { - if fcm_api_key.is_empty() { - // Log it as error, as in production it should always be set - tracing::error!("FCM API key is empty. No notifications will not be sent."); - } - - let (notification_sender, mut notification_receiver) = mpsc::channel(100); - - // TODO: use RAII here - tokio::spawn({ - let fcm_api_key = fcm_api_key; - let client = fcm::Client::new(); - async move { - while let Some(Notification { - user_fcm_token, - notification_kind, - }) = notification_receiver.recv().await - { - tracing::info!(%notification_kind, %user_fcm_token, "Sending notification"); - - if !fcm_api_key.is_empty() { - let notification = build_notification(notification_kind); - if let Err(e) = - send_notification(&client, &fcm_api_key, &user_fcm_token, notification) - .await - { - tracing::error!("Could not send notification to FCM: {:?}", e); - } - } - } - } - }); - - Self { - notification_sender, - } - } - - /// Constructs a new sender. Use a sender to send notification from any part of the system. - pub fn get_sender(&self) -> mpsc::Sender { - self.notification_sender.clone() - } -} - -/// Prepares the notification text -fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> { - let mut notification_builder = fcm::NotificationBuilder::new(); - match kind { - NotificationKind::ChannelClose => { - notification_builder.body("Close channel request."); - notification_builder.title("Someone wants to close a position with you! 🌻"); - } - NotificationKind::PositionSoonToExpire => { - notification_builder.title("Your Position is about to expire"); - notification_builder.body("Open the app to react."); - } - NotificationKind::PositionExpired => { - notification_builder.title("Your position has expired"); - notification_builder.body("Open the app to react."); - } - } - notification_builder.finalize() -} - -async fn send_notification<'a>( - client: &fcm::Client, - api_key: &str, - fcm_token: &str, - notification: fcm::Notification<'a>, -) -> Result<()> { - anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty"); - - let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token); - message_builder.notification(notification); - let message = message_builder.finalize(); - let response = client - .send(message) - .await - .context("could not send FCM notification")?; - tracing::debug!("Sent: {:?}", response); - Ok(()) -} diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs new file mode 100644 index 000000000..d7ac1d316 --- /dev/null +++ b/coordinator/src/notifications.rs @@ -0,0 +1,348 @@ +use crate::db::positions; +use crate::db::user; +use crate::position::models::Position; +use anyhow::Context; +use anyhow::Result; +use diesel::PgConnection; +use std::fmt::Display; +use time::OffsetDateTime; +use tokio::sync::mpsc; + +/// Consider a position expiring soon if it expires in less than this time +const START_OF_EXPIRING_POSITION: time::Duration = time::Duration::hours(1); + +/// Consider a position expired if it expired more than this time ago +const END_OF_EXPIRED_POSITION: time::Duration = time::Duration::hours(1); + +/// Types of notification that can be sent to 10101 app users + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NotificationKind { + PositionSoonToExpire, + PositionExpired, +} + +impl Display for NotificationKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NotificationKind::PositionSoonToExpire => write!(f, "PositionSoonToExpire"), + NotificationKind::PositionExpired => write!(f, "PositionExpired"), + } + } +} + +#[derive(Debug, Clone)] +pub struct Notification { + pub user_fcm_token: FcmToken, + pub notification_kind: NotificationKind, +} + +impl Notification { + pub fn new(user_fcm_token: FcmToken, notification_kind: NotificationKind) -> Self { + Self { + notification_kind, + user_fcm_token, + } + } +} + +/// Actor managing the notifications +pub struct NotificationService { + notification_sender: mpsc::Sender, +} + +impl NotificationService { + /// Start the notification service + /// + /// If an empty string is passed in the constructor, the service will not send any notification. + /// It will only log the notification that it would have sent. + pub fn new(fcm_api_key: String) -> Self { + if fcm_api_key.is_empty() { + // Log it as error, as in production it should always be set + tracing::error!("FCM API key is empty. No notifications will not be sent."); + } + + let (notification_sender, mut notification_receiver) = mpsc::channel(100); + + // TODO: use RAII here + tokio::spawn({ + let fcm_api_key = fcm_api_key; + let client = fcm::Client::new(); + async move { + while let Some(Notification { + user_fcm_token, + notification_kind, + }) = notification_receiver.recv().await + { + tracing::info!(%notification_kind, %user_fcm_token, "Sending notification"); + + if !fcm_api_key.is_empty() { + let notification = build_notification(notification_kind); + if let Err(e) = + send_notification(&client, &fcm_api_key, &user_fcm_token, notification) + .await + { + tracing::error!("Could not send notification to FCM: {:?}", e); + } + } + } + } + }); + + Self { + notification_sender, + } + } + + /// Constructs a new sender. Use a sender to send notification from any part of the system. + pub fn get_sender(&self) -> mpsc::Sender { + self.notification_sender.clone() + } +} + +/// Prepares the notification text +fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> { + let mut notification_builder = fcm::NotificationBuilder::new(); + match kind { + NotificationKind::PositionSoonToExpire => { + notification_builder.title("Your position is about to expire"); + notification_builder.body("Open the app to react."); + } + NotificationKind::PositionExpired => { + notification_builder.title("Your position has expired"); + notification_builder.body("Open the app to react."); + } + } + notification_builder.finalize() +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FcmToken(pub String); + +impl Display for FcmToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", &self.0) + } +} + +async fn send_notification<'a>( + client: &fcm::Client, + api_key: &str, + fcm_token: &FcmToken, + notification: fcm::Notification<'a>, +) -> Result<()> { + anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty"); + + let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0); + message_builder.notification(notification); + let message = message_builder.finalize(); + let response = client + .send(message) + .await + .context("could not send FCM notification")?; + tracing::debug!("Sent notification. Response: {:?}", response); + Ok(()) +} + +/// Load all recent positions with the DB with associated fcm tokens and send push +/// notification about expiring/expired positions if needed. +pub async fn query_and_send_position_notifications( + conn: &mut PgConnection, + notification_sender: &mpsc::Sender, +) -> Result<()> { + let users = user::all(conn)?; + + let positions_with_fcm_tokens = positions::Position::get_all_positions_with_expiry_within( + conn, + OffsetDateTime::now_utc() - START_OF_EXPIRING_POSITION, + OffsetDateTime::now_utc() + END_OF_EXPIRED_POSITION, + )? + .into_iter() + // Join positions with users to add the FCM tokens. + // Filter out positions that don't have a FCM token stored in the users + // table which is with them. + // This can be done at the DB level if it ever becomes a performance issue. + .filter_map(|p| { + let maybe_fcm_token = users + .iter() + .find(|u| u.pubkey == p.trader.to_string()) + .map(|u| FcmToken(u.fcm_token.clone())); + + if let Some(fcm_token) = maybe_fcm_token { + Some((p, fcm_token)) + } else { + tracing::warn!(?p, "No FCM token for position"); + None + } + }) + .collect::>(); + + send_expiry_notifications_if_applicable(&positions_with_fcm_tokens, notification_sender).await; + + Ok(()) +} + +/// Send notifications to users with positions that are about to expire or have +/// just expired +async fn send_expiry_notifications_if_applicable( + positions: &[(Position, FcmToken)], + notification_sender: &mpsc::Sender, +) { + let now = OffsetDateTime::now_utc(); + + for (position, fcm_token) in positions { + if position.expiry_timestamp <= now + && now < position.expiry_timestamp + END_OF_EXPIRED_POSITION + { + if let Err(e) = notification_sender + .send(Notification::new( + fcm_token.clone(), + NotificationKind::PositionExpired, + )) + .await + { + tracing::error!("Failed to send PositionExpired notification: {:?}", e); + } + } else if position.expiry_timestamp > now + && position.expiry_timestamp <= now + START_OF_EXPIRING_POSITION + { + if let Err(e) = notification_sender + .send(Notification::new( + fcm_token.clone(), + NotificationKind::PositionSoonToExpire, + )) + .await + { + tracing::error!("Failed to send PositionSoonToExpire notification: {:?}", e); + } + } + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::logger::init_tracing_for_test; + use time::Duration; + + fn soon_expiring_position() -> (Position, FcmToken) { + let mut position = Position::dummy(); + position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); + position.expiry_timestamp = OffsetDateTime::now_utc() + Duration::minutes(5); + (position, FcmToken("soon_to_expire".to_string())) + } + + fn just_expired_position() -> (Position, FcmToken) { + let mut position = Position::dummy(); + position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); + position.expiry_timestamp = OffsetDateTime::now_utc() - Duration::minutes(5); + (position, FcmToken("just expired".to_string())) + } + + fn ancient_expired_position() -> (Position, FcmToken) { + let mut position = Position::dummy(); + position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); + position.expiry_timestamp = OffsetDateTime::now_utc() - Duration::hours(2); + (position, FcmToken("long time ago expired".to_string())) + } + + fn far_from_expiry_position() -> (Position, FcmToken) { + let mut position = Position::dummy(); + position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); + position.expiry_timestamp = OffsetDateTime::now_utc() + Duration::days(1); + (position, FcmToken("far_from_expiry".to_string())) + } + + // Receive all values that could have been sent to the channel + fn receive_all_notifications( + notification_rx: &mut mpsc::Receiver, + ) -> Vec { + let mut received_notifications = vec![]; + while let Ok(notification) = notification_rx.try_recv() { + tracing::info!(?notification, "Received notification"); + received_notifications.push(notification); + } + received_notifications + } + + #[tokio::test] + async fn send_no_notifications_when_too_far_to_expiry() { + init_tracing_for_test(); + + let position_far_from_expiring_1 = far_from_expiry_position(); + let position_far_from_expiring_2 = far_from_expiry_position(); + let (notification_sender, mut notification_receiver) = mpsc::channel(100); + + send_expiry_notifications_if_applicable( + &[position_far_from_expiring_1, position_far_from_expiring_2], + ¬ification_sender, + ) + .await; + + let received_notifications = receive_all_notifications(&mut notification_receiver); + + assert_eq!(received_notifications.len(), 0); + } + + #[tokio::test] + async fn test_deliving_notifications_before_expiry() { + init_tracing_for_test(); + + let position_far_from_expiring = far_from_expiry_position(); + let position_soon_to_expire = soon_expiring_position(); + let ancient_position = ancient_expired_position(); // too old to send notification + let (notification_sender, mut notification_receiver) = mpsc::channel(100); + + send_expiry_notifications_if_applicable( + &[ + position_far_from_expiring, + position_soon_to_expire.clone(), + ancient_position, + ], + ¬ification_sender, + ) + .await; + + let received_notifications = receive_all_notifications(&mut notification_receiver); + + assert_eq!(received_notifications.len(), 1); + + let notification = received_notifications.first().unwrap(); + assert_eq!( + notification.notification_kind, + NotificationKind::PositionSoonToExpire + ); + assert_eq!(notification.user_fcm_token, position_soon_to_expire.1); + } + + #[tokio::test] + async fn send_only_recently_expired_notifications() { + init_tracing_for_test(); + + let position_far_from_expiring = far_from_expiry_position(); + let position_just_expired = just_expired_position(); + let ancient_position = ancient_expired_position(); // too old to send notification + let (notification_sender, mut notification_receiver) = mpsc::channel(100); + + send_expiry_notifications_if_applicable( + &[ + position_far_from_expiring, + position_just_expired.clone(), + ancient_position, + ], + ¬ification_sender, + ) + .await; + + let received_notifications = receive_all_notifications(&mut notification_receiver); + + assert_eq!(received_notifications.len(), 1); + + let notification = received_notifications.first().unwrap(); + assert_eq!( + notification.notification_kind, + NotificationKind::PositionExpired + ); + assert_eq!(notification.user_fcm_token, position_just_expired.1); + } +} diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs index 0dd08a9bd..a98d9adcb 100644 --- a/coordinator/src/orderbook/async_match.rs +++ b/coordinator/src/orderbook/async_match.rs @@ -1,5 +1,5 @@ -use crate::notification::NewUserMessage; -use crate::notification::Notification; +use crate::message::NewUserMessage; +use crate::message::OrderbookMessage; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::ensure; @@ -26,7 +26,7 @@ use tokio::sync::mpsc; pub fn monitor( pool: Pool>, tx_user_feed: broadcast::Sender, - notifier: mpsc::Sender, + notifier: mpsc::Sender, network: Network, ) -> RemoteHandle> { let mut user_feed = tx_user_feed.subscribe(); @@ -54,7 +54,7 @@ pub fn monitor( /// Checks if there are any pending matches async fn process_pending_match( conn: &mut PgConnection, - notifier: mpsc::Sender, + notifier: mpsc::Sender, trader_id: PublicKey, network: Network, ) -> Result<()> { @@ -69,7 +69,7 @@ async fn process_pending_match( OrderReason::Expired => Message::AsyncMatch { order, filled_with }, }; - let msg = Notification::Message { trader_id, message }; + let msg = OrderbookMessage::TraderMessage { trader_id, message }; if let Err(e) = notifier.send(msg).await { tracing::error!("Failed to send notification. Error: {e:#}"); } diff --git a/coordinator/src/orderbook/tests/mod.rs b/coordinator/src/orderbook/tests/mod.rs index a0575a4c6..a66bdce52 100644 --- a/coordinator/src/orderbook/tests/mod.rs +++ b/coordinator/src/orderbook/tests/mod.rs @@ -7,24 +7,12 @@ use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PooledConnection; use diesel::PgConnection; -use std::sync::Once; use testcontainers::clients::Cli; use testcontainers::core::WaitFor; use testcontainers::images; use testcontainers::images::generic::GenericImage; use testcontainers::Container; -pub fn init_tracing() { - static TRACING_TEST_SUBSCRIBER: Once = Once::new(); - - TRACING_TEST_SUBSCRIBER.call_once(|| { - tracing_subscriber::fmt() - .with_env_filter("debug") - .with_test_writer() - .init() - }) -} - pub fn start_postgres(docker: &Cli) -> Result<(Container, String)> { let db = "postgres-db-test"; let user = "postgres-user-test"; diff --git a/coordinator/src/orderbook/tests/registration_test.rs b/coordinator/src/orderbook/tests/registration_test.rs index 220f2fa6f..97fe5fac1 100644 --- a/coordinator/src/orderbook/tests/registration_test.rs +++ b/coordinator/src/orderbook/tests/registration_test.rs @@ -1,6 +1,6 @@ use crate::db::user; use crate::db::user::User; -use crate::orderbook::tests::init_tracing; +use crate::logger::init_tracing_for_test; use crate::orderbook::tests::setup_db; use crate::orderbook::tests::start_postgres; use bitcoin::secp256k1::PublicKey; @@ -10,7 +10,7 @@ use testcontainers::clients::Cli; #[tokio::test] async fn registered_user_is_stored_in_db() { - init_tracing(); + init_tracing_for_test(); let docker = Cli::default(); let (_container, conn_spec) = start_postgres(&docker).unwrap(); diff --git a/coordinator/src/orderbook/tests/sample_test.rs b/coordinator/src/orderbook/tests/sample_test.rs index 67bbd65de..f95f719a3 100644 --- a/coordinator/src/orderbook/tests/sample_test.rs +++ b/coordinator/src/orderbook/tests/sample_test.rs @@ -1,5 +1,5 @@ +use crate::logger::init_tracing_for_test; use crate::orderbook::db::orders; -use crate::orderbook::tests::init_tracing; use crate::orderbook::tests::setup_db; use crate::orderbook::tests::start_postgres; use bitcoin::secp256k1::PublicKey; @@ -17,7 +17,7 @@ use uuid::Uuid; #[tokio::test] async fn crud_test() { - init_tracing(); + init_tracing_for_test(); let docker = Cli::default(); let (_container, conn_spec) = start_postgres(&docker).unwrap(); @@ -46,7 +46,7 @@ async fn crud_test() { #[tokio::test] async fn test_filter_expired_orders() { - init_tracing(); + init_tracing_for_test(); let docker = Cli::default(); let (_container, conn_spec) = start_postgres(&docker).unwrap(); diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index ebf706dee..319eef690 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,4 +1,4 @@ -use crate::notification::Notification; +use crate::message::CoordinatorMessage; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -85,7 +85,7 @@ impl From<&TradeParams> for TraderMatchParams { pub fn start( pool: Pool>, tx_price_feed: broadcast::Sender, - notifier: mpsc::Sender, + notifier: mpsc::Sender, network: Network, ) -> (RemoteHandle>, mpsc::Sender) { let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); @@ -132,7 +132,7 @@ pub fn start( /// Market order: find match and notify traders async fn process_new_order( conn: &mut PgConnection, - notifier: mpsc::Sender, + notifier: mpsc::Sender, tx_price_feed: broadcast::Sender, new_order: NewOrder, order_reason: OrderReason, @@ -216,7 +216,7 @@ async fn process_new_order( }, }; - let msg = Notification::Message { trader_id, message }; + let msg = OrderbookMessage::TraderMessage { trader_id, message }; let order_state = match notifier.send(msg).await { Ok(()) => { tracing::debug!(%trader_id, order_id, "Successfully notified trader"); diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index 353731e9c..25b1bb480 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -1,4 +1,4 @@ -use crate::notification::NewUserMessage; +use crate::message::NewUserMessage; use crate::orderbook; use crate::orderbook::db::orders; use crate::routes::AppState; diff --git a/coordinator/src/position/models.rs b/coordinator/src/position/models.rs index 2e9ede8ee..c8b878090 100644 --- a/coordinator/src/position/models.rs +++ b/coordinator/src/position/models.rs @@ -26,7 +26,7 @@ pub struct NewPosition { pub temporary_contract_id: ContractId, } -#[derive(PartialEq, Debug)] +#[derive(Clone, PartialEq, Debug)] pub enum PositionState { Open, /// The position is in the process of being closed @@ -46,7 +46,7 @@ pub enum PositionState { /// The position acts as an aggregate of one contract of one user. /// The position represents the values of the trader; i.e. the leverage, collateral and direction /// are stored from the trader's perspective and not the coordinator's. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Position { pub id: i32, pub contract_symbol: ContractSymbol, @@ -461,7 +461,7 @@ pub mod tests { } impl Position { - fn dummy() -> Self { + pub(crate) fn dummy() -> Self { Position { id: 0, contract_symbol: ContractSymbol::BtcUsd, diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index c40f564c6..4b90ad2a2 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -10,8 +10,8 @@ use crate::admin::open_channel; use crate::admin::send_payment; use crate::admin::sign_message; use crate::db::user; +use crate::message::NewUserMessage; use crate::node::Node; -use crate::notification::NewUserMessage; use crate::orderbook::routes::delete_order; use crate::orderbook::routes::get_order; use crate::orderbook::routes::get_orders; From bcab9784178af86ccc64b849fce66f13ec8e86e5 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Mon, 18 Sep 2023 15:33:19 -0700 Subject: [PATCH 2/4] feat: Send a notification prompt when executing AsyncTrade When the other party is not online, send a notification that a position has expired. --- coordinator/src/bin/coordinator.rs | 16 ++++++++------ coordinator/src/node/rollover.rs | 1 + coordinator/src/notifications.rs | 15 +++++++++++-- coordinator/src/orderbook/trading.rs | 32 +++++++++++++++++++++++----- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 76641b0fb..c05fc9314 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -201,6 +201,7 @@ async fn main() -> Result<()> { }); let (tx_user_feed, _rx) = broadcast::channel::(100); + let (tx_price_feed, _rx) = broadcast::channel(100); let (_handle, auth_users_notifier) = @@ -211,17 +212,22 @@ async fn main() -> Result<()> { let (_handle, trading_sender) = trading::start( pool.clone(), tx_price_feed.clone(), - notifier.clone(), + auth_users_notifier.clone(), + notification_service.get_sender(), network, ); - let _handle = async_match::monitor( pool.clone(), tx_user_feed.clone(), - notifier.clone(), + auth_users_notifier.clone(), + network, + ); + let _handle = rollover::monitor( + pool.clone(), + tx_user_feed.clone(), + auth_users_notifier, network, ); - let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -266,8 +272,6 @@ async fn main() -> Result<()> { tx_user_feed, ); - let notification_service = NotificationService::new(opts.fcm_api_key); - tokio::spawn({ let sender = notification_service.get_sender(); let pool = pool.clone(); diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index ab1267980..b49e3ecf9 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,6 +1,7 @@ use crate::db; use crate::db::positions; use crate::message::NewUserMessage; +use crate::message::OrderbookMessage; use crate::node::Node; use anyhow::bail; use anyhow::Context; diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs index d7ac1d316..82e02e58f 100644 --- a/coordinator/src/notifications.rs +++ b/coordinator/src/notifications.rs @@ -117,7 +117,18 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> { } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct FcmToken(pub String); +pub struct FcmToken(String); + +impl FcmToken { + pub fn new(token: String) -> Result { + anyhow::ensure!(!token.is_empty(), "FCM token cannot be empty"); + Ok(Self(token)) + } + + pub fn get(&self) -> &str { + &self.0 + } +} impl Display for FcmToken { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -133,7 +144,7 @@ async fn send_notification<'a>( ) -> Result<()> { anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty"); - let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0); + let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token.get()); message_builder.notification(notification); let message = message_builder.finalize(); let response = client diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 319eef690..fdb344e1c 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,4 +1,8 @@ -use crate::message::CoordinatorMessage; +use crate::db::user; +use crate::message::OrderbookMessage; +use crate::notifications::FcmToken; +use crate::notifications::Notification; +use crate::notifications::NotificationKind; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -85,7 +89,8 @@ impl From<&TradeParams> for TraderMatchParams { pub fn start( pool: Pool>, tx_price_feed: broadcast::Sender, - notifier: mpsc::Sender, + notifier: mpsc::Sender, + notification_sender: mpsc::Sender, network: Network, ) -> (RemoteHandle>, mpsc::Sender) { let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); @@ -96,12 +101,14 @@ pub fn start( let mut conn = pool.get()?; let tx_price_feed = tx_price_feed.clone(); let notifier = notifier.clone(); + let notification_sender = notification_sender.clone(); async move { let new_order = new_order_msg.new_order; let result = process_new_order( &mut conn, notifier, tx_price_feed, + notification_sender, new_order, new_order_msg.order_reason, network, @@ -134,6 +141,7 @@ async fn process_new_order( conn: &mut PgConnection, notifier: mpsc::Sender, tx_price_feed: broadcast::Sender, + notification_sender: mpsc::Sender, new_order: NewOrder, order_reason: OrderReason, network: Network, @@ -146,7 +154,7 @@ async fn process_new_order( ))?; } - // before processing any match we set all expired limit orders to failed, to ensure the do + // Before processing any match we set all expired limit orders to failed, to ensure the do // not get matched. // TODO(holzeis): orders should probably do not have an expiry, but should either be // replaced or deleted if not wanted anymore. @@ -161,7 +169,7 @@ async fn process_new_order( .send(Message::NewOrder(order.clone())) .map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?; } else { - // reject new order if there is already a matched order waiting for execution. + // Reject new order if there is already a matched order waiting for execution. if let Some(order) = orders::get_by_trader_id_and_state(conn, new_order.trader_id, OrderState::Matched)? { @@ -224,7 +232,21 @@ async fn process_new_order( } Err(e) => { tracing::warn!(%trader_id, order_id, "{e:#}"); - // TODO(holzeis): send push notification to user + + if let Some(user) = user::by_id(conn, trader_id.to_string())? { + tracing::debug!(%trader_id, order_id, "Sending push notification to user"); + + if let Ok(fcm_token) = FcmToken::new(user.fcm_token) { + notification_sender + .send(Notification { + user_fcm_token: fcm_token, + notification_kind: NotificationKind::PositionExpired, + }) + .await?; + } + } else { + tracing::warn!(%trader_id, order_id, "User has no FCM token"); + } if order.order_type == OrderType::Limit { // FIXME: The maker is currently not connected to the web socket so we From c694d36ea1761f36e71ade50890cb786a0c4d8e6 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 19 Sep 2023 15:58:06 -0700 Subject: [PATCH 3/4] chore: Notify about the soon expiring positions at least 12h before expiry Create a window [START_OF_EXPIRING_POSITION, END_OF_EXPIRING_POSITION] to better configure the notification about imminent position expiry. Configure the window to [13h,12h] before expiry. --- coordinator/src/notifications.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs index 82e02e58f..9c2abc39c 100644 --- a/coordinator/src/notifications.rs +++ b/coordinator/src/notifications.rs @@ -8,10 +8,13 @@ use std::fmt::Display; use time::OffsetDateTime; use tokio::sync::mpsc; -/// Consider a position expiring soon if it expires in less than this time -const START_OF_EXPIRING_POSITION: time::Duration = time::Duration::hours(1); +/// A position expiring soon if it expires in less than this time +const START_OF_EXPIRING_POSITION: time::Duration = time::Duration::hours(13); -/// Consider a position expired if it expired more than this time ago +/// A position 'expiring soon' if it expires in less than this time +const END_OF_EXPIRING_POSITION: time::Duration = time::Duration::hours(12); + +/// A position expired if it expired more than this time ago const END_OF_EXPIRED_POSITION: time::Duration = time::Duration::hours(1); /// Types of notification that can be sent to 10101 app users @@ -214,7 +217,7 @@ async fn send_expiry_notifications_if_applicable( { tracing::error!("Failed to send PositionExpired notification: {:?}", e); } - } else if position.expiry_timestamp > now + } else if position.expiry_timestamp > now + END_OF_EXPIRING_POSITION && position.expiry_timestamp <= now + START_OF_EXPIRING_POSITION { if let Err(e) = notification_sender @@ -239,7 +242,18 @@ pub mod tests { fn soon_expiring_position() -> (Position, FcmToken) { let mut position = Position::dummy(); position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); - position.expiry_timestamp = OffsetDateTime::now_utc() + Duration::minutes(5); + position.expiry_timestamp = + OffsetDateTime::now_utc() + Duration::hours(12) + Duration::minutes(5); + (position, FcmToken("soon_to_expire".to_string())) + } + + /// This position is outside of the expiry notification window (12 hours + /// before expiry), delivering notification for it would make little sense + /// as user can't really react to it. + fn just_before_expiry_position() -> (Position, FcmToken) { + let mut position = Position::dummy(); + position.creation_timestamp = OffsetDateTime::now_utc() - Duration::days(1); + position.expiry_timestamp = OffsetDateTime::now_utc() + Duration::hours(11); (position, FcmToken("soon_to_expire".to_string())) } @@ -301,12 +315,14 @@ pub mod tests { let position_far_from_expiring = far_from_expiry_position(); let position_soon_to_expire = soon_expiring_position(); + let position_just_before_expiry = just_before_expiry_position(); let ancient_position = ancient_expired_position(); // too old to send notification let (notification_sender, mut notification_receiver) = mpsc::channel(100); send_expiry_notifications_if_applicable( &[ position_far_from_expiring, + position_just_before_expiry, position_soon_to_expire.clone(), ancient_position, ], From 8f887fc0ce8d96c40d7d4eabd1d378984b3402a4 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 19 Sep 2023 16:08:35 -0700 Subject: [PATCH 4/4] fix: Adjust the frequency of push notifications for positions check Improve documentation and adjust how often we run the check to minimise the changes of user receiving 2 notifications (it can still happen, but it is around 3% now (overlapping the time a bit to ensure we don't out miss sending out the notification). --- coordinator/src/bin/coordinator.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index c05fc9314..bd2128699 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -47,7 +47,11 @@ const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60); const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30); const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60); const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30); -const EXPIRED_POSITION_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(30 * 60); +/// How often to check for expiring/expired positions to send push notifications for. +/// This should be configured in conjunction with the time windows of +/// expiring/expired notifications, ideally a bit less than the time window +/// (e.g. 58min for a 1h time window). +const POSITION_PUSH_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(58 * 60); const NODE_ALIAS: &str = "10101.finance"; @@ -277,6 +281,7 @@ async fn main() -> Result<()> { let pool = pool.clone(); async move { loop { + tracing::debug!("Running expiring/expired position push notification task"); match pool.get() { Ok(mut conn) => { if let Err(e) = @@ -290,7 +295,7 @@ async fn main() -> Result<()> { } } - tokio::time::sleep(EXPIRED_POSITION_NOTIFICATION_INTERVAL).await; + tokio::time::sleep(POSITION_PUSH_NOTIFICATION_INTERVAL).await; } } });