Skip to content

Commit

Permalink
make dns index entries by Outpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
mojoX911 authored and Shourya742 committed Jan 1, 2025
1 parent 138a331 commit 1764cf9
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 69 deletions.
6 changes: 3 additions & 3 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use crate::{
handlers::handle_message,
rpc::start_rpc_server,
},
protocol::messages::TakerToMakerMessage,
utill::{read_message, send_message, ConnectionType, DnsMetadata, DnsRequest},
protocol::messages::{DnsMetadata, DnsRequest, TakerToMakerMessage},
utill::{read_message, send_message, ConnectionType},
wallet::WalletError,
};

Expand Down Expand Up @@ -158,7 +158,7 @@ fn network_bootstrap(maker: Arc<Maker>) -> Result<(String, OptionalJoinHandle),
};

let request = DnsRequest::Post {
metadata: Box::new(dns_metadata),
metadata: dns_metadata,
};

// Keep trying until send is successful.
Expand Down
123 changes: 80 additions & 43 deletions src/market/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
//! Handles market-related logic where Makers post their offers. Also provides functions to synchronize
//! maker addresses from directory servers, post maker addresses to directory servers,
use bitcoin::{transaction::ParseOutPointError, OutPoint};
use bitcoind::bitcoincore_rpc::{self, Client, RpcApi};
use std::collections::hash_map::Entry;

use crate::{
market::rpc::start_rpc_server_thread,
protocol::messages::DnsRequest,
utill::{
get_dns_dir, parse_field, parse_toml, read_message, send_message, verify_fidelity_checks,
ConnectionType, DnsRequest,
ConnectionType,
},
wallet::{RPCConfig, WalletError},
};
Expand All @@ -18,13 +21,13 @@ use crate::{
use crate::utill::{get_tor_addrs, monitor_log_for_completion};

use std::{
cmp::Ordering,
collections::BTreeSet,
collections::HashMap,
convert::TryFrom,
fs::{self, File},
io::{BufRead, BufReader, Write},
net::{Ipv4Addr, TcpListener, TcpStream},
path::{Path, PathBuf},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
Expand All @@ -42,6 +45,7 @@ pub enum DirectoryServerError {
Net(NetError),
MutexPossion,
Wallet(WalletError),
AddressFileCorrupted(String),
}

impl From<WalletError> for DirectoryServerError {
Expand Down Expand Up @@ -86,18 +90,9 @@ impl<'a, T> From<PoisonError<RwLockWriteGuard<'a, T>>> for DirectoryServerError
}
}

#[derive(Debug, Eq, PartialEq)]
pub struct AddressEntry(pub u64, pub String);

impl PartialOrd for AddressEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for AddressEntry {
fn cmp(&self, other: &Self) -> Ordering {
other.0.cmp(&self.0).then_with(|| self.1.cmp(&other.1))
impl From<ParseOutPointError> for DirectoryServerError {
fn from(value: ParseOutPointError) -> Self {
Self::AddressFileCorrupted(value.to_string())

Check warning on line 95 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L94-L95

Added lines #L94 - L95 were not covered by tests
}
}

Expand All @@ -110,7 +105,7 @@ pub struct DirectoryServer {
pub connection_type: ConnectionType,
pub data_dir: PathBuf,
pub shutdown: AtomicBool,
pub addresses: Arc<RwLock<BTreeSet<AddressEntry>>>,
pub addresses: Arc<RwLock<HashMap<OutPoint, String>>>,
}

impl Default for DirectoryServer {
Expand All @@ -131,7 +126,7 @@ impl Default for DirectoryServer {
},
data_dir: get_dns_dir(),
shutdown: AtomicBool::new(false),
addresses: Arc::new(RwLock::new(BTreeSet::new())),
addresses: Arc::new(RwLock::new(HashMap::new())),
}
}
}
Expand Down Expand Up @@ -178,7 +173,7 @@ impl DirectoryServer {
*value = conn_type_string;

// Update the file on disk
let mut file = File::create(config_path)?;
let mut config_file = File::create(config_path)?;
let mut content = String::new();

for (i, (key, value)) in config_map.iter().enumerate() {
Expand All @@ -189,23 +184,12 @@ impl DirectoryServer {
}
}

file.write_all(content.as_bytes())?;
config_file.write_all(content.as_bytes())?;
}

let addresses = Arc::new(RwLock::new(BTreeSet::new()));
// Load all addresses from address.dat file
let address_file = data_dir.join("addresses.dat");
if let Ok(file) = File::open(&address_file) {
let reader = BufReader::new(file);
for address in reader.lines().map_while(Result::ok) {
if let Some((key, value)) = address.split_once(',') {
if let Ok(key) = key.trim().parse::<u64>() {
addresses
.write()?
.insert(AddressEntry(key, value.to_string()));
}
}
}
}
let addresses = Arc::new(RwLock::new(read_addresses_from_file(&address_file)?));
let default_dns = Self::default();

Ok(DirectoryServer {
Expand All @@ -221,6 +205,29 @@ impl DirectoryServer {
addresses,
})
}

/// Updates the in-memory address map. If entry already exists, updates the value. If new entry, inserts the value.
pub fn updated_address_map(
&self,
metadata: (String, OutPoint),
) -> Result<(), DirectoryServerError> {
match self.addresses.write()?.entry(metadata.1) {
Entry::Occupied(mut value) => {
log::info!("Maker Address Got Updated | Existing Address {} | New Address {} | Fidelity Outpoint {}", value.get(), metadata.0, metadata.1);
*value.get_mut() = metadata.0.clone();
Ok(())

Check warning on line 218 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L215-L218

Added lines #L215 - L218 were not covered by tests
}
Entry::Vacant(value) => {
log::info!(
"New Maker Address Added {} | Fidelity Outpoint {}",
metadata.0,
metadata.1
);
value.insert(metadata.0.clone());
Ok(())
}
}
}
}

fn write_default_directory_config(config_path: &Path) -> Result<(), DirectoryServerError> {
Expand Down Expand Up @@ -258,6 +265,7 @@ pub fn start_address_writer_thread(
}
}

/// Write in-memory address data to address file
pub fn write_addresses_to_file(
directory: &Arc<DirectoryServer>,
address_file: &Path,
Expand All @@ -266,7 +274,7 @@ pub fn write_addresses_to_file(
.addresses
.read()?
.iter()
.map(|AddressEntry(addr, amount)| format!("{},{}\n", addr, amount))
.map(|(op, addr)| format!("{},{}\n", op, addr))
.collect::<Vec<String>>()
.join("");

Expand All @@ -275,6 +283,31 @@ pub fn write_addresses_to_file(
file.flush()?;
Ok(())
}

/// Read address data from file and return the HashMap
pub fn read_addresses_from_file(
path: &Path,
) -> Result<HashMap<OutPoint, String>, DirectoryServerError> {
if !path.exists() {
return Ok(HashMap::new());
}
let reader = BufReader::new(File::open(path)?);

Check warning on line 294 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L294

Added line #L294 was not covered by tests

reader

Check warning on line 296 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L296

Added line #L296 was not covered by tests
.lines()
.map(|line| {
let line = line?;
let (outpoint, addr) =
line.split_once(',')
.ok_or(DirectoryServerError::AddressFileCorrupted(
"deliminator missing in address.dat file".to_string(),

Check warning on line 303 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L298-L303

Added lines #L298 - L303 were not covered by tests
))?;
let op = OutPoint::from_str(outpoint)?;
Ok((op, addr.to_string()))

Check warning on line 306 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L305-L306

Added lines #L305 - L306 were not covered by tests
})
.collect::<Result<HashMap<_, _>, DirectoryServerError>>()
}

pub fn start_directory_server(
directory: Arc<DirectoryServer>,
rpc_config: Option<RPCConfig>,
Expand Down Expand Up @@ -407,11 +440,7 @@ fn handle_client(
current_height,
) {
Ok(_) => {
log::info!("Maker verified successfully.");
directory.addresses.write()?.insert(AddressEntry(
metadata.proof.bond.amount.to_sat(),
metadata.url.clone(),
));
directory.updated_address_map((metadata.url, metadata.proof.bond.outpoint))?;
}
Err(e) => {
log::error!(
Expand All @@ -427,16 +456,24 @@ fn handle_client(
let addresses = directory.addresses.read()?;
let response = addresses
.iter()
.fold(String::new(), |acc, AddressEntry(_, addr)| {
acc + addr + "\n"
});
.fold(String::new(), |acc, (_, addr)| acc + addr + "\n");
log::debug!("Sending Addresses: {}", response);
send_message(stream, &response)?;
}
#[cfg(feature = "integration-test")]
DnsRequest::Dummy { url } => {
// Used for IT, only checks the updated_address_map() function.
DnsRequest::Dummy { url, vout } => {

Check warning on line 465 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L465

Added line #L465 was not covered by tests
log::info!("Got new maker address: {}", &url);
directory.addresses.write()?.insert(AddressEntry(0, url));

// Create a constant txid for tests
// Its okay to unwrap as this is test-only
let txid = bitcoin::Txid::from_str(
"c3a04e4bdf3c8684c5cf5c8b2f3c43009670bc194ac6c856b3ec9d3a7a6e2602",
)
.unwrap();
let fidelity_op = OutPoint::new(txid, vout);

Check warning on line 474 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L474

Added line #L474 was not covered by tests

directory.updated_address_map((url, fidelity_op))?;

Check warning on line 476 in src/market/directory.rs

View check run for this annotation

Codecov / codecov/patch

src/market/directory.rs#L476

Added line #L476 was not covered by tests
}
}
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/market/rpc/messages.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bitcoin::OutPoint;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;

Expand All @@ -8,5 +9,5 @@ pub enum RpcMsgReq {

#[derive(Serialize, Deserialize, Debug)]
pub enum RpcMsgResp {
ListAddressesResp(BTreeSet<String>),
ListAddressesResp(BTreeSet<(OutPoint, String)>),
}
10 changes: 6 additions & 4 deletions src/market/rpc/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bitcoin::OutPoint;

use super::{RpcMsgReq, RpcMsgResp};
use crate::{
error::NetError,
market::directory::{AddressEntry, DirectoryServer, DirectoryServerError},
market::directory::{DirectoryServer, DirectoryServerError},
utill::{read_message, send_message},
};
use std::{
collections::BTreeSet,
collections::{BTreeSet, HashMap},
io::ErrorKind,
net::{TcpListener, TcpStream},
sync::{atomic::Ordering::Relaxed, Arc, RwLock},
Expand All @@ -15,7 +17,7 @@ use std::{

fn handle_request(
socket: &mut TcpStream,
address: Arc<RwLock<BTreeSet<AddressEntry>>>,
address: Arc<RwLock<HashMap<OutPoint, String>>>,
) -> Result<(), DirectoryServerError> {
let req_bytes = read_message(socket)?;
let rpc_request: RpcMsgReq = serde_cbor::from_slice(&req_bytes).map_err(NetError::Cbor)?;
Expand All @@ -27,7 +29,7 @@ fn handle_request(
address
.read()?
.iter()
.map(|AddressEntry(_, address)| address.clone())
.map(|(op, address)| (*op, address.clone()))

Check warning on line 32 in src/market/rpc/server.rs

View check run for this annotation

Codecov / codecov/patch

src/market/rpc/server.rs#L32

Added line #L32 was not covered by tests
.collect::<BTreeSet<_>>(),
);
if let Err(e) = send_message(socket, &resp) {
Expand Down
1 change: 1 addition & 0 deletions src/protocol/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ pub struct DnsMetadata {

// Structured requests and responses using serde.
#[derive(Serialize, Deserialize, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DnsRequest {
Post {
metadata: DnsMetadata,
Expand Down
4 changes: 2 additions & 2 deletions src/taker/offers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use socks::Socks5Stream;

use crate::{
error::NetError,
protocol::messages::Offer,
utill::{read_message, send_message, ConnectionType, DnsRequest, GLOBAL_PAUSE, NET_TIMEOUT},
protocol::messages::{DnsRequest, Offer},
utill::{read_message, send_message, ConnectionType, GLOBAL_PAUSE, NET_TIMEOUT},
};

use super::{config::TakerConfig, error::TakerError, routines::download_maker_offer};
Expand Down
Loading

0 comments on commit 1764cf9

Please sign in to comment.