Skip to content

Commit

Permalink
Periodically update unrealized pnl for position in database
Browse files Browse the repository at this point in the history
  • Loading branch information
da-kami committed Jul 21, 2023
1 parent f6030a8 commit 7f7b17a
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE
"positions" DROP COLUMN "unrealized_pnl";
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Your SQL goes here
ALTER TABLE
positions
ADD
COLUMN "unrealized_pnl" BIGINT;
14 changes: 14 additions & 0 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::routes::router;
use coordinator::run_migration;
Expand All @@ -34,6 +35,7 @@ const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_secs(5);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(600);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

#[tokio::main]
Expand Down Expand Up @@ -162,6 +164,18 @@ async fn main() -> Result<()> {
}
});

tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(UNREALIZED_PNL_SYNC_INTERVAL).await;
if let Err(e) = unrealized_pnl::sync(node.clone()).await {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
});

tokio::spawn({
let node = node.clone();
async move {
Expand Down
17 changes: 17 additions & 0 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Position {
pub trader_pubkey: String,
pub temporary_contract_id: String,
pub realized_pnl: Option<i64>,
pub unrealized_pnl: Option<i64>,
}

impl Position {
Expand Down Expand Up @@ -125,6 +126,22 @@ impl Position {
Ok(())
}

pub fn update_unrealized_pnl(conn: &mut PgConnection, id: i32, pnl: i64) -> Result<()> {
let effected_rows = diesel::update(positions::table)
.filter(positions::id.eq(id))
.set((
positions::unrealized_pnl.eq(Some(pnl)),
positions::update_timestamp.eq(OffsetDateTime::now_utc()),
))
.execute(conn)?;

if effected_rows == 0 {
bail!("Could not update unrealized pnl {pnl} for position {id}")
}

Ok(())
}

/// inserts the given position into the db. Returns the position if successful
#[autometrics]
pub fn insert(
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub mod expired_positions;
pub mod order_matching_fee;
pub mod routing_fees;
pub mod storage;
pub mod unrealized_pnl;

/// The leverage used by the coordinator for all trades.
const COORDINATOR_LEVERAGE: f32 = 1.0;
Expand Down
68 changes: 68 additions & 0 deletions coordinator/src/node/unrealized_pnl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::db;
use crate::node::Node;
use crate::position::models::Position;
use anyhow::Context;
use anyhow::Result;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::PooledConnection;
use diesel::PgConnection;
use rust_decimal::Decimal;
use time::OffsetDateTime;
use trade::bitmex_client::BitmexClient;
use trade::bitmex_client::Quote;
use trade::cfd::calculate_pnl;
use trade::Direction;

pub async fn sync(node: Node) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_or_closing_positions(&mut conn)?;
let current_quote = BitmexClient::get_quote(&OffsetDateTime::now_utc())
.await
.context("Failed to fetch quote from BitMEX")?;

for position in positions.iter() {
if let Err(e) = sync_position(&mut conn, position, current_quote.clone()) {
tracing::error!(position_id=%position.id, ?current_quote, "Failed to update position's unrealized pnl in database: {e:#}")
}
}

Ok(())
}

fn sync_position(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
position: &Position,
quote: Quote,
) -> Result<()> {
let current_price = match position.direction {
trade::Direction::Long => quote.bid_price,
trade::Direction::Short => quote.ask_price,
};

let average_entry_price = Decimal::try_from(position.average_entry_price)
.context("Failed to convert average entry price to Decimal")?;

let (long_leverage, short_leverage) = match position.direction {
Direction::Long => (position.leverage, 1.0_f32),
Direction::Short => (1.0_f32, position.leverage),
};

// the position in the database is the trader's position, our direction is opposite
let direction = position.direction.opposite();

let pnl = calculate_pnl(
average_entry_price,
current_price,
position.quantity,
long_leverage,
short_leverage,
direction,
)
.context("Failed to calculate pnl for position")?;

db::positions::Position::update_unrealized_pnl(conn, position.id, pnl)
.context("Failed to update unrealized pnl in db")?;

Ok(())
}
1 change: 1 addition & 0 deletions coordinator/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ diesel::table! {
trader_pubkey -> Text,
temporary_contract_id -> Text,
realized_pnl -> Nullable<Int8>,
unrealized_pnl -> Nullable<Int8>,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/trade/src/bitmex_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl BitmexClient {
}
}

#[derive(Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Quote {
pub bid_size: u64,
Expand Down

0 comments on commit 7f7b17a

Please sign in to comment.