diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 76641b0fb..c05fc9314 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -201,6 +201,7 @@ async fn main() -> Result<()> { }); let (tx_user_feed, _rx) = broadcast::channel::(100); + let (tx_price_feed, _rx) = broadcast::channel(100); let (_handle, auth_users_notifier) = @@ -211,17 +212,22 @@ async fn main() -> Result<()> { let (_handle, trading_sender) = trading::start( pool.clone(), tx_price_feed.clone(), - notifier.clone(), + auth_users_notifier.clone(), + notification_service.get_sender(), network, ); - let _handle = async_match::monitor( pool.clone(), tx_user_feed.clone(), - notifier.clone(), + auth_users_notifier.clone(), + network, + ); + let _handle = rollover::monitor( + pool.clone(), + tx_user_feed.clone(), + auth_users_notifier, network, ); - let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -266,8 +272,6 @@ async fn main() -> Result<()> { tx_user_feed, ); - let notification_service = NotificationService::new(opts.fcm_api_key); - tokio::spawn({ let sender = notification_service.get_sender(); let pool = pool.clone(); diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index ab1267980..b49e3ecf9 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,6 +1,7 @@ use crate::db; use crate::db::positions; use crate::message::NewUserMessage; +use crate::message::OrderbookMessage; use crate::node::Node; use anyhow::bail; use anyhow::Context; diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs index d7ac1d316..82e02e58f 100644 --- a/coordinator/src/notifications.rs +++ b/coordinator/src/notifications.rs @@ -117,7 +117,18 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> { } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct FcmToken(pub String); +pub struct FcmToken(String); + +impl FcmToken { + pub fn new(token: String) -> Result { + anyhow::ensure!(!token.is_empty(), "FCM token cannot be empty"); + Ok(Self(token)) + } + + pub fn get(&self) -> &str { + &self.0 + } +} impl Display for FcmToken { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -133,7 +144,7 @@ async fn send_notification<'a>( ) -> Result<()> { anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty"); - let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0); + let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token.get()); message_builder.notification(notification); let message = message_builder.finalize(); let response = client diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 319eef690..fdb344e1c 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,4 +1,8 @@ -use crate::message::CoordinatorMessage; +use crate::db::user; +use crate::message::OrderbookMessage; +use crate::notifications::FcmToken; +use crate::notifications::Notification; +use crate::notifications::NotificationKind; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -85,7 +89,8 @@ impl From<&TradeParams> for TraderMatchParams { pub fn start( pool: Pool>, tx_price_feed: broadcast::Sender, - notifier: mpsc::Sender, + notifier: mpsc::Sender, + notification_sender: mpsc::Sender, network: Network, ) -> (RemoteHandle>, mpsc::Sender) { let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); @@ -96,12 +101,14 @@ pub fn start( let mut conn = pool.get()?; let tx_price_feed = tx_price_feed.clone(); let notifier = notifier.clone(); + let notification_sender = notification_sender.clone(); async move { let new_order = new_order_msg.new_order; let result = process_new_order( &mut conn, notifier, tx_price_feed, + notification_sender, new_order, new_order_msg.order_reason, network, @@ -134,6 +141,7 @@ async fn process_new_order( conn: &mut PgConnection, notifier: mpsc::Sender, tx_price_feed: broadcast::Sender, + notification_sender: mpsc::Sender, new_order: NewOrder, order_reason: OrderReason, network: Network, @@ -146,7 +154,7 @@ async fn process_new_order( ))?; } - // before processing any match we set all expired limit orders to failed, to ensure the do + // Before processing any match we set all expired limit orders to failed, to ensure the do // not get matched. // TODO(holzeis): orders should probably do not have an expiry, but should either be // replaced or deleted if not wanted anymore. @@ -161,7 +169,7 @@ async fn process_new_order( .send(Message::NewOrder(order.clone())) .map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?; } else { - // reject new order if there is already a matched order waiting for execution. + // Reject new order if there is already a matched order waiting for execution. if let Some(order) = orders::get_by_trader_id_and_state(conn, new_order.trader_id, OrderState::Matched)? { @@ -224,7 +232,21 @@ async fn process_new_order( } Err(e) => { tracing::warn!(%trader_id, order_id, "{e:#}"); - // TODO(holzeis): send push notification to user + + if let Some(user) = user::by_id(conn, trader_id.to_string())? { + tracing::debug!(%trader_id, order_id, "Sending push notification to user"); + + if let Ok(fcm_token) = FcmToken::new(user.fcm_token) { + notification_sender + .send(Notification { + user_fcm_token: fcm_token, + notification_kind: NotificationKind::PositionExpired, + }) + .await?; + } + } else { + tracing::warn!(%trader_id, order_id, "User has no FCM token"); + } if order.order_type == OrderType::Limit { // FIXME: The maker is currently not connected to the web socket so we