Skip to content

Commit

Permalink
remove file persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 9, 2025
1 parent 2c2c58a commit b209f12
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 175 deletions.
94 changes: 44 additions & 50 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,63 +157,57 @@ fn network_bootstrap(maker: Arc<Maker>) -> Result<Option<Child>, MakerError> {
metadata: dns_metadata,
};

let _directory_refresh_handle = thread::Builder::new()
.name("Directory refresh handle".to_string())
.spawn(move || {
let outer_interval_duration =
Duration::from_secs(DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS);

while !maker.shutdown.load(Relaxed) {
while !maker.shutdown.load(Relaxed) {
let stream = match maker.config.connection_type {
ConnectionType::CLEARNET => TcpStream::connect(&dns_address),
#[cfg(feature = "tor")]
ConnectionType::TOR => Socks5Stream::connect(
format!("127.0.0.1:{}", maker.config.socks_port),
dns_address.as_str(),
)
.map(|stream| stream.into_inner()),
};
thread::spawn(move || {
let outer_interval_duration = Duration::from_secs(DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS);

log::info!(
"[{}] Connecting to DNS: {}",
maker.config.network_port,
dns_address
);
while !maker.shutdown.load(Relaxed) {
let stream = match maker.config.connection_type {
ConnectionType::CLEARNET => TcpStream::connect(&dns_address),
#[cfg(feature = "tor")]
ConnectionType::TOR => Socks5Stream::connect(
format!("127.0.0.1:{}", maker.config.socks_port),
dns_address.as_str(),
)
.map(|stream| stream.into_inner()),
};

let mut stream = match stream {
Ok(s) => s,
Err(e) => {
log::warn!(
"[{}] TCP connection error with directory, reattempting: {}",
maker_port,
e
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}
};

if let Err(e) = send_message(&mut stream, &request) {
log::warn!(
"[{}] Failed to send our address to directory, reattempting: {}",
maker_port,
e
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}
log::info!(
"[{}] Connecting to DNS: {}",
maker.config.network_port,
dns_address
);

log::info!(
"[{}] Successfully sent our address to DNS at {}",
let mut stream = match stream {
Ok(s) => s,
Err(e) => {
log::warn!(
"[{}] TCP connection error with directory, reattempting: {}",
maker_port,
dns_address
e
);
break;
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}
thread::sleep(outer_interval_duration);
};

if let Err(e) = send_message(&mut stream, &request) {
log::warn!(
"[{}] Failed to send our address to directory, reattempting: {}",
maker_port,
e
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}
})?;

log::info!(
"[{}] Successfully sent our address to DNS at {}",
maker_port,
dns_address
);
thread::sleep(outer_interval_duration);
}
});

Ok(tor_handle)
}
Expand Down
117 changes: 15 additions & 102 deletions src/market/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use std::{
collections::HashMap,
convert::TryFrom,
fs::{self, File},
io::{BufRead, BufReader, Write},
io::Write,
net::{Ipv4Addr, TcpListener, TcpStream},
path::{Path, PathBuf},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard,
Expand Down Expand Up @@ -211,10 +210,7 @@ impl DirectoryServer {
config_file.write_all(content.as_bytes())?;
}

// Load all addresses from address.dat file
let address_file = data_dir.join("addresses.dat");
let addresses: Arc<RwLock<HashMap<OutPoint, (String, Instant)>>> =
Arc::new(RwLock::new(read_addresses_from_file(&address_file)?));
let addresses = Arc::new(RwLock::new(HashMap::new()));
let default_dns = Self::default();

Ok(DirectoryServer {
Expand Down Expand Up @@ -303,103 +299,23 @@ fn write_default_directory_config(config_path: &Path) -> Result<(), DirectorySer
pub(crate) fn start_address_writer_thread(
directory: Arc<DirectoryServer>,
) -> Result<(), DirectoryServerError> {
let address_file = directory.data_dir.join("addresses.dat");

let interval = if cfg!(feature = "integration-test") {
3 // 3 seconds for tests
} else {
600 // 10 minutes for production
};
let interval = 60 * 15;
loop {
sleep(Duration::from_secs(interval));

if let Err(e) = write_addresses_to_file(&directory, &address_file) {
log::error!("Error writing addresses: {:?}", e);
let mut directory_address_book = directory.addresses.write()?;
let ttl = Duration::from_secs(60 * 30);

let expired_outpoints: Vec<_> = directory_address_book
.iter()
.filter(|(_, (_, timestamp))| timestamp.elapsed() > ttl)
.map(|(outpoint, _)| *outpoint)
.collect();
for outpoint in &expired_outpoints {
directory_address_book.remove(outpoint);
}
}
}

/// Write in-memory address data to file.
pub(crate) fn write_addresses_to_file(
directory: &Arc<DirectoryServer>,
address_file: &Path,
) -> Result<(), DirectoryServerError> {
let file_content = directory
.addresses
.read()?
.iter()
.map(|(op, (addr, time))| format!("{},{},{:?}\n", op, addr, time))
.collect::<Vec<String>>()
.join("");

let mut file = File::create(address_file)?;
file.write_all(file_content.as_bytes())?;
file.flush()?;
Ok(())
}

/// Reads address data from a file and returns a `HashMap` of valid entries.
/// Entries with timestamps older than 30 minutes are discarded.
#[allow(clippy::type_complexity)]
pub fn read_addresses_from_file(
path: &Path,
) -> Result<HashMap<OutPoint, (String, Instant)>, DirectoryServerError> {
if !path.exists() {
return Ok(HashMap::new());
}

let reader = BufReader::new(File::open(path)?);

let thirty_minutes_ago = Instant::now() - std::time::Duration::from_secs(30 * 60);

reader
.lines()
.map(
|line| -> Result<Option<(OutPoint, (String, Instant))>, DirectoryServerError> {
let line = line.map_err(DirectoryServerError::IO)?;

let mut parts = line.splitn(3, ',');
let outpoint = parts.next().ok_or_else(|| {
DirectoryServerError::AddressFileCorrupted(
"Missing OutPoint in address.dat file".to_string(),
)
})?;
let addr = parts.next().ok_or_else(|| {
DirectoryServerError::AddressFileCorrupted(
"Missing address in address.dat file".to_string(),
)
})?;
let time_duration = parts.next().ok_or_else(|| {
DirectoryServerError::AddressFileCorrupted(
"Missing time duration in address.dat file".to_string(),
)
})?;

let op = OutPoint::from_str(outpoint).map_err(|_| {
DirectoryServerError::AddressFileCorrupted(format!(
"Invalid OutPoint: {}",
outpoint
))
})?;
let duration_secs: u64 = time_duration.parse().map_err(|_| {
DirectoryServerError::AddressFileCorrupted(format!(
"Invalid time duration: {}",
time_duration
))
})?;
let timestamp = Instant::now() - std::time::Duration::from_secs(duration_secs);

if timestamp < thirty_minutes_ago {
return Ok(None);
}

Ok(Some((op, (addr.to_string(), timestamp))))
},
)
.filter_map(|res| res.transpose())
.collect::<Result<HashMap<_, _>, DirectoryServerError>>()
}

/// Initializes and starts the Directory Server with the provided configuration.
///
/// This function configures the Directory Server based on the specified `directory` and optional `rpc_config`.
Expand Down Expand Up @@ -476,7 +392,6 @@ pub fn start_directory_server(
start_rpc_server_thread(directory_clone)
});

let address_file = directory.data_dir.join("addresses.dat");
let directory_clone = directory.clone();
let address_writer_thread = thread::spawn(move || {
log::info!("Spawning Address Writer Thread");
Expand Down Expand Up @@ -522,8 +437,6 @@ pub fn start_directory_server(
}
}

write_addresses_to_file(&directory, &address_file)?;

Ok(())
}

Expand Down Expand Up @@ -569,11 +482,10 @@ fn handle_client(
log::info!("Received GET");

let addresses = directory.addresses.read()?;
let thirty_minutes_ago = Instant::now() - std::time::Duration::from_secs(30 * 60);

let response = addresses
.iter()
.filter(|(_, (_, timestamp))| *timestamp >= thirty_minutes_ago)
.filter(|(_, (_, timestamp))| timestamp.elapsed() >= Duration::from_secs(30 * 60))
.fold(String::new(), |acc, (_, addr)| acc + &addr.0 + "\n");

log::debug!("Sending Addresses: {}", response);
Expand All @@ -582,6 +494,7 @@ fn handle_client(
#[cfg(feature = "integration-test")]
// Used for IT, only checks the updated_address_map() function.
DnsRequest::Dummy { url, vout } => {
use std::str::FromStr;
log::info!("Got new maker address: {}", &url);

// Create a constant txid for tests
Expand Down
23 changes: 0 additions & 23 deletions tests/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,4 @@ fn test_dns() {
// Persistence check
process.kill().expect("Failed to kill directoryd process");
process.wait().unwrap();

let mut process = start_dns(&data_dir, &bitcoind);

// Replace address 8082 to 8083 registered for Bond index 2.
// Add a new entry with a new bond index
let additional_addresses = vec![("127.0.0.1:8083", 2), ("127.0.0.1:8084", 3)];
send_addresses(&additional_addresses);
thread::sleep(Duration::from_secs(10));

process.kill().expect("Failed to kill directoryd process");
process.wait().unwrap();

let mut process = start_dns(&data_dir, &bitcoind);
let all_addresses = vec![
("127.0.0.1:8080", 0),
("127.0.0.1:8081", 1),
("127.0.0.1:8083", 2),
("127.0.0.1:8084", 3),
];
verify_addresses(&all_addresses);

process.kill().expect("Failed to kill directoryd process");
process.wait().unwrap();
}

0 comments on commit b209f12

Please sign in to comment.