net: move downgrade to greylist into remove_sub_on_stop()

Previously there was a bug which happened v rarely in which:

> Outbound and Manual Session are waiting on a stop signal
> Outbound/ Manual receives a stop signal, de-registers channel (in move_host)
> Channel is selected by Slot 1 to be connected to, state is changed to Connect
> remove_sub_on_stop() receives a stop signal, de-registers channel
> Channel is selected by Slot 5 connected to, state is changed to Connect
> Slot 1 connects, state is changed to Connected
> Slot 5 connects -> panic!

To avoid this happening, we move unregister() out of move_host and perform the sequence:

recv stop signal -> move_host to greylist (if outbond/manual) -> unregister()

We do this inside the shared method remove_sub_on_stop to ensure the execution path always happens in the same way.
This commit is contained in:
draoi
2024-03-31 10:57:54 +02:00
parent 7d4151c230
commit 69c6530a5d
5 changed files with 46 additions and 14 deletions

View File

@@ -187,6 +187,7 @@ impl Lilith {
if !ping_node(url.clone(), p2p.clone()).await {
debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url);
hosts.move_host(url, *last_seen, HostColor::Grey, false, None).await?;
hosts.unregister(url).await;
continue
}

View File

@@ -844,6 +844,7 @@ impl Hosts {
// We don't know this peer. We can safely update the state.
debug!(target: "net::hosts::try_update_registry()", "Inserting addr={}, state={}",
addr, new_state.to_string());
registry.insert(addr.clone(), new_state.clone());
Ok(new_state)
@@ -983,6 +984,16 @@ impl Hosts {
false
}
// TODO: doc
pub async fn is_connection_to_self(&self, url: &Url) -> bool {
let host_str = url.host_str().unwrap();
if self.settings.localnet {
self.settings.external_addrs.iter().any(|ext| host_str == ext.host_str().unwrap())
} else {
self.settings.external_addrs.iter().any(|ext| url.port() == ext.port())
}
}
/// Filter given addresses based on certain rulesets and validity. Strictly called only on
/// the first time learning of a new peer.
async fn filter_addresses(
@@ -1109,6 +1120,19 @@ impl Hosts {
ret
}
// TODO: doc
pub async fn fetch_last_seen(&self, addr: &Url) -> Option<u64> {
if self.container.contains(HostColor::Gold as usize, addr).await {
self.container.get_last_seen(HostColor::Gold as usize, addr).await
} else if self.container.contains(HostColor::White as usize, addr).await {
self.container.get_last_seen(HostColor::White as usize, addr).await
} else if self.container.contains(HostColor::Grey as usize, addr).await {
self.container.get_last_seen(HostColor::Grey as usize, addr).await
} else {
None
}
}
/// A single atomic function for moving hosts between hostlists. Called on the following occasions:
///
/// * When we cannot connect to a peer: move to grey, remove from white and gold.
@@ -1177,6 +1201,8 @@ impl Hosts {
// has passed through the refinery. This should never panic.
self.try_register(addr.clone(), HostState::Suspend).await.unwrap();
return Ok(());
} else {
return Ok(());
}
}

View File

@@ -151,12 +151,6 @@ impl ManualSession {
// Wait for channel to close
stop_sub.receive().await;
// Channel has disconnected. Downgrade this host to greylist.
self.p2p()
.hosts()
.move_host(&addr, last_seen, HostColor::Grey, false, None)
.await?;
info!(
target: "net::manual_session",
"[P2P] Manual outbound disconnected [{}]", url,

View File

@@ -22,7 +22,7 @@ use async_trait::async_trait;
use log::debug;
use smol::Executor;
use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion};
use super::{channel::ChannelPtr, hosts::store::HostColor, p2p::P2pPtr, protocol::ProtocolVersion};
use crate::Result;
pub mod inbound_session;
@@ -46,8 +46,10 @@ pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
/// Removes channel from the list of connected channels when a stop signal
/// is received.
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) {
debug!(target: "net::session::remove_sub_on_stop()", "[START]");
let addr = channel.address();
// Subscribe to stop events
let stop_sub = channel.clone().subscribe_stop().await;
@@ -55,14 +57,26 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
// Wait for a stop event
stop_sub.receive().await;
}
debug!(
target: "net::session::remove_sub_on_stop()",
"Received stop event. Removing channel {}", channel.address(),
"Received stop event. Removing channel {}", addr,
);
if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
debug!(
target: "net::session::remove_sub_on_stop()",
"Downgrading {}", addr,
);
if !p2p.hosts().is_connection_to_self(addr).await {
let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
p2p.hosts().move_host(addr, last_seen, HostColor::Grey, false, None).await.unwrap();
}
}
// Remove channel from p2p
p2p.hosts().unregister(channel.address()).await;
debug!(target: "net::session::remove_sub_on_stop()", "[END]");
}
@@ -148,7 +162,7 @@ pub trait Session: Sync {
self.p2p().hosts().register_channel(channel.clone()).await;
// Subscribe to stop, so we can remove from registry
executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach();
executor.spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id())).detach();
// Channel is ready for use
Ok(())

View File

@@ -408,9 +408,6 @@ impl Slot {
// Wait for channel to close
stop_sub.receive().await;
// Channel has disconnected. Downgrade this host to greylist.
hosts.move_host(&addr, last_seen, HostColor::Grey, false, None).await.unwrap();
self.channel_id.store(0, Ordering::Relaxed);
}
}