Skip to content

Commit

Permalink
feat: Send a notification prompt when executing AsyncTrade
Browse files Browse the repository at this point in the history
When the other party is not online, send a notification that a position has expired.
  • Loading branch information
klochowicz committed Sep 20, 2023
1 parent d053349 commit bcab978
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 13 deletions.
16 changes: 10 additions & 6 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ async fn main() -> Result<()> {
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);

let (tx_price_feed, _rx) = broadcast::channel(100);

let (_handle, auth_users_notifier) =
Expand All @@ -211,17 +212,22 @@ async fn main() -> Result<()> {
let (_handle, trading_sender) = trading::start(
pool.clone(),
tx_price_feed.clone(),
notifier.clone(),
auth_users_notifier.clone(),
notification_service.get_sender(),
network,
);

let _handle = async_match::monitor(
pool.clone(),
tx_user_feed.clone(),
notifier.clone(),
auth_users_notifier.clone(),
network,
);
let _handle = rollover::monitor(
pool.clone(),
tx_user_feed.clone(),
auth_users_notifier,
network,
);
let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier);

tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -266,8 +272,6 @@ async fn main() -> Result<()> {
tx_user_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);

tokio::spawn({
let sender = notification_service.get_sender();
let pool = pool.clone();
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::db;
use crate::db::positions;
use crate::message::NewUserMessage;
use crate::message::OrderbookMessage;
use crate::node::Node;
use anyhow::bail;
use anyhow::Context;
Expand Down
15 changes: 13 additions & 2 deletions coordinator/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,18 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> {
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FcmToken(pub String);
pub struct FcmToken(String);

impl FcmToken {
pub fn new(token: String) -> Result<Self> {
anyhow::ensure!(!token.is_empty(), "FCM token cannot be empty");
Ok(Self(token))
}

pub fn get(&self) -> &str {
&self.0
}
}

impl Display for FcmToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -133,7 +144,7 @@ async fn send_notification<'a>(
) -> Result<()> {
anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty");

let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0);
let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token.get());
message_builder.notification(notification);
let message = message_builder.finalize();
let response = client
Expand Down
32 changes: 27 additions & 5 deletions coordinator/src/orderbook/trading.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::message::CoordinatorMessage;
use crate::db::user;
use crate::message::OrderbookMessage;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use anyhow::anyhow;
Expand Down Expand Up @@ -85,7 +89,8 @@ impl From<&TradeParams> for TraderMatchParams {
pub fn start(
pool: Pool<ConnectionManager<PgConnection>>,
tx_price_feed: broadcast::Sender<Message>,
notifier: mpsc::Sender<CoordinatorMessage>,
notifier: mpsc::Sender<OrderbookMessage>,
notification_sender: mpsc::Sender<Notification>,
network: Network,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<NewOrderMessage>) {
let (sender, mut receiver) = mpsc::channel::<NewOrderMessage>(NEW_ORDERS_BUFFER_SIZE);
Expand All @@ -96,12 +101,14 @@ pub fn start(
let mut conn = pool.get()?;
let tx_price_feed = tx_price_feed.clone();
let notifier = notifier.clone();
let notification_sender = notification_sender.clone();
async move {
let new_order = new_order_msg.new_order;
let result = process_new_order(
&mut conn,
notifier,
tx_price_feed,
notification_sender,
new_order,
new_order_msg.order_reason,
network,
Expand Down Expand Up @@ -134,6 +141,7 @@ async fn process_new_order(
conn: &mut PgConnection,
notifier: mpsc::Sender<OrderbookMessage>,
tx_price_feed: broadcast::Sender<Message>,
notification_sender: mpsc::Sender<Notification>,
new_order: NewOrder,
order_reason: OrderReason,
network: Network,
Expand All @@ -146,7 +154,7 @@ async fn process_new_order(
))?;
}

// before processing any match we set all expired limit orders to failed, to ensure the do
// Before processing any match we set all expired limit orders to failed, to ensure the do
// not get matched.
// TODO(holzeis): orders should probably do not have an expiry, but should either be
// replaced or deleted if not wanted anymore.
Expand All @@ -161,7 +169,7 @@ async fn process_new_order(
.send(Message::NewOrder(order.clone()))
.map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?;
} else {
// reject new order if there is already a matched order waiting for execution.
// Reject new order if there is already a matched order waiting for execution.
if let Some(order) =
orders::get_by_trader_id_and_state(conn, new_order.trader_id, OrderState::Matched)?
{
Expand Down Expand Up @@ -224,7 +232,21 @@ async fn process_new_order(
}
Err(e) => {
tracing::warn!(%trader_id, order_id, "{e:#}");
// TODO(holzeis): send push notification to user

if let Some(user) = user::by_id(conn, trader_id.to_string())? {
tracing::debug!(%trader_id, order_id, "Sending push notification to user");

if let Ok(fcm_token) = FcmToken::new(user.fcm_token) {
notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind: NotificationKind::PositionExpired,
})
.await?;
}
} else {
tracing::warn!(%trader_id, order_id, "User has no FCM token");
}

if order.order_type == OrderType::Limit {
// FIXME: The maker is currently not connected to the web socket so we
Expand Down

0 comments on commit bcab978

Please sign in to comment.