From fed0e582c19a38493901c066405e956c7323b0ce Mon Sep 17 00:00:00 2001 From: lunar-mining Date: Thu, 18 Jan 2024 13:00:55 +0100 Subject: [PATCH] net: bug fixes and cleanup * move upgrade_host() and downgrade_host() to store.rs * don't downgrade disconnected inbound connections * fix logic error (missing else clause) on [hostlist]_store_or_update * improve code comments * make [hostlist]_remove a single-line function --- src/net/hosts/store.rs | 172 ++++++++++++++++++---------- src/net/session/inbound_session.rs | 3 - src/net/session/manual_session.rs | 6 +- src/net/session/mod.rs | 47 +------- src/net/session/outbound_session.rs | 6 +- 5 files changed, 119 insertions(+), 115 deletions(-) diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index d77215209..d16bc1b39 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -21,6 +21,7 @@ use std::{ fs, fs::File, sync::Arc, + time::UNIX_EPOCH, }; use log::{debug, error, info, trace, warn}; @@ -29,7 +30,7 @@ use rand::{ rngs::OsRng, Rng, }; -use smol::lock::RwLock; +use smol::lock::{Mutex, RwLock}; use url::Url; use super::super::{p2p::P2pPtr, settings::SettingsPtr}; @@ -57,16 +58,16 @@ const GREYLIST_MAX_LEN: usize = 2000; // TODO: Check whether anchorlist has a max size in Monero. // TODO: we can probably clean up a lot of the repetitive code in this module. pub struct Hosts { - // Intermediary node list that is periodically probed and updated to whitelist. + /// Intermediary node list that is periodically probed and updated to whitelist. pub greylist: RwLock>, - // Recently seen nodes. + /// Recently seen hosts. Shared with other nodes. pub whitelist: RwLock>, - // Nodes to which we have already been able to establish a connection. + /// Nodes to which we have already been able to establish a connection. pub anchorlist: RwLock>, - /// Peers we reject from connecting + /// Peers we reject from connecting to rejected: RwLock>, /// Subscriber listening for store updates @@ -80,8 +81,8 @@ impl Hosts { /// Create a new hosts list> pub fn new(settings: SettingsPtr) -> HostsPtr { Arc::new(Self { - whitelist: RwLock::new(Vec::new()), greylist: RwLock::new(Vec::new()), + whitelist: RwLock::new(Vec::new()), anchorlist: RwLock::new(Vec::new()), rejected: RwLock::new(HashSet::new()), store_subscriber: Subscriber::new(), @@ -262,31 +263,98 @@ impl Hosts { None } + /// Upgrade a connection to the anchorlist and remove it from the white or greylist. + /// Called after a connection has been successfully established in Outbound and Manual + /// sessions. + pub async fn upgrade_host(&self, addr: &Url) { + // Remove channel from whitelist + if self.whitelist_contains(addr).await { + let index = self + .get_whitelist_index_at_addr(addr.clone()) + .await + .expect("Expected whitelist entry to exist"); + + self.whitelist_remove(addr, index).await; + } + + // Remove channel from greylist + if self.greylist_contains(addr).await { + let index = self + .get_greylist_index_at_addr(addr.clone()) + .await + .expect("Expected greylist entry to exist"); + self.greylist_remove(addr, index).await; + } + + // Add channel to anchorlist + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + self.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await; + } + + /// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it + /// to the greylist. Called when we cannot establish a connection to a host or when a + /// pre-existing connection disconnects. + pub async fn downgrade_host(&self, addr: &Url) { + // Remove channel from anchorlist and add it to greylist + if self.anchorlist_contains(addr).await { + let (url, last_seen) = self + .get_anchorlist_entry_at_addr(addr) + .await + .expect("Expected anchorlist entry to exist"); + + self.greylist_store_or_update(&[(url, last_seen)]).await; + + let index = self + .get_anchorlist_index_at_addr(addr.clone()) + .await + .expect("Expected anchorlist index to exist"); + + self.anchorlist_remove(addr, index).await; + } + + // Remove channel from whitelist and add to greylist + if self.whitelist_contains(addr).await { + let (url, last_seen) = self + .get_whitelist_entry_at_addr(addr) + .await + .expect("Expected whitelist entry to exist"); + + self.greylist_store_or_update(&[(url, last_seen)]).await; + + let index = self + .get_whitelist_index_at_addr(addr.clone()) + .await + .expect("Expected whitelist index to exist"); + + self.whitelist_remove(addr, index).await; + } + } + /// Stores an address on the greylist or updates its last_seen field if we already /// have the address. pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) { trace!(target: "store::greylist_store_or_update()", "[START]"); - // Filter addresses before writing to the greylist. let filtered_addrs = self.filter_addresses(addrs).await; let filtered_addrs_len = filtered_addrs.len(); for (addr, last_seen) in filtered_addrs { if !self.greylist_contains(&addr).await { - debug!(target: "store::greylist_store_or_update()", "We do not have this entry in the hostlist. Adding to store..."); + debug!(target: "store::greylist_store_or_update()", + "We do not have this entry in the hostlist. Adding to store..."); self.greylist_store(addr.clone(), last_seen).await; + } else { + debug!(target: "store::greylist_store_or_update()", + "We have this entry in the greylist. Updating last seen..."); + + let index = self + .get_greylist_index_at_addr(addr.clone()) + .await + .expect("Expected greylist entry to exist"); + self.greylist_update_last_seen(&addr, last_seen, index).await; + self.store_subscriber.notify(filtered_addrs_len).await; } - - debug!(target: "store::greylist_store_or_update()", - "We have this entry in the greylist. Updating last seen..."); - - let index = self - .get_greylist_index_at_addr(addr.clone()) - .await - .expect("Expected greylist entry to exist"); - self.greylist_update_last_seen(&addr, last_seen, index).await; } - self.store_subscriber.notify(filtered_addrs_len).await; } /// Stores an address on the whitelist or updates its last_seen field if we already @@ -301,16 +369,16 @@ impl Hosts { "We do not have this entry in the whitelist. Adding to store..."); self.whitelist_store(addr.clone(), *last_seen).await; - } - - debug!(target: "store::whitelist_store_or_update()", + } else { + debug!(target: "store::whitelist_store_or_update()", "We have this entry in the whitelist. Updating last seen..."); - let index = self - .get_whitelist_index_at_addr(addr.clone()) - .await - .expect("Expected whitelist entry to exist"); - self.whitelist_update_last_seen(addr, *last_seen, index).await; + let index = self + .get_whitelist_index_at_addr(addr.clone()) + .await + .expect("Expected whitelist entry to exist"); + self.whitelist_update_last_seen(addr, *last_seen, index).await; + } } } @@ -326,15 +394,16 @@ impl Hosts { "We do not have this entry in the whitelist. Adding to store..."); self.anchorlist_store(addr.clone(), *last_seen).await; - } - debug!(target: "store::anchorlist_store_or_update()", + } else { + debug!(target: "store::anchorlist_store_or_update()", "We have this entry in the anchorlist. Updating last seen..."); - let index = self - .get_anchorlist_index_at_addr(addr.clone()) - .await - .expect("Expected anchorlist entry to exist"); - self.anchorlist_update_last_seen(addr, *last_seen, index).await; + let index = self + .get_anchorlist_index_at_addr(addr.clone()) + .await + .expect("Expected anchorlist entry to exist"); + self.anchorlist_update_last_seen(addr, *last_seen, index).await; + } } } @@ -435,36 +504,21 @@ impl Hosts { } /// Remove an entry from the greylist. - pub async fn greylist_remove(&self, addr: &Url, position: usize) { - debug!(target: "net::refinery::run()", "Removing whitelisted peer {} from greylist", addr); - let mut greylist = self.greylist.write().await; - - greylist.remove(position); - - // Sort the list by last_seen. - greylist.sort_by_key(|entry| entry.1); + pub async fn greylist_remove(&self, addr: &Url, index: usize) { + debug!(target: "net::refinery::run()", "Removing peer {} from greylist", addr); + self.greylist.write().await.remove(index); } /// Remove an entry from the whitelist. - pub async fn whitelist_remove(&self, addr: &Url, position: usize) { - debug!(target: "net::refinery::run()", "Removing disconnected peer {} from whitelist", addr); - let mut whitelist = self.whitelist.write().await; - - whitelist.remove(position); - - // Sort the list by last_seen. - whitelist.sort_by_key(|entry| entry.1); + pub async fn whitelist_remove(&self, addr: &Url, index: usize) { + debug!(target: "net::refinery::run()", "Removing peer {} from whitelist", addr); + self.whitelist.write().await.remove(index); } /// Remove an entry from the anchorlist. - pub async fn anchorlist_remove(&self, addr: &Url, position: usize) { - debug!(target: "net::refinery::run()", "Removing disconnected peer {} from anchorlist", addr); - let mut anchorlist = self.anchorlist.write().await; - - anchorlist.remove(position); - - // Sort the list by last_seen. - anchorlist.sort_by_key(|entry| entry.1); + pub async fn anchorlist_remove(&self, addr: &Url, index: usize) { + debug!(target: "net::refinery::run()", "Removing peer {} from anchorlist", addr); + self.anchorlist.write().await.remove(index); } pub async fn subscribe_store(&self) -> Result> { @@ -1063,9 +1117,7 @@ impl Hosts { #[cfg(test)] mod tests { use super::{ - super::{ - super::{settings::Settings, P2p}, - }, + super::super::{settings::Settings, P2p}, *, }; use crate::system::sleep; diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index 8655a00ef..ff0f092e9 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -171,9 +171,6 @@ impl InboundSession { self.p2p().remove(channel.clone()).await; - // Downgrade this host to greylist if it's on the whitelist or anchorlist. - self.downgrade_host(&channel.info.addr).await; - debug!( target: "net::inbound_session::setup_channel()", "Received stop_sub, channel removed from P2P", diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 30599cc7d..e01906362 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -135,7 +135,7 @@ impl ManualSession { self.p2p().remove_pending(&addr).await; // Add this connection to the anchorlist, remove it from the [otherlist] - self.upgrade_host(&addr).await; + self.p2p().hosts().upgrade_host(&addr).await; // Notify that channel processing has finished self.channel_subscriber.notify(Ok(channel)).await; @@ -147,7 +147,7 @@ impl ManualSession { "[P2P] Manual outbound disconnected [{}]", url, ); // Downgrade this host to greylist if it's on the whitelist or anchorlist. - self.downgrade_host(&addr).await; + self.p2p().hosts().downgrade_host(&addr).await; // DEV NOTE: Here we can choose to attempt reconnection again return Ok(()) @@ -160,7 +160,7 @@ impl ManualSession { ); // Downgrade this host to greylist if it's on the whitelist or anchorlist. - self.downgrade_host(&addr).await; + //self.downgrade_host(&addr).await; } } diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 3df935aa1..a435d5b20 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -22,7 +22,7 @@ use std::{ }; use async_trait::async_trait; -use log::debug; +use log::{debug, warn}; use smol::Executor; use url::Url; @@ -147,51 +147,6 @@ pub trait Session: Sync { Ok(()) } - /// Upgrade a connection to the anchorlist and remove it from the white or greylist. - /// Called after a connection has been successfully established in Outbound and Manual - /// sessions. - async fn upgrade_host(&self, addr: &Url) { - let hosts = self.p2p().hosts(); - - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - hosts.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await; - - if hosts.whitelist_contains(addr).await { - let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap(); - hosts.whitelist_remove(addr, index).await; - } - - if hosts.greylist_contains(addr).await { - let index = hosts.get_greylist_index_at_addr(addr.clone()).await.unwrap(); - hosts.greylist_remove(addr, index).await; - } - } - - /// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it - /// to the greylist. Called when we cannot establish a connection to a host or when a - /// pre-existing connection disconnects. - async fn downgrade_host(&self, addr: &Url) { - let hosts = self.p2p().hosts(); - - // Remove channel from anchorlist and add it to greylist - if hosts.anchorlist_contains(addr).await { - let (url, last_seen) = hosts.get_anchorlist_entry_at_addr(addr).await.unwrap(); - hosts.greylist_store_or_update(&[(url, last_seen)]).await; - - let index = hosts.get_anchorlist_index_at_addr(addr.clone()).await.unwrap(); - hosts.anchorlist_remove(addr, index).await; - } - - // Remove channel from whitelist and add to greylist - if hosts.whitelist_contains(addr).await { - let (url, last_seen) = hosts.get_whitelist_entry_at_addr(addr).await.unwrap(); - hosts.greylist_store_or_update(&[(url, last_seen)]).await; - - let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap(); - hosts.whitelist_remove(addr, index).await; - } - } - /// Returns a pointer to the p2p network interface fn p2p(&self) -> P2pPtr; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 3ea2eaa17..ec39fc7de 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -311,7 +311,7 @@ impl Slot { }); // Downgrade this host to greylist if it's on the whitelist or anchorlist. - self.session().downgrade_host(&host).await; + //self.session().downgrade_host(&host).await; self.channel_id.store(0, Ordering::Relaxed); continue @@ -353,14 +353,14 @@ impl Slot { self.channel_id.store(channel.info.id, Ordering::Relaxed); // Add this connection to the anchorlist, remove it from the [otherlist] - self.session().upgrade_host(&addr).await; + hosts.upgrade_host(&addr).await; // Wait for channel to close stop_sub.receive().await; self.channel_id.store(0, Ordering::Relaxed); // Downgrade this host to greylist if it's on the whitelist or anchorlist. - self.session().downgrade_host(&addr).await; + hosts.downgrade_host(&addr).await; } }