diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index 4818e656c..febdc09d2 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -187,6 +187,7 @@ impl Lilith { if !ping_node(url.clone(), p2p.clone()).await { debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url); hosts.move_host(url, *last_seen, HostColor::Grey, false, None).await?; + hosts.unregister(url).await; continue } diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index a80a5b6a0..5efc649b0 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -844,6 +844,7 @@ impl Hosts { // We don't know this peer. We can safely update the state. debug!(target: "net::hosts::try_update_registry()", "Inserting addr={}, state={}", addr, new_state.to_string()); + registry.insert(addr.clone(), new_state.clone()); Ok(new_state) @@ -983,6 +984,16 @@ impl Hosts { false } + // TODO: doc + pub async fn is_connection_to_self(&self, url: &Url) -> bool { + let host_str = url.host_str().unwrap(); + if self.settings.localnet { + self.settings.external_addrs.iter().any(|ext| host_str == ext.host_str().unwrap()) + } else { + self.settings.external_addrs.iter().any(|ext| url.port() == ext.port()) + } + } + /// Filter given addresses based on certain rulesets and validity. Strictly called only on /// the first time learning of a new peer. async fn filter_addresses( @@ -1109,6 +1120,19 @@ impl Hosts { ret } + // TODO: doc + pub async fn fetch_last_seen(&self, addr: &Url) -> Option { + if self.container.contains(HostColor::Gold as usize, addr).await { + self.container.get_last_seen(HostColor::Gold as usize, addr).await + } else if self.container.contains(HostColor::White as usize, addr).await { + self.container.get_last_seen(HostColor::White as usize, addr).await + } else if self.container.contains(HostColor::Grey as usize, addr).await { + self.container.get_last_seen(HostColor::Grey as usize, addr).await + } else { + None + } + } + /// A single atomic function for moving hosts between hostlists. Called on the following occasions: /// /// * When we cannot connect to a peer: move to grey, remove from white and gold. @@ -1177,6 +1201,8 @@ impl Hosts { // has passed through the refinery. This should never panic. self.try_register(addr.clone(), HostState::Suspend).await.unwrap(); return Ok(()); + } else { + return Ok(()); } } diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 6af8059e6..c492cfd6e 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -151,12 +151,6 @@ impl ManualSession { // Wait for channel to close stop_sub.receive().await; - // Channel has disconnected. Downgrade this host to greylist. - self.p2p() - .hosts() - .move_host(&addr, last_seen, HostColor::Grey, false, None) - .await?; - info!( target: "net::manual_session", "[P2P] Manual outbound disconnected [{}]", url, diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 566f5a0aa..bd5c3b931 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use log::debug; use smol::Executor; -use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion}; +use super::{channel::ChannelPtr, hosts::store::HostColor, p2p::P2pPtr, protocol::ProtocolVersion}; use crate::Result; pub mod inbound_session; @@ -46,8 +46,10 @@ pub type SessionWeakPtr = Weak; /// Removes channel from the list of connected channels when a stop signal /// is received. -pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { +pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) { debug!(target: "net::session::remove_sub_on_stop()", "[START]"); + let addr = channel.address(); + // Subscribe to stop events let stop_sub = channel.clone().subscribe_stop().await; @@ -55,14 +57,26 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { // Wait for a stop event stop_sub.receive().await; } - debug!( target: "net::session::remove_sub_on_stop()", - "Received stop event. Removing channel {}", channel.address(), + "Received stop event. Removing channel {}", addr, ); + if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { + debug!( + target: "net::session::remove_sub_on_stop()", + "Downgrading {}", addr, + ); + + if !p2p.hosts().is_connection_to_self(addr).await { + let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); + p2p.hosts().move_host(addr, last_seen, HostColor::Grey, false, None).await.unwrap(); + } + } + // Remove channel from p2p p2p.hosts().unregister(channel.address()).await; + debug!(target: "net::session::remove_sub_on_stop()", "[END]"); } @@ -148,7 +162,7 @@ pub trait Session: Sync { self.p2p().hosts().register_channel(channel.clone()).await; // Subscribe to stop, so we can remove from registry - executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach(); + executor.spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id())).detach(); // Channel is ready for use Ok(()) diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index b75151554..cf5e230a7 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -408,9 +408,6 @@ impl Slot { // Wait for channel to close stop_sub.receive().await; - // Channel has disconnected. Downgrade this host to greylist. - hosts.move_host(&addr, last_seen, HostColor::Grey, false, None).await.unwrap(); - self.channel_id.store(0, Ordering::Relaxed); } }