net: don't hide connection upgrade inside perform_handshake_protocols()

make a standalone function that removes the address from the white or
greylist and adds it to the anchorlist. call it inside outbound and
manual session.

this commit also removes a redundant Result<()> from
[hostlist]_store_or_upgrade() method and updates its usage.

We also add various debug statements and cleanup.
This commit is contained in:
lunar-mining
2024-01-14 11:42:01 +01:00
parent f08ce9a4c8
commit 3abd2c62bb
9 changed files with 163 additions and 126 deletions

View File

@@ -106,7 +106,7 @@ impl GreylistRefinery {
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Append to the whitelist.
hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap();
hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await;
// Remove whitelisted peer from the greylist.
hosts.greylist_remove(url, position).await;

View File

@@ -276,7 +276,7 @@ impl Hosts {
/// Stores an address on the greylist or updates its last_seen field if we already
/// have the address.
pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> {
pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) {
trace!(target: "store::greylist_store_or_update()", "[START]");
// Filter addresses before writing to the greylist.
@@ -299,12 +299,11 @@ impl Hosts {
self.greylist_update_last_seen(&addr, last_seen, index).await;
}
self.store_subscriber.notify(filtered_addrs_len).await;
Ok(())
}
/// Stores an address on the whitelist or updates its last_seen field if we already
/// have the address.
pub async fn whitelist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> {
pub async fn whitelist_store_or_update(&self, addrs: &[(Url, u64)]) {
trace!(target: "store::whitelist_store_or_update()", "[START]");
// No address filtering for whitelist (whitelist is created from greylist)
@@ -325,12 +324,11 @@ impl Hosts {
.expect("Expected whitelist entry to exist");
self.whitelist_update_last_seen(addr, last_seen.clone(), index).await;
}
Ok(())
}
/// Stores an address on the anchorlist or updates its last_seen field if we already
/// have the address.
pub async fn anchorlist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> {
pub async fn anchorlist_store_or_update(&self, addrs: &[(Url, u64)]) {
trace!(target: "store::anchor_store_or_update()", "[START]");
// No address filtering for anchorlist (contains addresses we have already connected to)
@@ -350,7 +348,6 @@ impl Hosts {
.expect("Expected anchorlist entry to exist");
self.anchorlist_update_last_seen(addr, last_seen.clone(), index).await;
}
Ok(())
}
/// Append host to the greylist. Called on learning of a new peer.
@@ -407,58 +404,58 @@ impl Hosts {
trace!(target: "store::anchorlist_store()", "[END]");
}
// Downgrade a non-responsive host.
// TODO: it is perhaps more efficient simply to select another connection and allow
// the greylist refinery process to filter address.
pub async fn downgrade_host(&self, addr: &Url) -> Result<()> {
if self.anchorlist_contains(addr).await {
debug!(target: "net::store::downgrade_host()",
"Removing non responsive peer from anchorlist");
//// Downgrade a non-responsive host.
//// TODO: it is perhaps more efficient simply to select another connection and allow
//// the greylist refinery process to filter address.
//pub async fn downgrade_host(&self, addr: &Url) -> Result<()> {
// if self.anchorlist_contains(addr).await {
// debug!(target: "net::store::downgrade_host()",
// "Removing non responsive peer from anchorlist");
let index = self
.get_anchorlist_index_at_addr(addr.clone())
.await
.expect("Expected anchorlist index to exist");
let entry = self
.get_anchorlist_entry_at_addr(addr)
.await
.expect("Expected anchorlist entry to exist");
// let index = self
// .get_anchorlist_index_at_addr(addr.clone())
// .await
// .expect("Expected anchorlist index to exist");
// let entry = self
// .get_anchorlist_entry_at_addr(addr)
// .await
// .expect("Expected anchorlist entry to exist");
self.anchorlist_remove(addr, index).await;
self.greylist_store_or_update(&[entry]).await?;
// self.anchorlist_remove(addr, index).await;
// self.greylist_store_or_update(&[entry]).await?;
Ok(())
} else if self.whitelist_contains(addr).await {
debug!(target: "net::store::downgrade_host()",
"Removing non responsive peer from whitelist");
// Ok(())
// } else if self.whitelist_contains(addr).await {
// debug!(target: "net::store::downgrade_host()",
// "Removing non responsive peer from whitelist");
let index = self
.get_whitelist_index_at_addr(addr.clone())
.await
.expect("Expected whitelist index to exist");
let entry = self
.get_whitelist_entry_at_addr(addr)
.await
.expect("Expected whitelist entry to exist");
// let index = self
// .get_whitelist_index_at_addr(addr.clone())
// .await
// .expect("Expected whitelist index to exist");
// let entry = self
// .get_whitelist_entry_at_addr(addr)
// .await
// .expect("Expected whitelist entry to exist");
self.whitelist_remove(addr, index).await;
self.greylist_store_or_update(&[entry]).await?;
// self.whitelist_remove(addr, index).await;
// self.greylist_store_or_update(&[entry]).await?;
Ok(())
} else {
debug!(target: "net::store::downgrade_host()",
"Removing non responsive peer from greylist");
// Ok(())
// } else {
// debug!(target: "net::store::downgrade_host()",
// "Removing non responsive peer from greylist");
let index = self
.get_greylist_index_at_addr(addr.clone())
.await
.expect("Expected greylist index to exist");
// let index = self
// .get_greylist_index_at_addr(addr.clone())
// .await
// .expect("Expected greylist index to exist");
self.greylist_remove(addr, index).await;
// self.greylist_remove(addr, index).await;
Ok(())
}
}
// Ok(())
// }
//}
/// Update the last_seen field of a peer on the greylist.
pub async fn greylist_update_last_seen(&self, addr: &Url, last_seen: u64, index: usize) {
@@ -918,7 +915,8 @@ impl Hosts {
ret.push((addr.clone(), *last_seen));
limit -= 1;
if limit == 0 {
debug!(target: "store::greylist_fetch_with_schemes", "Found matching scheme, returning");
debug!(target: "deadlock", "Found matching grey scheme, returning");
debug!(target: "store::greylist_fetch_with_schemes", "Found matching greylist entry, returning");
return ret
}
}
@@ -952,12 +950,16 @@ impl Hosts {
ret.push((addr.clone(), *last_seen));
parsed_limit -= 1;
if parsed_limit == 0 {
debug!(target: "deadlock",
"Found matching white scheme, returning {:?}", ret);
trace!(target: "store::whitelist_fetch_with_schemes",
"Found matching white scheme, returning {:?}", ret);
return ret
}
}
debug!(target: "store::whitelist_fetch_with_schemes",
debug!(target: "deadlock",
"No matching schemes! Trying greylist...");
warn!(target: "store::whitelist_fetch_with_schemes",
"No matching schemes! Trying greylist...");
return self.greylist_fetch_with_schemes(schemes, limit).await
}
@@ -997,14 +999,20 @@ impl Hosts {
ret.push((addr.clone(), *last_seen));
parsed_limit -= 1;
if parsed_limit == 0 {
debug!(target: "deadlock",
"Found matching anchor scheme, returning {:?}", ret);
trace!(target: "store::anchorlist_fetch_with_schemes",
"Found matching anchor scheme, returning {:?}", ret);
return ret
}
}
debug!(target: "store::anchorlist_fetch_with_schemes",
"No matching schemes! Trying greylist...");
debug!(target: "deadlock",
"No matching schemes! Trying whitelist...");
warn!(target: "store::anchorlist_fetch_with_schemes",
"No matching schemes! Trying whitelist...");
return self.whitelist_fetch_with_schemes(schemes, limit).await
}
}

View File

@@ -228,7 +228,9 @@ impl P2p {
/// Check whether we're connected to a given address
pub async fn exists(&self, addr: &Url) -> bool {
self.channels.lock().await.contains_key(addr)
//self.channels.lock().await.contains_key(addr)
//true
false
}
/// Add a channel to the set of connected channels

View File

@@ -100,7 +100,7 @@ impl ProtocolAddress {
"Appending to greylist...",
);
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await?;
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await;
}
}

View File

@@ -129,7 +129,7 @@ impl ProtocolBase for ProtocolSeed {
target: "net::protocol_seed::start()",
"Appending to greylist...",
);
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await?;
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await;
debug!(target: "net::protocol_seed::start()", "END => address={}", self.channel.address());
Ok(())

View File

@@ -134,6 +134,9 @@ impl ManualSession {
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
// Add this connection to the anchorlist, remove it from the [otherlist]
self.upgrade_connection(&addr).await;
// Notify that channel processing has finished
self.channel_subscriber.notify(Ok(channel)).await;

View File

@@ -24,6 +24,7 @@ use std::{
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use url::Url;
use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion};
use crate::Result;
@@ -143,16 +144,6 @@ pub trait Session: Sync {
// Perform handshake
protocol_version.run(executor.clone()).await?;
if self.type_id() != SESSION_INBOUND {
// Channel is now initialized. Timestamp this.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
self.p2p()
.hosts()
.anchorlist_store_or_update(&[(channel.address().clone(), last_seen)])
.await?;
}
// Add channel to p2p
self.p2p().store(channel.clone()).await;
@@ -163,6 +154,26 @@ pub trait Session: Sync {
Ok(())
}
/// Upgrade a connection to the anchorlist and remove it from the white or greylist.
/// Called after a connection has been successfully established in Outbound and Manual
/// sessions.
async fn upgrade_connection(&self, addr: &Url) {
let hosts = self.p2p().hosts();
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await;
if hosts.whitelist_contains(&addr).await {
let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap();
hosts.whitelist_remove(&addr, index).await;
}
if hosts.greylist_contains(&addr).await {
let index = hosts.get_greylist_index_at_addr(addr.clone()).await.unwrap();
hosts.greylist_remove(&addr, index).await;
}
}
/// Returns a pointer to the p2p network interface
fn p2p(&self) -> P2pPtr;

View File

@@ -190,14 +190,13 @@ impl Slot {
let connects = self.p2p().settings().outbound_connections;
let white_count = connects * self.p2p().settings().white_connection_percent / 100;
// Up to anchor_connection_count connections:
//
// Select from the anchorlist
// If the anchorlist is empty, select from the whitelist
// If the whitelist is empty, select from the greylist
// If the greylist is empty, do peer discovery
let addrs = {
// Up to anchor_connection_count connections:
//
// Select from the anchorlist
// If the anchorlist is empty, select from the whitelist
// If the whitelist is empty, select from the greylist
// If the greylist is empty, do peer discovery
if slot_count < self.p2p().settings().anchor_connection_count {
debug!(target: "net::outbound_session::fetch_address()",
"First two connections- prefer anchor connections");
@@ -230,6 +229,7 @@ impl Slot {
// * address is already pending a connection
let addr = hosts.check_address_with_lock(self.p2p(), addrs).await;
return addr
//return None
}
// We first try to make connections to the addresses on our anchor list. We then find some
@@ -282,6 +282,8 @@ impl Slot {
continue
};
debug!(target: "deadlock", "Got addrs: {}, slot {} node {}", addr.0, self.slot, self.p2p().settings().node_id);
let host = addr.0;
let slot = self.slot;
@@ -305,12 +307,20 @@ impl Slot {
slot, err, self.p2p().settings().node_id
);
debug!(
target: "deadlock",
"connection failed: {}, slot {} node {}, channel {}",
err, slot, self.p2p().settings().node_id, host.clone()
);
dnetev!(self, OutboundSlotDisconnected, {
slot,
err: err.to_string()
});
self.channel_id.store(0, Ordering::Relaxed);
sleep(1).await;
continue
}
};
@@ -347,6 +357,9 @@ impl Slot {
self.channel_id.store(channel.info.id, Ordering::Relaxed);
// Add this connection to the anchorlist, remove it from the [otherlist]
self.session().upgrade_connection(&addr).await;
// Wait for channel to close
stop_sub.receive().await;
self.channel_id.store(0, Ordering::Relaxed);
@@ -374,9 +387,9 @@ impl Slot {
self.slot, addr, e
);
// At this point we've failed to connect.
// If the host is in the anchorlist or whitelist, downgrade it to greylist.
self.p2p().hosts().downgrade_host(&addr).await?;
//// At this point we've failed to connect.
//// If the host is in the anchorlist or whitelist, downgrade it to greylist.
//self.p2p().hosts().downgrade_host(&addr).await?;
// Remove connection from pending
self.p2p().remove_pending(&addr).await;

View File

@@ -31,36 +31,36 @@ use crate::{
};
// Number of nodes to spawn and number of peers each node connects to
const N_NODES: usize = 10;
const N_NODES: usize = 5;
const N_CONNS: usize = 3;
#[test]
fn p2p_test() {
let mut cfg = simplelog::ConfigBuilder::new();
//cfg.add_filter_ignore("sled".to_string());
//cfg.add_filter_ignore("net::protocol_ping".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string());
//cfg.add_filter_ignore("net::hosts".to_string());
//cfg.add_filter_ignore("net::inbound_session".to_string());
//cfg.add_filter_ignore("net::outbound_session".to_string());
//cfg.add_filter_ignore("net::session".to_string());
//cfg.add_filter_ignore("net::refinery".to_string());
//cfg.add_filter_ignore("net::message_subscriber".to_string());
//cfg.add_filter_ignore("net::protocol_address".to_string());
//cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
//cfg.add_filter_ignore("net::protocol_version".to_string());
//cfg.add_filter_ignore("net::protocol_registry".to_string());
//cfg.add_filter_ignore("net::protocol_seed".to_string());
//cfg.add_filter_ignore("net::channel".to_string());
//cfg.add_filter_ignore("net::p2p::seed".to_string());
//cfg.add_filter_ignore("net::p2p::start".to_string());
//cfg.add_filter_ignore("store".to_string());
//cfg.add_filter_ignore("net::store".to_string());
//cfg.add_filter_ignore("net::channel::send()".to_string());
//cfg.add_filter_ignore("net::channel::start()".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
//cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("sled".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string());
cfg.add_filter_ignore("net::hosts".to_string());
cfg.add_filter_ignore("net::inbound_session".to_string());
cfg.add_filter_ignore("net::outbound_session".to_string());
cfg.add_filter_ignore("net::session".to_string());
cfg.add_filter_ignore("net::refinery".to_string());
cfg.add_filter_ignore("net::message_subscriber".to_string());
cfg.add_filter_ignore("net::protocol_address".to_string());
cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_registry".to_string());
cfg.add_filter_ignore("net::protocol_seed".to_string());
cfg.add_filter_ignore("net::channel".to_string());
cfg.add_filter_ignore("net::p2p::seed".to_string());
cfg.add_filter_ignore("net::p2p::start".to_string());
cfg.add_filter_ignore("store".to_string());
cfg.add_filter_ignore("net::store".to_string());
cfg.add_filter_ignore("net::channel::send()".to_string());
cfg.add_filter_ignore("net::channel::start()".to_string());
cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("net::tcp".to_string());
simplelog::TermLogger::init(
@@ -116,13 +116,13 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
for i in 0..N_NODES {
// Everyone will connect to N_CONNS random peers.
let mut peers = vec![];
for _ in 0..N_CONNS {
let mut port = 13200 + i;
while port == 13200 + i {
port = 13200 + rng.gen_range(0..N_NODES);
}
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
//for _ in 0..N_CONNS {
// let mut port = 13200 + i;
// while port == 13200 + i {
// port = 13200 + rng.gen_range(0..N_NODES);
// }
// peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
//}
let settings = Settings {
localnet: true,
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
@@ -148,29 +148,29 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
}
//info!("Waiting until all peers connect");
sleep(30).await;
//sleep(5).await;
info!("Inspecting hostlists...");
for p2p in p2p_instances.iter() {
let hosts = p2p.hosts();
//info!("Inspecting hostlists...");
//for p2p in p2p_instances.iter() {
// let hosts = p2p.hosts();
let greylist = hosts.greylist.read().await;
let whitelist = hosts.whitelist.read().await;
let anchorlist = hosts.anchorlist.read().await;
// let greylist = hosts.greylist.read().await;
// let whitelist = hosts.whitelist.read().await;
// let anchorlist = hosts.anchorlist.read().await;
info!("Node {}", p2p.settings().node_id);
for (i, (url, last_seen)) in greylist.iter().enumerate() {
info!("Greylist entry {}: {}, {}", i, url, last_seen);
}
// info!("Node {}", p2p.settings().node_id);
// for (i, (url, last_seen)) in greylist.iter().enumerate() {
// info!("Greylist entry {}: {}, {}", i, url, last_seen);
// }
for (i, (url, last_seen)) in whitelist.iter().enumerate() {
info!("Whitelist entry {}: {}, {}", i, url, last_seen);
}
// for (i, (url, last_seen)) in whitelist.iter().enumerate() {
// info!("Whitelist entry {}: {}, {}", i, url, last_seen);
// }
for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
}
}
// for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
// info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
// }
//}
// Stop the P2P network
for p2p in p2p_instances.iter() {