net: bug fixes and cleanup

* move upgrade_host() and downgrade_host() to store.rs
* don't downgrade disconnected inbound connections
* fix logic error (missing else clause) on [hostlist]_store_or_update
* improve code comments
* make [hostlist]_remove a single-line function
This commit is contained in:
lunar-mining
2024-01-18 13:00:55 +01:00
parent 576afd574d
commit fed0e582c1
5 changed files with 119 additions and 115 deletions

View File

@@ -21,6 +21,7 @@ use std::{
fs,
fs::File,
sync::Arc,
time::UNIX_EPOCH,
};
use log::{debug, error, info, trace, warn};
@@ -29,7 +30,7 @@ use rand::{
rngs::OsRng,
Rng,
};
use smol::lock::RwLock;
use smol::lock::{Mutex, RwLock};
use url::Url;
use super::super::{p2p::P2pPtr, settings::SettingsPtr};
@@ -57,16 +58,16 @@ const GREYLIST_MAX_LEN: usize = 2000;
// TODO: Check whether anchorlist has a max size in Monero.
// TODO: we can probably clean up a lot of the repetitive code in this module.
pub struct Hosts {
// Intermediary node list that is periodically probed and updated to whitelist.
/// Intermediary node list that is periodically probed and updated to whitelist.
pub greylist: RwLock<Vec<(Url, u64)>>,
// Recently seen nodes.
/// Recently seen hosts. Shared with other nodes.
pub whitelist: RwLock<Vec<(Url, u64)>>,
// Nodes to which we have already been able to establish a connection.
/// Nodes to which we have already been able to establish a connection.
pub anchorlist: RwLock<Vec<(Url, u64)>>,
/// Peers we reject from connecting
/// Peers we reject from connecting to
rejected: RwLock<HashSet<String>>,
/// Subscriber listening for store updates
@@ -80,8 +81,8 @@ impl Hosts {
/// Create a new hosts list>
pub fn new(settings: SettingsPtr) -> HostsPtr {
Arc::new(Self {
whitelist: RwLock::new(Vec::new()),
greylist: RwLock::new(Vec::new()),
whitelist: RwLock::new(Vec::new()),
anchorlist: RwLock::new(Vec::new()),
rejected: RwLock::new(HashSet::new()),
store_subscriber: Subscriber::new(),
@@ -262,31 +263,98 @@ impl Hosts {
None
}
/// Upgrade a connection to the anchorlist and remove it from the white or greylist.
/// Called after a connection has been successfully established in Outbound and Manual
/// sessions.
pub async fn upgrade_host(&self, addr: &Url) {
// Remove channel from whitelist
if self.whitelist_contains(addr).await {
let index = self
.get_whitelist_index_at_addr(addr.clone())
.await
.expect("Expected whitelist entry to exist");
self.whitelist_remove(addr, index).await;
}
// Remove channel from greylist
if self.greylist_contains(addr).await {
let index = self
.get_greylist_index_at_addr(addr.clone())
.await
.expect("Expected greylist entry to exist");
self.greylist_remove(addr, index).await;
}
// Add channel to anchorlist
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
self.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await;
}
/// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it
/// to the greylist. Called when we cannot establish a connection to a host or when a
/// pre-existing connection disconnects.
pub async fn downgrade_host(&self, addr: &Url) {
// Remove channel from anchorlist and add it to greylist
if self.anchorlist_contains(addr).await {
let (url, last_seen) = self
.get_anchorlist_entry_at_addr(addr)
.await
.expect("Expected anchorlist entry to exist");
self.greylist_store_or_update(&[(url, last_seen)]).await;
let index = self
.get_anchorlist_index_at_addr(addr.clone())
.await
.expect("Expected anchorlist index to exist");
self.anchorlist_remove(addr, index).await;
}
// Remove channel from whitelist and add to greylist
if self.whitelist_contains(addr).await {
let (url, last_seen) = self
.get_whitelist_entry_at_addr(addr)
.await
.expect("Expected whitelist entry to exist");
self.greylist_store_or_update(&[(url, last_seen)]).await;
let index = self
.get_whitelist_index_at_addr(addr.clone())
.await
.expect("Expected whitelist index to exist");
self.whitelist_remove(addr, index).await;
}
}
/// Stores an address on the greylist or updates its last_seen field if we already
/// have the address.
pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) {
trace!(target: "store::greylist_store_or_update()", "[START]");
// Filter addresses before writing to the greylist.
let filtered_addrs = self.filter_addresses(addrs).await;
let filtered_addrs_len = filtered_addrs.len();
for (addr, last_seen) in filtered_addrs {
if !self.greylist_contains(&addr).await {
debug!(target: "store::greylist_store_or_update()", "We do not have this entry in the hostlist. Adding to store...");
debug!(target: "store::greylist_store_or_update()",
"We do not have this entry in the hostlist. Adding to store...");
self.greylist_store(addr.clone(), last_seen).await;
} else {
debug!(target: "store::greylist_store_or_update()",
"We have this entry in the greylist. Updating last seen...");
let index = self
.get_greylist_index_at_addr(addr.clone())
.await
.expect("Expected greylist entry to exist");
self.greylist_update_last_seen(&addr, last_seen, index).await;
self.store_subscriber.notify(filtered_addrs_len).await;
}
debug!(target: "store::greylist_store_or_update()",
"We have this entry in the greylist. Updating last seen...");
let index = self
.get_greylist_index_at_addr(addr.clone())
.await
.expect("Expected greylist entry to exist");
self.greylist_update_last_seen(&addr, last_seen, index).await;
}
self.store_subscriber.notify(filtered_addrs_len).await;
}
/// Stores an address on the whitelist or updates its last_seen field if we already
@@ -301,16 +369,16 @@ impl Hosts {
"We do not have this entry in the whitelist. Adding to store...");
self.whitelist_store(addr.clone(), *last_seen).await;
}
debug!(target: "store::whitelist_store_or_update()",
} else {
debug!(target: "store::whitelist_store_or_update()",
"We have this entry in the whitelist. Updating last seen...");
let index = self
.get_whitelist_index_at_addr(addr.clone())
.await
.expect("Expected whitelist entry to exist");
self.whitelist_update_last_seen(addr, *last_seen, index).await;
let index = self
.get_whitelist_index_at_addr(addr.clone())
.await
.expect("Expected whitelist entry to exist");
self.whitelist_update_last_seen(addr, *last_seen, index).await;
}
}
}
@@ -326,15 +394,16 @@ impl Hosts {
"We do not have this entry in the whitelist. Adding to store...");
self.anchorlist_store(addr.clone(), *last_seen).await;
}
debug!(target: "store::anchorlist_store_or_update()",
} else {
debug!(target: "store::anchorlist_store_or_update()",
"We have this entry in the anchorlist. Updating last seen...");
let index = self
.get_anchorlist_index_at_addr(addr.clone())
.await
.expect("Expected anchorlist entry to exist");
self.anchorlist_update_last_seen(addr, *last_seen, index).await;
let index = self
.get_anchorlist_index_at_addr(addr.clone())
.await
.expect("Expected anchorlist entry to exist");
self.anchorlist_update_last_seen(addr, *last_seen, index).await;
}
}
}
@@ -435,36 +504,21 @@ impl Hosts {
}
/// Remove an entry from the greylist.
pub async fn greylist_remove(&self, addr: &Url, position: usize) {
debug!(target: "net::refinery::run()", "Removing whitelisted peer {} from greylist", addr);
let mut greylist = self.greylist.write().await;
greylist.remove(position);
// Sort the list by last_seen.
greylist.sort_by_key(|entry| entry.1);
pub async fn greylist_remove(&self, addr: &Url, index: usize) {
debug!(target: "net::refinery::run()", "Removing peer {} from greylist", addr);
self.greylist.write().await.remove(index);
}
/// Remove an entry from the whitelist.
pub async fn whitelist_remove(&self, addr: &Url, position: usize) {
debug!(target: "net::refinery::run()", "Removing disconnected peer {} from whitelist", addr);
let mut whitelist = self.whitelist.write().await;
whitelist.remove(position);
// Sort the list by last_seen.
whitelist.sort_by_key(|entry| entry.1);
pub async fn whitelist_remove(&self, addr: &Url, index: usize) {
debug!(target: "net::refinery::run()", "Removing peer {} from whitelist", addr);
self.whitelist.write().await.remove(index);
}
/// Remove an entry from the anchorlist.
pub async fn anchorlist_remove(&self, addr: &Url, position: usize) {
debug!(target: "net::refinery::run()", "Removing disconnected peer {} from anchorlist", addr);
let mut anchorlist = self.anchorlist.write().await;
anchorlist.remove(position);
// Sort the list by last_seen.
anchorlist.sort_by_key(|entry| entry.1);
pub async fn anchorlist_remove(&self, addr: &Url, index: usize) {
debug!(target: "net::refinery::run()", "Removing peer {} from anchorlist", addr);
self.anchorlist.write().await.remove(index);
}
pub async fn subscribe_store(&self) -> Result<Subscription<usize>> {
@@ -1063,9 +1117,7 @@ impl Hosts {
#[cfg(test)]
mod tests {
use super::{
super::{
super::{settings::Settings, P2p},
},
super::super::{settings::Settings, P2p},
*,
};
use crate::system::sleep;

View File

@@ -171,9 +171,6 @@ impl InboundSession {
self.p2p().remove(channel.clone()).await;
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&channel.info.addr).await;
debug!(
target: "net::inbound_session::setup_channel()",
"Received stop_sub, channel removed from P2P",

View File

@@ -135,7 +135,7 @@ impl ManualSession {
self.p2p().remove_pending(&addr).await;
// Add this connection to the anchorlist, remove it from the [otherlist]
self.upgrade_host(&addr).await;
self.p2p().hosts().upgrade_host(&addr).await;
// Notify that channel processing has finished
self.channel_subscriber.notify(Ok(channel)).await;
@@ -147,7 +147,7 @@ impl ManualSession {
"[P2P] Manual outbound disconnected [{}]", url,
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&addr).await;
self.p2p().hosts().downgrade_host(&addr).await;
// DEV NOTE: Here we can choose to attempt reconnection again
return Ok(())
@@ -160,7 +160,7 @@ impl ManualSession {
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&addr).await;
//self.downgrade_host(&addr).await;
}
}

View File

@@ -22,7 +22,7 @@ use std::{
};
use async_trait::async_trait;
use log::debug;
use log::{debug, warn};
use smol::Executor;
use url::Url;
@@ -147,51 +147,6 @@ pub trait Session: Sync {
Ok(())
}
/// Upgrade a connection to the anchorlist and remove it from the white or greylist.
/// Called after a connection has been successfully established in Outbound and Manual
/// sessions.
async fn upgrade_host(&self, addr: &Url) {
let hosts = self.p2p().hosts();
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await;
if hosts.whitelist_contains(addr).await {
let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap();
hosts.whitelist_remove(addr, index).await;
}
if hosts.greylist_contains(addr).await {
let index = hosts.get_greylist_index_at_addr(addr.clone()).await.unwrap();
hosts.greylist_remove(addr, index).await;
}
}
/// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it
/// to the greylist. Called when we cannot establish a connection to a host or when a
/// pre-existing connection disconnects.
async fn downgrade_host(&self, addr: &Url) {
let hosts = self.p2p().hosts();
// Remove channel from anchorlist and add it to greylist
if hosts.anchorlist_contains(addr).await {
let (url, last_seen) = hosts.get_anchorlist_entry_at_addr(addr).await.unwrap();
hosts.greylist_store_or_update(&[(url, last_seen)]).await;
let index = hosts.get_anchorlist_index_at_addr(addr.clone()).await.unwrap();
hosts.anchorlist_remove(addr, index).await;
}
// Remove channel from whitelist and add to greylist
if hosts.whitelist_contains(addr).await {
let (url, last_seen) = hosts.get_whitelist_entry_at_addr(addr).await.unwrap();
hosts.greylist_store_or_update(&[(url, last_seen)]).await;
let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap();
hosts.whitelist_remove(addr, index).await;
}
}
/// Returns a pointer to the p2p network interface
fn p2p(&self) -> P2pPtr;

View File

@@ -311,7 +311,7 @@ impl Slot {
});
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.session().downgrade_host(&host).await;
//self.session().downgrade_host(&host).await;
self.channel_id.store(0, Ordering::Relaxed);
continue
@@ -353,14 +353,14 @@ impl Slot {
self.channel_id.store(channel.info.id, Ordering::Relaxed);
// Add this connection to the anchorlist, remove it from the [otherlist]
self.session().upgrade_host(&addr).await;
hosts.upgrade_host(&addr).await;
// Wait for channel to close
stop_sub.receive().await;
self.channel_id.store(0, Ordering::Relaxed);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.session().downgrade_host(&addr).await;
hosts.downgrade_host(&addr).await;
}
}