Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the user's relay list #380

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dirs = "5.0.1"
tracing-appender = "0.2.3"
urlencoding = "2.1.3"
open = "5.3.0"
futures = "0.3.31"

[dev-dependencies]
tempfile = "3.13.0"
Expand Down
115 changes: 108 additions & 7 deletions enostr/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use crate::relay::{Relay, RelayStatus};
use crate::{ClientMessage, Result};
use nostrdb::Filter;

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::{Duration, Instant};

use url::Url;
Expand All @@ -10,7 +13,7 @@ use url::Url;
use ewebsock::{WsEvent, WsMessage};

#[cfg(not(target_arch = "wasm32"))]
use tracing::{debug, error};
use tracing::{debug, error, info};

#[derive(Debug)]
pub struct PoolEvent<'a> {
Expand Down Expand Up @@ -42,7 +45,16 @@ impl PoolRelay {

pub struct RelayPool {
pub relays: Vec<PoolRelay>,
pub subs: HashMap<String, Vec<Filter>>,
pub ping_rate: Duration,
/// Used when there are no others
pub bootstrapping_relays: BTreeSet<String>,
/// Locally specified relays
pub local_relays: BTreeSet<String>,
/// NIP-65 specified relays
pub advertised_relays: BTreeSet<String>,
/// If non-empty force the relay pool to use exactly this set
pub forced_relays: BTreeSet<String>,
}

impl Default for RelayPool {
Expand All @@ -56,7 +68,12 @@ impl RelayPool {
pub fn new() -> RelayPool {
RelayPool {
relays: vec![],
subs: HashMap::new(),
ping_rate: Duration::from_secs(25),
bootstrapping_relays: BTreeSet::new(),
local_relays: BTreeSet::new(),
advertised_relays: BTreeSet::new(),
forced_relays: BTreeSet::new(),
}
}

Expand Down Expand Up @@ -85,9 +102,11 @@ impl RelayPool {
for relay in &mut self.relays {
relay.relay.send(&ClientMessage::close(subid.clone()));
}
self.subs.remove(&subid);
}

pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
self.subs.insert(subid.clone(), filter.clone());
for relay in &mut self.relays {
relay.relay.subscribe(subid.clone(), filter.clone());
}
Expand Down Expand Up @@ -148,30 +167,112 @@ impl RelayPool {
}
}

pub fn configure_relays(
&mut self,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
let urls = if !self.forced_relays.is_empty() {
debug!("using forced relays");
self.forced_relays.iter().cloned().collect::<Vec<_>>()
} else {
let mut combined_relays = self
.local_relays
.union(&self.advertised_relays)
.cloned()
.collect::<BTreeSet<_>>();

// If the combined set is empty, use `bootstrapping_relays`.
if combined_relays.is_empty() {
debug!("using bootstrapping relays");
combined_relays = self.bootstrapping_relays.clone();
} else {
debug!("using local+advertised relays");
}

// Collect the resulting set into a vector.
combined_relays.into_iter().collect::<Vec<_>>()
};

self.set_relays(&urls, wakeup)
}

// Adds a websocket url to the RelayPool.
pub fn add_url(
fn add_url(
&mut self,
url: String,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
let url = Self::canonicalize_url(url);
let url = Self::canonicalize_url(&url);
// Check if the URL already exists in the pool.
if self.has(&url) {
return Ok(());
}
let relay = Relay::new(url, wakeup)?;
let pool_relay = PoolRelay::new(relay);
let mut pool_relay = PoolRelay::new(relay);

// Add all of the existing subscriptions to the new relay
for (subid, filters) in &self.subs {
pool_relay.relay.subscribe(subid.clone(), filters.clone());
}
Comment on lines +213 to +216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct at this abstraction level. this is application logic, not enostr/pool logic


self.relays.push(pool_relay);

Ok(())
}

// standardize the format (ie, trailing slashes)
fn canonicalize_url(url: String) -> String {
// Add and remove relays to match the provided list
pub fn set_relays(
&mut self,
urls: &Vec<String>,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
// Canonicalize the new URLs.
let new_urls = urls
.iter()
.map(|u| Self::canonicalize_url(u))
.collect::<HashSet<_>>();

// Get the old URLs from the existing relays.
let old_urls = self
.relays
.iter()
.map(|pr| pr.relay.url.clone())
.collect::<HashSet<_>>();

debug!("old relays: {:?}", old_urls);
debug!("new relays: {:?}", new_urls);

if new_urls.len() == 0 {
info!("bootstrapping, not clearing the relay list ...");
return Ok(());
}

// Remove the relays that are in old_urls but not in new_urls.
let to_remove: HashSet<_> = old_urls.difference(&new_urls).cloned().collect();
for url in &to_remove {
debug!("removing relay {}", url);
}
self.relays.retain(|pr| !to_remove.contains(&pr.relay.url));

// FIXME - how do we close connections the removed relays?

// Add the relays that are in new_urls but not in old_urls.
let to_add: HashSet<_> = new_urls.difference(&old_urls).cloned().collect();
for url in to_add {
debug!("adding relay {}", url);
if let Err(e) = self.add_url(url.clone(), wakeup.clone()) {
error!("Failed to add relay with URL {}: {:?}", url, e);
}
}

Ok(())
}

// standardize the format (ie, trailing slashes) to avoid dups
pub fn canonicalize_url(url: &String) -> String {
match Url::parse(&url) {
Ok(parsed_url) => parsed_url.to_string(),
Err(_) => url, // If parsing fails, return the original URL.
Err(_) => url.clone(), // If parsing fails, return the original URL.
}
}

Expand Down
69 changes: 27 additions & 42 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
storage::{Directory, FileKeyStorage, KeyStorageType},
subscriptions::{SubKind, Subscriptions},
support::Support,
task,
thread::Thread,
timeline::{Timeline, TimelineId, TimelineKind, ViewFilter},
ui::{self, DesktopSidePanel},
Expand All @@ -34,6 +35,7 @@ use egui_extras::{Size, StripBuilder};

use nostrdb::{Config, Filter, Ndb, Note, Transaction};

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
Expand Down Expand Up @@ -73,31 +75,6 @@ pub struct Damus {
pub textmode: bool,
}

fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) {
let ctx = ctx.clone();
let wakeup = move || {
ctx.request_repaint();
};
if let Err(e) = pool.add_url("ws://localhost:8080".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://relay.damus.io".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
//if let Err(e) = pool.add_url("wss://pyramid.fiatjaf.com".to_string(), wakeup.clone()) {
//error!("{:?}", e)
//}
if let Err(e) = pool.add_url("wss://nos.lol".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://nostr.wine".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://purplepag.es".to_string(), wakeup) {
error!("{:?}", e)
}
}

fn send_initial_timeline_filter(
ndb: &Ndb,
can_since_optimize: bool,
Expand Down Expand Up @@ -475,6 +452,8 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
.insert("unknownids".to_string(), SubKind::OneShot);
setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns)
.expect("home subscription failed");

task::spawn_track_user_relays(damus);
}

DamusState::NewTimelineSub(new_timeline_id) => {
Expand Down Expand Up @@ -703,23 +682,29 @@ impl Damus {
}

// setup relays if we have them
let pool = if parsed_args.relays.is_empty() {
let mut pool = RelayPool::new();
relay_setup(&mut pool, &cc.egui_ctx);
pool
} else {
let ctx = cc.egui_ctx.clone();
let wakeup = move || {
ctx.request_repaint();
};
let mut pool = RelayPool::new();
for relay in parsed_args.relays {
if let Err(e) = pool.add_url(relay.clone(), wakeup.clone()) {
error!("error adding relay {}: {}", relay, e);
}
}
pool
};
let mut pool = RelayPool::new();
let bootstrapping_urls = [
"ws://localhost:8080",
"wss://relay.damus.io",
//"wss://pyramid.fiatjaf.com",
"wss://nos.lol",
"wss://nostr.wine",
"wss://purplepag.es",
];
pool.bootstrapping_relays = bootstrapping_urls
.iter()
.map(|&s| s.to_string())
.map(|s| RelayPool::canonicalize_url(&s))
.collect();
// normally empty
pool.forced_relays = parsed_args
.relays
.into_iter()
.map(|s| RelayPool::canonicalize_url(&s))
.collect::<BTreeSet<_>>();
// avoid relay thrash, don't call configure_relays here
// because the initial advertised set will be registered
// shortly and it will be called then

let account = accounts
.get_selected_account()
Expand Down
Loading
Loading