Skip to content

Commit

Permalink
Periodically update positions to Closed based on DLC
Browse files Browse the repository at this point in the history
  • Loading branch information
da-kami committed Jul 20, 2023
1 parent 0a55965 commit 95d4ca8
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
14 changes: 14 additions & 0 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use coordinator::logger;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::storage::NodeStorage;
Expand Down Expand Up @@ -32,6 +33,7 @@ use tracing::metadata::LevelFilter;
const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_secs(5);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

#[tokio::main]
Expand Down Expand Up @@ -170,6 +172,18 @@ async fn main() -> Result<()> {
}
});

tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = closed_positions::sync(node.clone()) {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
});

tokio::spawn({
let node = node.clone();
connection::keep_public_channel_peers_connected(node.inner, CONNECTION_CHECK_INTERVAL)
Expand Down
41 changes: 41 additions & 0 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::bail;
use anyhow::Result;
use autometrics::autometrics;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use diesel::prelude::*;
use diesel::query_builder::QueryId;
use diesel::result::QueryResult;
Expand Down Expand Up @@ -67,6 +68,26 @@ impl Position {
Ok(positions)
}

#[autometrics]
pub fn get_all_open_or_closing_positions(
conn: &mut PgConnection,
) -> QueryResult<Vec<crate::position::models::Position>> {
let positions = positions::table
.filter(
positions::position_state
.eq(PositionState::Open)
.or(positions::position_state.eq(PositionState::Closing)),
)
.load::<Position>(conn)?;

let positions = positions
.into_iter()
.map(crate::position::models::Position::from)
.collect();

Ok(positions)
}

/// sets the status of all open position to closing (note, we expect that number to be always
/// exactly 1)
pub fn set_open_position_to_closing(
Expand All @@ -86,6 +107,26 @@ impl Position {
Ok(())
}

pub fn set_position_to_closed(
conn: &mut PgConnection,
trader_pubkey: PublicKey,
pnl: i64,
) -> Result<()> {
let effected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey.to_string()))
.set((
positions::position_state.eq(PositionState::Closed),
positions::realized_pnl.eq(Some(pnl)),
))
.execute(conn)?;

if effected_rows == 0 {
bail!("Could not update position to Closed with realized pnl {pnl} for {trader_pubkey}")
}

Ok(())
}

/// inserts the given position into the db. Returns the position if successful
#[autometrics]
pub fn insert(
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use trade::cfd::BTCUSD_MAX_PRICE;
use trade::Direction;

pub mod channel_opening_fee;
pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod order_matching_fee;
Expand Down
39 changes: 39 additions & 0 deletions coordinator/src/node/closed_positions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::db;
use crate::node::Node;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;

pub fn sync(node: Node) -> Result<()> {
let mut conn = node.pool.get()?;
let open_and_closing_positions =
db::positions::Position::get_all_open_or_closing_positions(&mut conn)
.context("Failed to load open and closing positions")?;

for position in open_and_closing_positions {
let contract = match node
.inner
.get_closed_contract(position.temporary_contract_id)
{
Ok(Some(closed_contract)) => closed_contract,
Ok(None) => {
tracing::trace!(position_id=%position.id, "Position not closed yet, skipping");
continue;
}
Err(e) => {
tracing::error!(position_id=%position.id, "Failed to get closed contract from DLC manager storage: {e:#}");
continue;
}
};

if let Err(e) = db::positions::Position::set_position_to_closed(
&mut conn,
position.trader,
contract.pnl,
) {
tracing::error!(position_id=%position.id, temporary_contract_id=%position.temporary_contract_id.to_hex(), pnl=%contract.pnl, "Failed to set position to closed: {e:#}")
}
}

Ok(())
}

0 comments on commit 95d4ca8

Please sign in to comment.