From 52e6ea0530de80cd9decab5d1bcff03b05ec9bdc Mon Sep 17 00:00:00 2001 From: draoi Date: Mon, 1 Apr 2024 13:26:39 +0200 Subject: [PATCH] net: implement greylist downgrade and goldlist upgrade --- bin/lilith/src/main.rs | 2 +- src/net/channel.rs | 2 +- src/net/hosts.rs | 42 +++++++++++++-------------- src/net/session/mod.rs | 44 ++++++++++++++++++----------- src/net/session/outbound_session.rs | 2 +- src/net/session/refine_session.rs | 2 +- 6 files changed, 51 insertions(+), 43 deletions(-) diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index 7467393ca..470426f69 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -182,7 +182,7 @@ impl Lilith { if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await { debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url); - hosts.move_host(url, *last_seen, HostColor::Grey, None).await?; + hosts.move_host(url, *last_seen, HostColor::Grey).await?; hosts.unregister(url).await; continue diff --git a/src/net/channel.rs b/src/net/channel.rs index e10c38e24..ccc09e78c 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -320,7 +320,7 @@ impl Channel { pub async fn ban(&self, peer: &Url) { debug!(target: "net::channel::ban()", "START {:?}", self); let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - self.p2p().hosts().move_host(peer, last_seen, HostColor::Black, None).await.unwrap(); + self.p2p().hosts().move_host(peer, last_seen, HostColor::Black).await.unwrap(); self.stop().await; debug!(target: "net::channel::ban()", "STOP {:?}", self); diff --git a/src/net/hosts.rs b/src/net/hosts.rs index 00e49aaa0..60eb8a880 100644 --- a/src/net/hosts.rs +++ b/src/net/hosts.rs @@ -83,7 +83,7 @@ pub type HostRegistry = RwLock>; /// +------+ +---------+ /// +------> | move | ---> | suspend | /// | +------+ +---------+ -/// | ^ | +/// | | | /// | | v +--------+ /// +---------+ | +--------+ | insert | /// | connect | | | refine | +--------+ @@ -133,10 +133,8 @@ pub enum HostState { Connected(ChannelPtr), /// Host that are moving between hostlists, implemented in - /// store::move_host(). Move takes a ChannelPtr so that Channels that - /// are being promoted to the Gold list can be re-inserted into the - /// Connected once the promotion is safely finalized. - Move(Option), + /// store::move_host(). + Move, } impl HostState { @@ -151,7 +149,7 @@ impl HostState { HostState::Connect => Err(Error::HostStateBlocked(start, end)), HostState::Suspend => Err(Error::HostStateBlocked(start, end)), HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)), - HostState::Move(_) => Err(Error::HostStateBlocked(start, end)), + HostState::Move => Err(Error::HostStateBlocked(start, end)), } } @@ -167,7 +165,7 @@ impl HostState { HostState::Connect => Err(Error::HostStateBlocked(start, end)), HostState::Suspend => Ok(HostState::Refine), HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)), - HostState::Move(_) => Err(Error::HostStateBlocked(start, end)), + HostState::Move => Err(Error::HostStateBlocked(start, end)), } } @@ -182,16 +180,15 @@ impl HostState { HostState::Connect => Err(Error::HostStateBlocked(start, end)), HostState::Suspend => Err(Error::HostStateBlocked(start, end)), HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)), - HostState::Move(_) => Err(Error::HostStateBlocked(start, end)), + HostState::Move => Err(Error::HostStateBlocked(start, end)), } } // Try to change state to Connected. Possible if this peer's state - // is currently Connect, Refine or Move. Refine is necessary since the + // is currently Connect or Refine, or Move. Refine is necessary since the // refinery process requires us to establish a connection to a peer. - // Move is necessary in the case that a host is being promoted to Gold list, - // and must be re-added to the Connected() state after the promotion - // has completed. + // Move is necessary due to the upgrade to Gold sequence in + // `session::perform_handshake_protocols`. fn try_connected(&self, channel: ChannelPtr) -> Result { let start = self.to_string(); let end = HostState::Connected(channel.clone()).to_string(); @@ -201,23 +198,23 @@ impl HostState { HostState::Connect => Ok(HostState::Connected(channel)), HostState::Suspend => Err(Error::HostStateBlocked(start, end)), HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)), - HostState::Move(_) => Ok(HostState::Connected(channel)), + HostState::Move => Ok(HostState::Connected(channel)), } } // Try to change state to Move. Possibly if this host is currently // Connect i.e. it is being connected to, or if we are currently Connected - // to this peer (necessary due to Gold list promotion sequence). - fn try_move(&self, channel: Option) -> Result { + // to this peer (due to host Downgrade sequence in `session::remove_sub_on_stop`) + fn try_move(&self) -> Result { let start = self.to_string(); - let end = HostState::Move(channel.clone()).to_string(); + let end = HostState::Move.to_string(); match self { HostState::Insert => Err(Error::HostStateBlocked(start, end)), HostState::Refine => Err(Error::HostStateBlocked(start, end)), - HostState::Connect => Ok(HostState::Move(channel)), + HostState::Connect => Ok(HostState::Move), HostState::Suspend => Err(Error::HostStateBlocked(start, end)), - HostState::Connected(_) => Ok(HostState::Move(channel)), - HostState::Move(_) => Err(Error::HostStateBlocked(start, end)), + HostState::Connected(_) => Ok(HostState::Move), + HostState::Move => Err(Error::HostStateBlocked(start, end)), } } @@ -233,7 +230,7 @@ impl HostState { HostState::Connect => Err(Error::HostStateBlocked(start, end)), HostState::Suspend => Err(Error::HostStateBlocked(start, end)), HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)), - HostState::Move(_) => Ok(HostState::Suspend), + HostState::Move => Ok(HostState::Suspend), } } } @@ -874,7 +871,7 @@ impl Hosts { HostState::Connect => current_state.try_connect(), HostState::Suspend => current_state.try_suspend(), HostState::Connected(c) => current_state.try_connected(c), - HostState::Move(c) => current_state.try_move(c), + HostState::Move => current_state.try_move(), }; if let Ok(state) = &result { @@ -1192,13 +1189,12 @@ impl Hosts { addr: &Url, last_seen: u64, destination: HostColor, - channel: Option, ) -> Result<()> { debug!(target: "net::hosts::move_host()", "Trying to move addr={} node={} destination={:?}", addr, self.settings.node_id, destination); // This should never panic. Failure indicates a misuse of the HostState API. - self.try_register(addr.clone(), HostState::Move(channel.clone())).await.unwrap(); + self.try_register(addr.clone(), HostState::Move).await.unwrap(); match destination { // Downgrade to grey. Remove from white and gold. diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 1288e8c64..01492cea6 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -16,13 +16,16 @@ * along with this program. If not, see . */ -use std::sync::{Arc, Weak}; +use std::{ + sync::{Arc, Weak}, + time::UNIX_EPOCH, +}; use async_trait::async_trait; use log::debug; use smol::Executor; -use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion}; +use super::{channel::ChannelPtr, hosts::HostColor, p2p::P2pPtr, protocol::ProtocolVersion}; use crate::Result; pub mod inbound_session; @@ -49,7 +52,7 @@ pub type SessionWeakPtr = Weak; /// Removes channel from the list of connected channels when a stop signal /// is received. -pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: SessionBitFlag) { +pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) { debug!(target: "net::session::remove_sub_on_stop()", "[START]"); let addr = channel.address(); @@ -65,16 +68,15 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: Sess "Received stop event. Removing channel {}", addr, ); - // TODO: downgrade to greylist if outbound or manual session. - //if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { - // debug!( - // target: "net::session::remove_sub_on_stop()", - // "Downgrading {}", addr, - // ); + if type_id == SESSION_MANUAL || type_id == SESSION_OUTBOUND { + debug!( + target: "net::session::remove_sub_on_stop()", + "Downgrading {}", addr, + ); - // let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); - // p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap(); - //} + let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); + p2p.hosts().move_host(addr, last_seen, HostColor::Grey).await.unwrap(); + } // Remove channel from p2p p2p.hosts().unregister(channel.address()).await; @@ -160,10 +162,20 @@ pub trait Session: Sync { // Perform handshake protocol_version.run(executor.clone()).await?; - // TODO: Upgrade to goldlist if outbound or manual session. - //if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { - // //... - //} + // Upgrade to goldlist if outbound or manual session. + if self.type_id() == SESSION_MANUAL || self.type_id() == SESSION_OUTBOUND { + debug!( + target: "net::session::perform_handshake_protocols()", + "Upgrading {}", channel.address(), + ); + + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + self.p2p() + .hosts() + .move_host(channel.address(), last_seen, HostColor::Gold) + .await + .unwrap(); + } // Attempt to add channel to registry self.p2p().hosts().register_channel(channel.clone()).await; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index fb0f533ce..a799d936c 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -429,7 +429,7 @@ impl Slot { ); // At this point we failed to connect. We'll downgrade this peer now. - self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey, None).await?; + self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey).await?; // Mark its state as Suspend, which sends it to the Refinery for processing. self.p2p().hosts().try_register(addr.clone(), HostState::Suspend).await.unwrap(); diff --git a/src/net/session/refine_session.rs b/src/net/session/refine_session.rs index 991d32b3e..0a5a0b01e 100644 --- a/src/net/session/refine_session.rs +++ b/src/net/session/refine_session.rs @@ -274,7 +274,7 @@ impl GreylistRefinery { let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); // Add to the whitelist and remove from the greylist. - hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap(); + hosts.move_host(url, last_seen, HostColor::White).await.unwrap(); hosts.unregister(url).await; debug!(target: "net::refinery", "GreylistRefinery complete!");