diff --git a/ddk/examples/nostr.rs b/ddk/examples/nostr.rs new file mode 100644 index 0000000..654ccb0 --- /dev/null +++ b/ddk/examples/nostr.rs @@ -0,0 +1,33 @@ +use anyhow::Result; +use bitcoin::key::rand::Fill; +use bitcoin::Network; +use ddk::builder::Builder; +use ddk::oracle::memory::MemoryOracle; +use ddk::storage::memory::MemoryStorage; +use ddk::transport::nostr::NostrDlc; +use std::sync::Arc; + +type NostrDdk = ddk::DlcDevKit; + +#[tokio::main] +async fn main() -> Result<()> { + let mut seed_bytes = [0u8; 32]; + seed_bytes.try_fill(&mut bitcoin::key::rand::thread_rng())?; + + let transport = + Arc::new(NostrDlc::new(&seed_bytes, "wss://nostr.dlcdevkit.com", Network::Regtest).await?); + let storage = Arc::new(MemoryStorage::new()); + let oracle_client = Arc::new(MemoryOracle::default()); + + let mut builder = Builder::new(); + builder.set_seed_bytes(seed_bytes); + builder.set_transport(transport.clone()); + builder.set_storage(storage.clone()); + builder.set_oracle(oracle_client.clone()); + + let ddk: NostrDdk = builder.finish().await?; + + ddk.start().expect("couldn't start ddk"); + + loop {} +} diff --git a/ddk/src/builder.rs b/ddk/src/builder.rs index e036d69..58a028d 100644 --- a/ddk/src/builder.rs +++ b/ddk/src/builder.rs @@ -152,6 +152,7 @@ impl Builder { let esplora_client = Arc::new(EsploraClient::new(&self.esplora_host, self.network)?); let (sender, receiver) = unbounded::(); + let (stop_signal_sender, stop_signal) = tokio::sync::watch::channel(false); let manager = Arc::new( Manager::new( @@ -177,6 +178,8 @@ impl Builder { storage, oracle, network: self.network, + stop_signal, + stop_signal_sender, }) } } diff --git a/ddk/src/ddk.rs b/ddk/src/ddk.rs index 9281029..4af87a8 100644 --- a/ddk/src/ddk.rs +++ b/ddk/src/ddk.rs @@ -17,6 +17,7 @@ use dlc_messages::{AcceptDlc, Message, OfferDlc}; use std::sync::{Arc, RwLock}; use std::time::Duration; use tokio::runtime::Runtime; +use tokio::sync::watch; /// DlcDevKit type alias for the [ddk_manager::manager::Manager] pub type DlcDevKitDlcManager = ddk_manager::manager::Manager< @@ -55,6 +56,8 @@ pub struct DlcDevKit { pub storage: Arc, pub oracle: Arc, pub network: Network, + pub stop_signal: watch::Receiver, + pub stop_signal_sender: watch::Sender, } impl DlcDevKit @@ -82,15 +85,13 @@ where let receiver_clone = self.receiver.clone(); runtime.spawn(async move { Self::run_manager(manager_clone, receiver_clone).await }); - let transport_clone = self.transport.clone(); - runtime.spawn(async move { - transport_clone.listen().await; - }); - let transport_clone = self.transport.clone(); let manager_clone = self.manager.clone(); + let stop_signal = self.stop_signal.clone(); runtime.spawn(async move { - transport_clone.receive_messages(manager_clone).await; + if let Err(e) = transport_clone.start(stop_signal, manager_clone).await { + tracing::error!(error = e.to_string(), "Error in transport listeners."); + } }); let wallet_clone = self.wallet.clone(); @@ -133,6 +134,8 @@ where } pub fn stop(&self) -> anyhow::Result<()> { + tracing::warn!("Shutting down DDK runtime and listeners."); + self.stop_signal_sender.send(true)?; let mut runtime_lock = self.runtime.write().unwrap(); if let Some(rt) = runtime_lock.take() { rt.shutdown_background(); @@ -195,15 +198,13 @@ where oracle_announcements: Vec, ) -> anyhow::Result { let (responder, receiver) = unbounded(); - self.sender - .send(DlcManagerMessage::OfferDlc { - contract_input: contract_input.to_owned(), - counter_party, - oracle_announcements, - responder, - }) - .expect("sending offer message"); - let offer = receiver.recv().expect("no offer dlc"); + self.sender.send(DlcManagerMessage::OfferDlc { + contract_input: contract_input.to_owned(), + counter_party, + oracle_announcements, + responder, + })?; + let offer = receiver.recv()?; let contract_id = hex::encode(offer.temporary_contract_id); self.transport diff --git a/ddk/src/lib.rs b/ddk/src/lib.rs index f7d42ff..15c205a 100644 --- a/ddk/src/lib.rs +++ b/ddk/src/lib.rs @@ -21,9 +21,7 @@ pub mod transport; pub mod util; /// The internal [`bdk_wallet::PersistedWallet`]. pub mod wallet; -use std::sync::Arc; -use bdk_wallet::ChangeSet; /// DDK object with all services pub use ddk::DlcDevKit; pub use ddk::DlcManagerMessage; @@ -32,11 +30,14 @@ pub use ddk::DlcManagerMessage; pub const DEFAULT_NOSTR_RELAY: &str = "wss://nostr.dlcdevkit.com"; use async_trait::async_trait; +use bdk_wallet::ChangeSet; use bitcoin::secp256k1::{PublicKey, SecretKey}; use ddk::DlcDevKitDlcManager; use dlc_messages::oracle_msgs::OracleAnnouncement; use dlc_messages::Message; use error::WalletError; +use std::sync::Arc; +use tokio::sync::watch; use transport::PeerInformation; #[async_trait] @@ -46,13 +47,12 @@ pub trait Transport: Send + Sync + 'static { fn name(&self) -> String; /// Get the public key of the transport. fn public_key(&self) -> PublicKey; - /// Open an incoming listener for DLC messages from peers. - async fn listen(&self); /// Get messages that have not been processed yet. - async fn receive_messages( + async fn start( &self, + mut stop_signal: watch::Receiver, manager: Arc>, - ); + ) -> Result<(), anyhow::Error>; /// Send a message to a specific counterparty. fn send_message(&self, counterparty: PublicKey, message: Message); /// Connect to another peer diff --git a/ddk/src/transport/memory.rs b/ddk/src/transport/memory.rs index ce575c2..7883faa 100644 --- a/ddk/src/transport/memory.rs +++ b/ddk/src/transport/memory.rs @@ -11,6 +11,7 @@ use bitcoin::{ }; use crossbeam::channel::{unbounded, Receiver, Sender}; use dlc_messages::Message; +use tokio::sync::watch; type CounterPartyTransport = Arc>>>; pub struct MemoryTransport { @@ -61,33 +62,39 @@ impl Transport for MemoryTransport { } } - async fn listen(&self) { - tracing::info!("Listening on memory listener") - } - - async fn receive_messages( + async fn start( &self, + mut stop_receiver: watch::Receiver, manager: Arc>, - ) { + ) -> Result<(), anyhow::Error> { let mut timer = tokio::time::interval(Duration::from_secs(1)); loop { - timer.tick().await; - if let Ok(msg) = self.receiver.recv() { - match manager.on_dlc_message(&msg.0, msg.1).await { - Ok(s) => { - if let Some(reply) = s { - self.send_message(msg.1, reply); - } else { - tracing::info!("Handled on_dlc_message."); + tokio::select! { + _ = stop_receiver.changed() => { + if *stop_receiver.borrow() { + break; + } + }, + _ = timer.tick() => { + if let Ok(msg) = self.receiver.recv() { + match manager.on_dlc_message(&msg.0, msg.1).await { + Ok(s) => { + if let Some(reply) = s { + self.send_message(msg.1, reply); + } else { + tracing::info!("Handled on_dlc_message."); + } + } + Err(e) => tracing::error!( + error = e.to_string(), + "In memory transport error on dlc message." + ), } } - Err(e) => tracing::error!( - error = e.to_string(), - "In memory transport error on dlc message." - ), } } } + Ok(()) } async fn connect_outbound(&self, _pubkey: PublicKey, _host: &str) { diff --git a/ddk/src/transport/nostr/messages.rs b/ddk/src/transport/nostr/messages.rs index 0357311..7a43ed5 100644 --- a/ddk/src/transport/nostr/messages.rs +++ b/ddk/src/transport/nostr/messages.rs @@ -50,6 +50,11 @@ pub fn handle_dlc_msg_event( if event.kind != Kind::Custom(8_888) { return Err(anyhow::anyhow!("Event reveived was not DLC Message event.")); } + tracing::info!( + kind = 8_888, + pubkey = event.pubkey.to_string(), + "Received DLC message event." + ); let message = parse_dlc_msg_event(&event, secret_key)?; diff --git a/ddk/src/transport/nostr/mod.rs b/ddk/src/transport/nostr/mod.rs index d9b1b44..130ab12 100644 --- a/ddk/src/transport/nostr/mod.rs +++ b/ddk/src/transport/nostr/mod.rs @@ -2,14 +2,16 @@ mod messages; mod relay_handler; pub use relay_handler::NostrDlc; +use tokio::sync::watch; use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport}; +use async_trait::async_trait; use bitcoin::secp256k1::PublicKey as BitcoinPublicKey; use dlc_messages::Message; use nostr_rs::PublicKey; use std::sync::Arc; -#[async_trait::async_trait] +#[async_trait] impl Transport for NostrDlc { fn name(&self) -> String { "nostr".to_string() @@ -20,16 +22,19 @@ impl Transport for NostrDlc { .expect("Should not fail converting nostr key to bitcoin key.") } - async fn listen(&self) { - self.listen().await.expect("Did not start nostr listener."); - } - /// Get messages that have not been processed yet. - async fn receive_messages( + async fn start( &self, + mut stop_signal: watch::Receiver, manager: Arc>, - ) { - self.receive_dlc_messages(manager).await + ) -> Result<(), anyhow::Error> { + let listen_handle = self.start(stop_signal.clone(), manager); + + // Wait for either task to complete or stop signal + tokio::select! { + _ = stop_signal.changed() => Ok(()), + res = listen_handle => res?, + } } /// Send a message to a specific counterparty. fn send_message(&self, counterparty: BitcoinPublicKey, message: Message) { diff --git a/ddk/src/transport/nostr/relay_handler.rs b/ddk/src/transport/nostr/relay_handler.rs index 6cc7ea3..3269bac 100644 --- a/ddk/src/transport/nostr/relay_handler.rs +++ b/ddk/src/transport/nostr/relay_handler.rs @@ -4,8 +4,10 @@ use crate::DlcDevKitDlcManager; use crate::{Oracle, Storage}; use bitcoin::bip32::Xpriv; use bitcoin::Network; -use nostr_rs::{secp256k1::Secp256k1, Keys, PublicKey, SecretKey, Timestamp, Url}; +use nostr_rs::{secp256k1::Secp256k1, Keys, PublicKey, Timestamp, Url}; use nostr_sdk::{Client, RelayPoolNotification}; +use tokio::sync::watch; +use tokio::task::JoinHandle; pub struct NostrDlc { pub keys: Keys, @@ -14,18 +16,20 @@ pub struct NostrDlc { } impl NostrDlc { - pub fn new( + pub async fn new( seed_bytes: &[u8; 32], relay_host: &str, network: Network, ) -> anyhow::Result { + tracing::info!("Creating Nostr Dlc handler."); let secp = Secp256k1::new(); let seed = Xpriv::new_master(network, seed_bytes)?; - let secret_key = SecretKey::from_slice(&seed.encode())?; - let keys = Keys::new_with_ctx(&secp, secret_key.into()); + let keys = Keys::new_with_ctx(&secp, seed.private_key.into()); let relay_url = relay_host.parse()?; let client = Client::new(&keys); + client.add_relay(&relay_url).await?; + client.connect().await; Ok(NostrDlc { keys, @@ -38,68 +42,78 @@ impl NostrDlc { self.keys.public_key() } - pub async fn listen(&self) -> anyhow::Result { - let client = Client::new(&self.keys); - - let since = Timestamp::now(); - - client.add_relay(&self.relay_url).await?; - - let msg_subscription = super::messages::create_dlc_message_filter(since, self.public_key()); - // Removing the oracle messages for right now. - // let oracle_subscription = super::messages::create_oracle_message_filter(since); - - client.subscribe(vec![msg_subscription], None).await?; - - client.connect().await; - - Ok(client) - } - - pub async fn receive_dlc_messages( + pub fn start( &self, + mut stop_signal: watch::Receiver, manager: Arc>, - ) { - while let Ok(notification) = self.client.notifications().recv().await { - match notification { - RelayPoolNotification::Event { - relay_url: _, - subscription_id: _, - event, - } => { - let (pubkey, message, event) = match super::messages::handle_dlc_msg_event( - &event, - &self.keys.secret_key(), - ) { - Ok(msg) => (msg.0, msg.1, msg.2), - Err(_) => { - tracing::error!("Could not parse event {}", event.id); - continue; + ) -> JoinHandle> { + let public_key = self.public_key(); + tracing::info!( + pubkey = public_key.to_string(), + "Starting Nostr DLC listener." + ); + let nostr_client = self.client.clone(); + let keys = self.keys.clone(); + tokio::spawn(async move { + let since = Timestamp::now(); + let msg_subscription = super::messages::create_dlc_message_filter(since, public_key); + nostr_client.subscribe(vec![msg_subscription], None).await?; + let mut notifications = nostr_client.notifications(); + loop { + tokio::select! { + _ = stop_signal.changed() => { + if *stop_signal.borrow() { + tracing::warn!("Stopping nostr dlc message subscription."); + break; } - }; + }, + Ok(notification) = notifications.recv() => { + match notification { + RelayPoolNotification::Event { + relay_url: _, + subscription_id: _, + event, + } => { + let (pubkey, message, event) = match super::messages::handle_dlc_msg_event( + &event, + &keys.secret_key(), + ) { + Ok(msg) => { + tracing::info!(pubkey=msg.0.to_string(), "Received DLC nostr message."); + (msg.0, msg.1, msg.2) + }, + Err(_) => { + tracing::error!("Could not parse event {}", event.id); + continue; + } + }; - match manager.on_dlc_message(&message, pubkey).await { - Ok(Some(msg)) => { - let event = super::messages::create_dlc_msg_event( - event.pubkey, - Some(event.id), - msg, - &self.keys, - ) - .expect("no message"); - self.client - .send_event(event) - .await - .expect("Break out into functions."); - } - Ok(None) => (), - Err(_) => { - // handle the error case and send + match manager.on_dlc_message(&message, pubkey).await { + Ok(Some(msg)) => { + let event = super::messages::create_dlc_msg_event( + event.pubkey, + Some(event.id), + msg, + &keys, + ) + .expect("no message"); + nostr_client + .send_event(event) + .await + .expect("Break out into functions."); + } + Ok(None) => (), + Err(_) => { + // handle the error case and send + } + } + } + _ => () } } } - other => println!("Other event: {:?}", other), } - } + Ok::<_, anyhow::Error>(()) + }) } }