Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/track pnl #966

Merged
merged 5 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE
"positions" DROP COLUMN "temporary_contract_id";
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Your SQL goes here
ALTER TABLE
positions
ADD
COLUMN "temporary_contract_id" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file should undo anything in `up.sql`
-- ... but in this case it does not fully.
-- Postgres does not allow removing enum type values. One can only re-create an enum type with fewer values and replace the references.
-- However, there is no proper way to replace the values to be removed where they are used (i.e. referenced in `positions` table)
-- We opt to NOT remove enum values that were added at a later point.
ALTER TABLE
"positions" DROP COLUMN "realized_pnl";
10 changes: 10 additions & 0 deletions coordinator/migrations/2023-07-19-055143_closed_positions/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Your SQL goes here
-- Note that the `IF NOT EXISTS` is essential because there is no `down` migration for removing this value because it is not really feasible to remove enum values!
-- In order to allow re-running this migration we thus have to make sure to only add the value if it does not exist yet.
ALTER TYPE "PositionState_Type"
ADD
VALUE IF NOT EXISTS 'Closed';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel good state ☺️

ALTER TABLE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙃 Maybe not 100% related to your change, but it would be great if we'd also save the closing price on the position and not just the pnl.

Copy link
Contributor Author

@da-kami da-kami Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will see to add the closing price in a follow up, see #967 (comment)

positions
ADD
COLUMN "realized_pnl" BIGINT;
124 changes: 16 additions & 108 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use anyhow::Context;
use anyhow::Result;
use coordinator::cli::Opts;
use coordinator::db;
use coordinator::logger;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::storage::NodeStorage;
use coordinator::node::Node;
use coordinator::node::TradeAction;
use coordinator::position::models::Position;
use coordinator::position::models::PositionState;
use coordinator::routes::router;
use coordinator::run_migration;
use coordinator::settings::Settings;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use hex::FromHex;
use lightning::ln::PaymentHash;
use lightning::util::events::Event;
use ln_dlc_node::seed::Bip39Seed;
use rand::thread_rng;
Expand All @@ -30,15 +26,14 @@ use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::metadata::LevelFilter;
use trade::bitmex_client::BitmexClient;

const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_secs(5);
const POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

#[tokio::main]
Expand Down Expand Up @@ -171,106 +166,19 @@ async fn main() -> Result<()> {
let node = node.clone();
async move {
loop {
tokio::time::sleep(POSITION_SYNC_INTERVAL).await;

let mut conn = match node.pool.get() {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Failed to get pool connection. Error: {e:?}");
continue;
}
};

let positions = match db::positions::Position::get_all_open_positions(&mut conn) {
Ok(positions) => positions,
Err(e) => {
tracing::error!("Failed to get positions. Error: {e:?}");
continue;
}
};

let positions = positions
.into_iter()
.filter(|p| {
p.position_state == PositionState::Open
&& OffsetDateTime::now_utc().ge(&p.expiry_timestamp)
})
.collect::<Vec<Position>>();

for position in positions.iter() {
tracing::trace!(trader_pk=%position.trader, %position.expiry_timestamp, "Attempting to close expired position");

if !node.is_connected(&position.trader) {
tracing::debug!(
"Could not close expired position with {} as trader is not connected.",
position.trader
);
continue;
}

let channel_id = match node.decide_trade_action(&position.trader) {
Ok(TradeAction::Close(channel_id)) => channel_id,
Ok(_) => {
tracing::error!(
?position,
"Unable to find sub channel of expired position."
);
continue;
}
Err(e) => {
tracing::error!(
?position,
"Failed to decide trade action. Error: {e:?}"
);
continue;
}
};

let closing_price =
match BitmexClient::get_quote(&position.expiry_timestamp).await {
Ok(quote) => match position.direction {
trade::Direction::Long => quote.bid_price,
trade::Direction::Short => quote.ask_price,
},
Err(e) => {
tracing::warn!(
"Failed to get quote from bitmex for {} at {}. Error: {e:?}",
position.trader,
position.expiry_timestamp
);
continue;
}
};

// Upon collab closing an expired position we cannot charge a fee using an
// invoice. This dummy hash exists in the database to
// represent zero-amount invoices.
let zero_amount_payment_hash_dummy = PaymentHash(
<[u8; 32]>::from_hex(
"6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210",
)
.expect("static payment hash to decode"),
);
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
expired_positions::close(node.clone()).await;
}
}
});

match node
.close_position(
position,
closing_price,
channel_id,
zero_amount_payment_hash_dummy,
)
.await
{
Ok(_) => tracing::info!(
"Successfully proposed to close expired position with {}",
position.trader
),
Err(e) => tracing::warn!(
?position,
"Failed to close expired position with {}. Error: {e:?}",
position.trader
),
}
tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Whats the rule of when to put an interval setting into the config file and when static on top? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh, I did not think about putting it into the config file. My rule of thumb would be: if we think we want to configure it we configure it. Is this relevant for the tests? If so then I'd add it to the config file so we can potentially use shorter intervals for the tests 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need it in the config I'll address it in a follow up!

if let Err(e) = closed_positions::sync(node.clone()) {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/db/custom_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl ToSql<PositionStateType, Pg> for PositionState {
match *self {
PositionState::Open => out.write_all(b"Open")?,
PositionState::Closing => out.write_all(b"Closing")?,
PositionState::Closed => out.write_all(b"Closed")?,
}
Ok(IsNull::No)
}
Expand All @@ -53,6 +54,7 @@ impl FromSql<PositionStateType, Pg> for PositionState {
match bytes.as_bytes() {
b"Open" => Ok(PositionState::Open),
b"Closing" => Ok(PositionState::Closing),
b"Closed" => Ok(PositionState::Closed),
_ => Err("Unrecognized enum variant".into()),
}
}
Expand Down
65 changes: 60 additions & 5 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use crate::schema::sql_types::PositionStateType;
use anyhow::bail;
use anyhow::Result;
use autometrics::autometrics;
use bitcoin::hashes::hex::ToHex;
use diesel::prelude::*;
use diesel::query_builder::QueryId;
use diesel::result::QueryResult;
use diesel::AsExpression;
use diesel::FromSqlRow;
use dlc_manager::ContractId;
use hex::FromHex;
use std::any::TypeId;
use time::OffsetDateTime;

Expand All @@ -28,6 +31,8 @@ pub struct Position {
pub expiry_timestamp: OffsetDateTime,
pub update_timestamp: OffsetDateTime,
pub trader_pubkey: String,
pub temporary_contract_id: String,
pub realized_pnl: Option<i64>,
}

impl Position {
Expand Down Expand Up @@ -62,6 +67,25 @@ impl Position {
Ok(positions)
}

pub fn get_all_open_or_closing_positions(
conn: &mut PgConnection,
) -> QueryResult<Vec<crate::position::models::Position>> {
let positions = positions::table
.filter(
positions::position_state
.eq(PositionState::Open)
.or(positions::position_state.eq(PositionState::Closing)),
)
.load::<Position>(conn)?;

let positions = positions
.into_iter()
.map(crate::position::models::Position::from)
.collect();

Ok(positions)
}

/// sets the status of all open position to closing (note, we expect that number to be always
/// exactly 1)
pub fn set_open_position_to_closing(
Expand All @@ -71,7 +95,10 @@ impl Position {
let effected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey.clone()))
.filter(positions::position_state.eq(PositionState::Open))
.set(positions::position_state.eq(PositionState::Closing))
.set((
positions::position_state.eq(PositionState::Closing),
positions::update_timestamp.eq(OffsetDateTime::now_utc()),
))
.execute(conn)?;

if effected_rows == 0 {
Expand All @@ -81,6 +108,23 @@ impl Position {
Ok(())
}

pub fn set_position_to_closed(conn: &mut PgConnection, id: i32, pnl: i64) -> Result<()> {
let effected_rows = diesel::update(positions::table)
.filter(positions::id.eq(id))
.set((
positions::position_state.eq(PositionState::Closed),
positions::realized_pnl.eq(Some(pnl)),
positions::update_timestamp.eq(OffsetDateTime::now_utc()),
))
.execute(conn)?;

if effected_rows == 0 {
bail!("Could not update position to Closed with realized pnl {pnl} for position {id}")
}

Ok(())
}

/// inserts the given position into the db. Returns the position if successful
#[autometrics]
pub fn insert(
Expand All @@ -105,12 +149,17 @@ impl From<Position> for crate::position::models::Position {
direction: trade::Direction::from(value.direction),
average_entry_price: value.average_entry_price,
liquidation_price: value.liquidation_price,
position_state: crate::position::models::PositionState::from(value.position_state),
position_state: crate::position::models::PositionState::from((
value.position_state,
value.realized_pnl,
)),
collateral: value.collateral,
creation_timestamp: value.creation_timestamp,
expiry_timestamp: value.expiry_timestamp,
update_timestamp: value.update_timestamp,
trader: value.trader_pubkey.parse().expect("to be valid public key"),
temporary_contract_id: ContractId::from_hex(value.temporary_contract_id.as_str())
.expect("contract id to decode"),
}
}
}
Expand All @@ -128,6 +177,7 @@ struct NewPosition {
pub collateral: i64,
pub expiry_timestamp: OffsetDateTime,
pub trader_pubkey: String,
pub temporary_contract_id: String,
}

impl From<crate::position::models::NewPosition> for NewPosition {
Expand All @@ -143,6 +193,7 @@ impl From<crate::position::models::NewPosition> for NewPosition {
collateral: value.collateral,
expiry_timestamp: value.expiry_timestamp,
trader_pubkey: value.trader.to_string(),
temporary_contract_id: value.temporary_contract_id.to_hex(),
}
}
}
Expand All @@ -152,6 +203,7 @@ impl From<crate::position::models::NewPosition> for NewPosition {
pub enum PositionState {
Open,
Closing,
Closed,
}

impl QueryId for PositionStateType {
Expand All @@ -163,11 +215,14 @@ impl QueryId for PositionStateType {
}
}

impl From<PositionState> for crate::position::models::PositionState {
fn from(value: PositionState) -> Self {
match value {
impl From<(PositionState, Option<i64>)> for crate::position::models::PositionState {
fn from((position_state, realized_pnl): (PositionState, Option<i64>)) -> Self {
match position_state {
PositionState::Open => crate::position::models::PositionState::Open,
PositionState::Closing => crate::position::models::PositionState::Closing,
PositionState::Closed => crate::position::models::PositionState::Closed {
pnl: realized_pnl.expect("realized pnl to be set when position is closed"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Not sure if I like putting the pnl into the enum. Why not simply keep it as option on the position itself? I think that would also make the transformation from and to the database easier?

Copy link
Contributor Author

@da-kami da-kami Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me this is more idiomatic in rust code. This makes it clear that this pnl is only there in Closed state. I think this is more accurate for the model in the business logic. The database cannot easily depict that so I flattened it out in the db. But that's an implementation detail for me :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this is exactly what you want from Rust.

},
}
}
}
Expand Down
Loading