Skip to content

Commit

Permalink
Nostr transport v2
Browse files Browse the repository at this point in the history
  • Loading branch information
bennyhodl committed Dec 17, 2024
1 parent efc100f commit f5b0fec
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 106 deletions.
33 changes: 33 additions & 0 deletions ddk/examples/nostr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use anyhow::Result;
use bitcoin::key::rand::Fill;
use bitcoin::Network;
use ddk::builder::Builder;
use ddk::oracle::memory::MemoryOracle;
use ddk::storage::memory::MemoryStorage;
use ddk::transport::nostr::NostrDlc;
use std::sync::Arc;

type NostrDdk = ddk::DlcDevKit<NostrDlc, MemoryStorage, MemoryOracle>;

#[tokio::main]
async fn main() -> Result<()> {
let mut seed_bytes = [0u8; 32];
seed_bytes.try_fill(&mut bitcoin::key::rand::thread_rng())?;

let transport =
Arc::new(NostrDlc::new(&seed_bytes, "wss://nostr.dlcdevkit.com", Network::Regtest).await?);
let storage = Arc::new(MemoryStorage::new());
let oracle_client = Arc::new(MemoryOracle::default());

let mut builder = Builder::new();
builder.set_seed_bytes(seed_bytes);
builder.set_transport(transport.clone());
builder.set_storage(storage.clone());
builder.set_oracle(oracle_client.clone());

let ddk: NostrDdk = builder.finish().await?;

ddk.start().expect("couldn't start ddk");

loop {}
}
3 changes: 3 additions & 0 deletions ddk/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl<T: Transport, S: Storage, O: Oracle> Builder<T, S, O> {
let esplora_client = Arc::new(EsploraClient::new(&self.esplora_host, self.network)?);

let (sender, receiver) = unbounded::<DlcManagerMessage>();
let (stop_signal_sender, stop_signal) = tokio::sync::watch::channel(false);

let manager = Arc::new(
Manager::new(
Expand All @@ -177,6 +178,8 @@ impl<T: Transport, S: Storage, O: Oracle> Builder<T, S, O> {
storage,
oracle,
network: self.network,
stop_signal,
stop_signal_sender,
})
}
}
31 changes: 16 additions & 15 deletions ddk/src/ddk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use dlc_messages::{AcceptDlc, Message, OfferDlc};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::watch;

/// DlcDevKit type alias for the [ddk_manager::manager::Manager]
pub type DlcDevKitDlcManager<S, O> = ddk_manager::manager::Manager<
Expand Down Expand Up @@ -55,6 +56,8 @@ pub struct DlcDevKit<T: Transport, S: Storage, O: Oracle> {
pub storage: Arc<S>,
pub oracle: Arc<O>,
pub network: Network,
pub stop_signal: watch::Receiver<bool>,
pub stop_signal_sender: watch::Sender<bool>,
}

impl<T, S, O> DlcDevKit<T, S, O>
Expand Down Expand Up @@ -82,15 +85,13 @@ where
let receiver_clone = self.receiver.clone();
runtime.spawn(async move { Self::run_manager(manager_clone, receiver_clone).await });

let transport_clone = self.transport.clone();
runtime.spawn(async move {
transport_clone.listen().await;
});

let transport_clone = self.transport.clone();
let manager_clone = self.manager.clone();
let stop_signal = self.stop_signal.clone();
runtime.spawn(async move {
transport_clone.receive_messages(manager_clone).await;
if let Err(e) = transport_clone.start(stop_signal, manager_clone).await {
tracing::error!(error = e.to_string(), "Error in transport listeners.");
}
});

let wallet_clone = self.wallet.clone();
Expand Down Expand Up @@ -133,6 +134,8 @@ where
}

pub fn stop(&self) -> anyhow::Result<()> {
tracing::warn!("Shutting down DDK runtime and listeners.");
self.stop_signal_sender.send(true)?;
let mut runtime_lock = self.runtime.write().unwrap();
if let Some(rt) = runtime_lock.take() {
rt.shutdown_background();
Expand Down Expand Up @@ -195,15 +198,13 @@ where
oracle_announcements: Vec<OracleAnnouncement>,
) -> anyhow::Result<OfferDlc> {
let (responder, receiver) = unbounded();
self.sender
.send(DlcManagerMessage::OfferDlc {
contract_input: contract_input.to_owned(),
counter_party,
oracle_announcements,
responder,
})
.expect("sending offer message");
let offer = receiver.recv().expect("no offer dlc");
self.sender.send(DlcManagerMessage::OfferDlc {
contract_input: contract_input.to_owned(),
counter_party,
oracle_announcements,
responder,
})?;
let offer = receiver.recv()?;

let contract_id = hex::encode(offer.temporary_contract_id);
self.transport
Expand Down
12 changes: 6 additions & 6 deletions ddk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ pub mod transport;
pub mod util;
/// The internal [`bdk_wallet::PersistedWallet`].
pub mod wallet;
use std::sync::Arc;

use bdk_wallet::ChangeSet;
/// DDK object with all services
pub use ddk::DlcDevKit;
pub use ddk::DlcManagerMessage;
Expand All @@ -32,11 +30,14 @@ pub use ddk::DlcManagerMessage;
pub const DEFAULT_NOSTR_RELAY: &str = "wss://nostr.dlcdevkit.com";

use async_trait::async_trait;
use bdk_wallet::ChangeSet;
use bitcoin::secp256k1::{PublicKey, SecretKey};
use ddk::DlcDevKitDlcManager;
use dlc_messages::oracle_msgs::OracleAnnouncement;
use dlc_messages::Message;
use error::WalletError;
use std::sync::Arc;
use tokio::sync::watch;
use transport::PeerInformation;

#[async_trait]
Expand All @@ -46,13 +47,12 @@ pub trait Transport: Send + Sync + 'static {
fn name(&self) -> String;
/// Get the public key of the transport.
fn public_key(&self) -> PublicKey;
/// Open an incoming listener for DLC messages from peers.
async fn listen(&self);
/// Get messages that have not been processed yet.
async fn receive_messages<S: Storage, O: Oracle>(
async fn start<S: Storage, O: Oracle>(
&self,
mut stop_signal: watch::Receiver<bool>,
manager: Arc<DlcDevKitDlcManager<S, O>>,
);
) -> Result<(), anyhow::Error>;
/// Send a message to a specific counterparty.
fn send_message(&self, counterparty: PublicKey, message: Message);
/// Connect to another peer
Expand Down
43 changes: 25 additions & 18 deletions ddk/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bitcoin::{
};
use crossbeam::channel::{unbounded, Receiver, Sender};
use dlc_messages::Message;
use tokio::sync::watch;

type CounterPartyTransport = Arc<Mutex<HashMap<PublicKey, Sender<(Message, PublicKey)>>>>;
pub struct MemoryTransport {
Expand Down Expand Up @@ -61,33 +62,39 @@ impl Transport for MemoryTransport {
}
}

async fn listen(&self) {
tracing::info!("Listening on memory listener")
}

async fn receive_messages<S: Storage, O: Oracle>(
async fn start<S: Storage, O: Oracle>(
&self,
mut stop_receiver: watch::Receiver<bool>,
manager: Arc<DlcDevKitDlcManager<S, O>>,
) {
) -> Result<(), anyhow::Error> {
let mut timer = tokio::time::interval(Duration::from_secs(1));
loop {
timer.tick().await;
if let Ok(msg) = self.receiver.recv() {
match manager.on_dlc_message(&msg.0, msg.1).await {
Ok(s) => {
if let Some(reply) = s {
self.send_message(msg.1, reply);
} else {
tracing::info!("Handled on_dlc_message.");
tokio::select! {
_ = stop_receiver.changed() => {
if *stop_receiver.borrow() {
break;
}
},
_ = timer.tick() => {
if let Ok(msg) = self.receiver.recv() {
match manager.on_dlc_message(&msg.0, msg.1).await {
Ok(s) => {
if let Some(reply) = s {
self.send_message(msg.1, reply);
} else {
tracing::info!("Handled on_dlc_message.");
}
}
Err(e) => tracing::error!(
error = e.to_string(),
"In memory transport error on dlc message."
),
}
}
Err(e) => tracing::error!(
error = e.to_string(),
"In memory transport error on dlc message."
),
}
}
}
Ok(())
}

async fn connect_outbound(&self, _pubkey: PublicKey, _host: &str) {
Expand Down
5 changes: 5 additions & 0 deletions ddk/src/transport/nostr/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ pub fn handle_dlc_msg_event(
if event.kind != Kind::Custom(8_888) {
return Err(anyhow::anyhow!("Event reveived was not DLC Message event."));
}
tracing::info!(
kind = 8_888,
pubkey = event.pubkey.to_string(),
"Received DLC message event."
);

let message = parse_dlc_msg_event(&event, secret_key)?;

Expand Down
21 changes: 13 additions & 8 deletions ddk/src/transport/nostr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ mod messages;
mod relay_handler;

pub use relay_handler::NostrDlc;
use tokio::sync::watch;

use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport};
use async_trait::async_trait;
use bitcoin::secp256k1::PublicKey as BitcoinPublicKey;
use dlc_messages::Message;
use nostr_rs::PublicKey;
use std::sync::Arc;

#[async_trait::async_trait]
#[async_trait]
impl Transport for NostrDlc {
fn name(&self) -> String {
"nostr".to_string()
Expand All @@ -20,16 +22,19 @@ impl Transport for NostrDlc {
.expect("Should not fail converting nostr key to bitcoin key.")
}

async fn listen(&self) {
self.listen().await.expect("Did not start nostr listener.");
}

/// Get messages that have not been processed yet.
async fn receive_messages<S: Storage, O: Oracle>(
async fn start<S: Storage, O: Oracle>(
&self,
mut stop_signal: watch::Receiver<bool>,
manager: Arc<DlcDevKitDlcManager<S, O>>,
) {
self.receive_dlc_messages(manager).await
) -> Result<(), anyhow::Error> {
let listen_handle = self.start(stop_signal.clone(), manager);

// Wait for either task to complete or stop signal
tokio::select! {
_ = stop_signal.changed() => Ok(()),
res = listen_handle => res?,
}
}
/// Send a message to a specific counterparty.
fn send_message(&self, counterparty: BitcoinPublicKey, message: Message) {
Expand Down
Loading

0 comments on commit f5b0fec

Please sign in to comment.