net: implement greylist downgrade and goldlist upgrade

This commit is contained in:
draoi
2024-04-01 13:26:39 +02:00
parent 1cd330b798
commit 52e6ea0530
6 changed files with 51 additions and 43 deletions

View File

@@ -182,7 +182,7 @@ impl Lilith {
if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url);
hosts.move_host(url, *last_seen, HostColor::Grey, None).await?;
hosts.move_host(url, *last_seen, HostColor::Grey).await?;
hosts.unregister(url).await;
continue

View File

@@ -320,7 +320,7 @@ impl Channel {
pub async fn ban(&self, peer: &Url) {
debug!(target: "net::channel::ban()", "START {:?}", self);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
self.p2p().hosts().move_host(peer, last_seen, HostColor::Black, None).await.unwrap();
self.p2p().hosts().move_host(peer, last_seen, HostColor::Black).await.unwrap();
self.stop().await;
debug!(target: "net::channel::ban()", "STOP {:?}", self);

View File

@@ -83,7 +83,7 @@ pub type HostRegistry = RwLock<HashMap<Url, HostState>>;
/// +------+ +---------+
/// +------> | move | ---> | suspend |
/// | +------+ +---------+
/// | ^ |
/// | | |
/// | | v +--------+
/// +---------+ | +--------+ | insert |
/// | connect | | | refine | +--------+
@@ -133,10 +133,8 @@ pub enum HostState {
Connected(ChannelPtr),
/// Host that are moving between hostlists, implemented in
/// store::move_host(). Move takes a ChannelPtr so that Channels that
/// are being promoted to the Gold list can be re-inserted into the
/// Connected once the promotion is safely finalized.
Move(Option<ChannelPtr>),
/// store::move_host().
Move,
}
impl HostState {
@@ -151,7 +149,7 @@ impl HostState {
HostState::Connect => Err(Error::HostStateBlocked(start, end)),
HostState::Suspend => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move => Err(Error::HostStateBlocked(start, end)),
}
}
@@ -167,7 +165,7 @@ impl HostState {
HostState::Connect => Err(Error::HostStateBlocked(start, end)),
HostState::Suspend => Ok(HostState::Refine),
HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move => Err(Error::HostStateBlocked(start, end)),
}
}
@@ -182,16 +180,15 @@ impl HostState {
HostState::Connect => Err(Error::HostStateBlocked(start, end)),
HostState::Suspend => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move => Err(Error::HostStateBlocked(start, end)),
}
}
// Try to change state to Connected. Possible if this peer's state
// is currently Connect, Refine or Move. Refine is necessary since the
// is currently Connect or Refine, or Move. Refine is necessary since the
// refinery process requires us to establish a connection to a peer.
// Move is necessary in the case that a host is being promoted to Gold list,
// and must be re-added to the Connected() state after the promotion
// has completed.
// Move is necessary due to the upgrade to Gold sequence in
// `session::perform_handshake_protocols`.
fn try_connected(&self, channel: ChannelPtr) -> Result<Self> {
let start = self.to_string();
let end = HostState::Connected(channel.clone()).to_string();
@@ -201,23 +198,23 @@ impl HostState {
HostState::Connect => Ok(HostState::Connected(channel)),
HostState::Suspend => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move(_) => Ok(HostState::Connected(channel)),
HostState::Move => Ok(HostState::Connected(channel)),
}
}
// Try to change state to Move. Possibly if this host is currently
// Connect i.e. it is being connected to, or if we are currently Connected
// to this peer (necessary due to Gold list promotion sequence).
fn try_move(&self, channel: Option<ChannelPtr>) -> Result<Self> {
// to this peer (due to host Downgrade sequence in `session::remove_sub_on_stop`)
fn try_move(&self) -> Result<Self> {
let start = self.to_string();
let end = HostState::Move(channel.clone()).to_string();
let end = HostState::Move.to_string();
match self {
HostState::Insert => Err(Error::HostStateBlocked(start, end)),
HostState::Refine => Err(Error::HostStateBlocked(start, end)),
HostState::Connect => Ok(HostState::Move(channel)),
HostState::Connect => Ok(HostState::Move),
HostState::Suspend => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Ok(HostState::Move(channel)),
HostState::Move(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Ok(HostState::Move),
HostState::Move => Err(Error::HostStateBlocked(start, end)),
}
}
@@ -233,7 +230,7 @@ impl HostState {
HostState::Connect => Err(Error::HostStateBlocked(start, end)),
HostState::Suspend => Err(Error::HostStateBlocked(start, end)),
HostState::Connected(_) => Err(Error::HostStateBlocked(start, end)),
HostState::Move(_) => Ok(HostState::Suspend),
HostState::Move => Ok(HostState::Suspend),
}
}
}
@@ -874,7 +871,7 @@ impl Hosts {
HostState::Connect => current_state.try_connect(),
HostState::Suspend => current_state.try_suspend(),
HostState::Connected(c) => current_state.try_connected(c),
HostState::Move(c) => current_state.try_move(c),
HostState::Move => current_state.try_move(),
};
if let Ok(state) = &result {
@@ -1192,13 +1189,12 @@ impl Hosts {
addr: &Url,
last_seen: u64,
destination: HostColor,
channel: Option<ChannelPtr>,
) -> Result<()> {
debug!(target: "net::hosts::move_host()", "Trying to move addr={} node={} destination={:?}",
addr, self.settings.node_id, destination);
// This should never panic. Failure indicates a misuse of the HostState API.
self.try_register(addr.clone(), HostState::Move(channel.clone())).await.unwrap();
self.try_register(addr.clone(), HostState::Move).await.unwrap();
match destination {
// Downgrade to grey. Remove from white and gold.

View File

@@ -16,13 +16,16 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::{Arc, Weak};
use std::{
sync::{Arc, Weak},
time::UNIX_EPOCH,
};
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion};
use super::{channel::ChannelPtr, hosts::HostColor, p2p::P2pPtr, protocol::ProtocolVersion};
use crate::Result;
pub mod inbound_session;
@@ -49,7 +52,7 @@ pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
/// Removes channel from the list of connected channels when a stop signal
/// is received.
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: SessionBitFlag) {
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) {
debug!(target: "net::session::remove_sub_on_stop()", "[START]");
let addr = channel.address();
@@ -65,16 +68,15 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: Sess
"Received stop event. Removing channel {}", addr,
);
// TODO: downgrade to greylist if outbound or manual session.
//if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
// debug!(
// target: "net::session::remove_sub_on_stop()",
// "Downgrading {}", addr,
// );
if type_id == SESSION_MANUAL || type_id == SESSION_OUTBOUND {
debug!(
target: "net::session::remove_sub_on_stop()",
"Downgrading {}", addr,
);
// let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
// p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap();
//}
let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
p2p.hosts().move_host(addr, last_seen, HostColor::Grey).await.unwrap();
}
// Remove channel from p2p
p2p.hosts().unregister(channel.address()).await;
@@ -160,10 +162,20 @@ pub trait Session: Sync {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// TODO: Upgrade to goldlist if outbound or manual session.
//if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
// //...
//}
// Upgrade to goldlist if outbound or manual session.
if self.type_id() == SESSION_MANUAL || self.type_id() == SESSION_OUTBOUND {
debug!(
target: "net::session::perform_handshake_protocols()",
"Upgrading {}", channel.address(),
);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
self.p2p()
.hosts()
.move_host(channel.address(), last_seen, HostColor::Gold)
.await
.unwrap();
}
// Attempt to add channel to registry
self.p2p().hosts().register_channel(channel.clone()).await;

View File

@@ -429,7 +429,7 @@ impl Slot {
);
// At this point we failed to connect. We'll downgrade this peer now.
self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey, None).await?;
self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey).await?;
// Mark its state as Suspend, which sends it to the Refinery for processing.
self.p2p().hosts().try_register(addr.clone(), HostState::Suspend).await.unwrap();

View File

@@ -274,7 +274,7 @@ impl GreylistRefinery {
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Add to the whitelist and remove from the greylist.
hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap();
hosts.move_host(url, last_seen, HostColor::White).await.unwrap();
hosts.unregister(url).await;
debug!(target: "net::refinery", "GreylistRefinery complete!");