diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index c48ffeb8f..7467393ca 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -39,11 +39,7 @@ use url::Url; use darkfi::{ async_daemonize, cli_desc, - net::{ - self, - hosts::{refinery::ping_node, store::HostColor}, - P2p, P2pPtr, - }, + net::{self, hosts::HostColor, P2p, P2pPtr}, rpc::{ jsonrpc::*, server::{listen_and_serve, RequestHandler}, @@ -184,7 +180,7 @@ impl Lilith { let url = &entry.0; let last_seen = &entry.1; - if !ping_node(url.clone(), p2p.clone()).await { + if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await { debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url); hosts.move_host(url, *last_seen, HostColor::Grey, None).await?; hosts.unregister(url).await; diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 36faa85f1..17a679330 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -30,7 +30,7 @@ use url::Url; use super::{ channel::{Channel, ChannelPtr}, - hosts::store::HostColor, + hosts::HostColor, session::SessionWeakPtr, transport::{Listener, PtListener}, }; diff --git a/src/net/channel.rs b/src/net/channel.rs index 9462caeba..e10c38e24 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -37,7 +37,7 @@ use url::Url; use super::{ dnet::{self, dnetev, DnetEvent}, - hosts::store::HostColor, + hosts::HostColor, message, message::Packet, message_subscriber::{MessageSubscription, MessageSubsystem}, diff --git a/src/net/connector.rs b/src/net/connector.rs index 1a206a0a9..3c95a9dd6 100644 --- a/src/net/connector.rs +++ b/src/net/connector.rs @@ -23,7 +23,7 @@ use url::Url; use super::{ channel::{Channel, ChannelPtr}, - hosts::store::HostColor, + hosts::HostColor, session::SessionWeakPtr, settings::SettingsPtr, transport::Dialer, diff --git a/src/net/hosts/store.rs b/src/net/hosts.rs similarity index 96% rename from src/net/hosts/store.rs rename to src/net/hosts.rs index 73251d040..00e49aaa0 100644 --- 
a/src/net/hosts/store.rs +++ b/src/net/hosts.rs @@ -23,7 +23,7 @@ use rand::{prelude::IteratorRandom, rngs::OsRng, Rng}; use smol::lock::RwLock; use url::Url; -use super::super::{settings::SettingsPtr, ChannelPtr}; +use super::{settings::SettingsPtr, ChannelPtr}; use crate::{ system::{Subscriber, SubscriberPtr, Subscription}, util::{ @@ -33,6 +33,29 @@ use crate::{ Error, Result, }; +/// The main interface for interacting with the hostlist. Contains the following: +/// +/// `Hosts`: the main parent class that manages HostRegistry and HostContainer. It is also +/// responsible for filtering addresses before writing to the hostlist. +/// +/// `HostRegistry`: A locked HashMap that maps peer addresses onto mutually exclusive +/// states (`HostState`). Prevents race conditions by dictating a strict flow of logically +/// acceptable states. +/// +/// `HostContainer`: A wrapper for the hostlists. Each hostlist is represented by a `HostColor`, +/// which can be Grey, White, Gold, Black or Dark. Exposes a common interface for hostlist queries and +/// utilities. +/// +/// `HostColor`: White hosts have been seen recently. Gold hosts we have been able to establish +/// a connection to. Grey hosts are recently received hosts that are periodically refreshed +/// using the greylist refinery. Black hosts are considered hostile and are strictly avoided +/// for the duration of the program. Dark hosts are hosts that do not match our transports, but +/// that we continue to share with other peers. They are otherwise ignored. +/// +/// `HostState`: a set of mutually exclusive states that can be Insert, Refine, Connect, Suspend, +/// Connected or Move. The state is `None` when the corresponding host has been removed from the +/// HostRegistry. + // An array containing all possible local host strings // TODO: This could perhaps be more exhaustive? 
pub const LOCAL_HOST_STRS: [&str; 2] = ["localhost", "localhost.localdomain"]; @@ -108,6 +131,7 @@ pub enum HostState { Suspend, /// Hosts that have been successfully connected to. Connected(ChannelPtr), + /// Host that are moving between hostlists, implemented in /// store::move_host(). Move takes a ChannelPtr so that Channels that /// are being promoted to the Gold list can be re-inserted into the @@ -838,6 +862,9 @@ impl Hosts { pub async fn try_register(&self, addr: Url, new_state: HostState) -> Result { let mut registry = self.registry.write().await; + debug!(target: "net::hosts::try_update_registry()", "Try register addr={}, state={}", + addr, &new_state); + if registry.contains_key(&addr) { let current_state = registry.get(&addr).unwrap().clone(); @@ -854,6 +881,8 @@ impl Hosts { registry.insert(addr.clone(), state.clone()); } + debug!(target: "net::hosts::try_update_registry()", "Returning result {:?}", result); + result } else { // We don't know this peer. We can safely update the state. @@ -904,6 +933,7 @@ impl Hosts { pub async fn unregister(&self, addr: &Url) { debug!(target: "net::hosts::unregister()", "Removing {} from HostRegistry", addr); self.registry.write().await.remove(addr); + debug!(target: "net::hosts::unregister()", "Removed {} from HostRegistry", addr); } /// Returns the list of connected channels. 
@@ -1218,37 +1248,10 @@ impl Hosts { #[cfg(test)] mod tests { - use smol::Executor; use std::time::UNIX_EPOCH; - use super::{ - super::super::{settings::Settings, P2p}, - *, - }; - use crate::{net::hosts::refinery::ping_node, system::sleep}; - - #[test] - fn test_ping_node() { - smol::block_on(async { - let settings = Settings { - localnet: false, - external_addrs: vec![ - Url::parse("tcp://foo.bar:123").unwrap(), - Url::parse("tcp://lol.cat:321").unwrap(), - ], - ..Default::default() - }; - - let ex = Arc::new(Executor::new()); - let p2p = P2p::new(settings, ex.clone()).await; - - let url = Url::parse("tcp://xeno.systems.wtf").unwrap(); - println!("Pinging node..."); - let task = ex.spawn(ping_node(url.clone(), p2p)); - ex.run(task).await; - println!("Ping node complete!"); - }); - } + use super::{super::settings::Settings, *}; + use crate::system::sleep; #[test] fn test_is_local_host() { diff --git a/src/net/hosts/mod.rs b/src/net/hosts/mod.rs deleted file mode 100644 index fa53a9657..000000000 --- a/src/net/hosts/mod.rs +++ /dev/null @@ -1,57 +0,0 @@ -/* This file is part of DarkFi (https://dark.fi) - * - * Copyright (C) 2020-2024 Dyne.org foundation - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -/// Periodically probes entries in the greylist. 
-/// -/// Randomly selects a greylist entry and tries to -/// establish a local connection to it using the method probe_node(), which creates -/// a channel and does a version exchange using `perform_handshake_protocols()`. -/// -/// If successful, the entry is removed from the greylist and added to the whitelist -/// with an updated last_seen timestamp. If non-successful, the entry is removed from -/// the greylist. -/// -/// The method `probe_node()` is also used by `ProtocolSeed` and `ProtocolAddress`. -/// We try to establish local connections to our own external addresses using -/// `probe_node()` to ensure the address is valid before propagating in `ProtocolSeed` -/// and `ProtocolAddress`. -pub mod refinery; - -/// The main interface for interacting with the hostlist. Contains the following: -/// -/// `Hosts`: the main parent class that manages HostRegistry and HostContainer. It is also -/// responsible for filtering addresses before writing to the hostlist. -/// -/// `HostRegistry`: A locked HashMap that maps peer addresses onto mutually exclusive -/// states (`HostState`). Prevents race conditions by dictating a strict flow of logically -/// acceptable states. -/// -/// `HostContainer`: A wrapper for the hostlists. Each hostlist is represented by a `HostColor`, -/// which can be Grey, White, Gold or Black. Exposes a common interface for hostlist queries and -/// utilities. -/// -/// `HostColor`: White hosts have been seen recently. Gold hosts we have been able to establish -/// a connection to. Grey hosts are recently received hosts that are periodically refreshed -/// using the greylist refinery. Black hosts are considered hostile and are strictly avoided -/// for the duration of the program. Dark hosts are hosts that do not match our transports, but -/// that we continue to share with other peers. They are otherwise ignored. -/// -/// `HostState`: a set of mutually exclusive states that can be Insert, Refine, Connect, Suspend -/// or Connected. 
The state is `None` when the corresponding host has been removed from the -/// HostRegistry. -pub mod store; diff --git a/src/net/hosts/refinery.rs b/src/net/hosts/refinery.rs deleted file mode 100644 index 01e08d8cc..000000000 --- a/src/net/hosts/refinery.rs +++ /dev/null @@ -1,244 +0,0 @@ -/* This file is part of DarkFi (https://dark.fi) - * - * Copyright (C) 2020-2024 Dyne.org foundation - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use std::{ - sync::Arc, - time::{Duration, Instant, UNIX_EPOCH}, -}; - -use log::{debug, warn}; -use url::Url; - -use super::{ - super::p2p::{P2p, P2pPtr}, - store::HostColor, -}; -use crate::{ - net::{ - connector::Connector, hosts::store::HostState, protocol::ProtocolVersion, session::Session, - }, - system::{ - run_until_completion, sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr, - }, - Error, -}; - -pub type GreylistRefineryPtr = Arc; - -/// Probe random peers on the greylist. If a peer is responsive, update the last_seen field and -/// add it to the whitelist. If a node does not respond, remove it from the greylist. -/// Called periodically. 
-pub struct GreylistRefinery { - /// Weak pointer to parent p2p object - pub(in crate::net) p2p: LazyWeak, - process: StoppableTaskPtr, -} - -impl GreylistRefinery { - pub fn new() -> Arc { - Arc::new(Self { p2p: LazyWeak::new(), process: StoppableTask::new() }) - } - - pub async fn start(self: Arc) { - match self.p2p().hosts().container.load_all(&self.p2p().settings().hostlist).await { - Ok(()) => { - debug!(target: "net::refinery::start()", "Load hosts successful!"); - } - Err(e) => { - warn!(target: "net::refinery::start()", "Error loading hosts {}", e); - } - } - let ex = self.p2p().executor(); - self.process.clone().start( - async move { - self.run().await; - unreachable!(); - }, - // Ignore stop handler - |_| async {}, - Error::NetworkServiceStopped, - ex, - ); - } - - pub async fn stop(self: Arc) { - self.process.stop().await; - - match self.p2p().hosts().container.save_all(&self.p2p().settings().hostlist).await { - Ok(()) => { - debug!(target: "net::refinery::stop()", "Save hosts successful!"); - } - Err(e) => { - warn!(target: "net::refinery::stop()", "Error saving hosts {}", e); - } - } - } - - // Randomly select a peer on the greylist and probe it. This method will remove from the - // greylist and store on the whitelist providing the peer is responsive. - async fn run(self: Arc) { - let settings = self.p2p().settings(); - let hosts = self.p2p().hosts(); - loop { - sleep(settings.greylist_refinery_interval).await; - - if hosts.container.is_empty(HostColor::Grey).await { - debug!(target: "net::refinery", - "Greylist is empty! Cannot start refinery process"); - - continue - } - - // Pause the refinery if we've had zero connections for longer than the configured - // limit. 
- let offline_limit = Duration::from_secs(settings.time_with_no_connections); - let offline_timer = Instant::now().duration_since(*hosts.last_connection.read().await); - - if hosts.channels().await.is_empty() && offline_timer >= offline_limit { - warn!(target: "net::refinery", "No connections for {}s. Refinery paused.", - offline_timer.as_secs()); - - // It is neccessary to clear suspended hosts at this point, otherwise these - // hosts cannot be connected to in Outbound Session. Failure to do this could - // result in the refinery being paused forver (since connections could never be - // made). - let suspended_hosts = hosts.suspended().await; - for host in suspended_hosts { - hosts.unregister(&host).await; - } - - continue - } - - // Only attempt to refine peers that match our transports. - match hosts - .container - .fetch_random_with_schemes(HostColor::Grey, &settings.allowed_transports) - .await - { - Some((entry, position)) => { - let url = &entry.0; - - if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await { - debug!(target: "net::refinery", "Unable to refine addr={}, err={}", - url.clone(), e); - continue - } - - if !ping_node(url.clone(), self.p2p().clone()).await { - hosts.container.remove(HostColor::Grey, url, position).await; - - debug!( - target: "net::refinery", - "Peer {} is non-responsive. Removed from greylist", url, - ); - - // Remove this entry from HostRegistry to avoid this host getting - // stuck in the Refining state. - // - // It is not necessary to call this when the refinery passes, since the - // state will be changed to Connected. - hosts.unregister(url).await; - - continue - } - - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - - // Add to the whitelist and remove from the greylist. - hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap(); - hosts.unregister(url).await; - } - None => { - debug!(target: "net::refinery", "No matching greylist entries found. 
Cannot proceed with refinery"); - - continue - } - } - } - } - - fn p2p(&self) -> P2pPtr { - self.p2p.upgrade() - } -} - -/// Check a node is online by establishing a channel with it and conducting a handshake with a -/// version exchange. -/// -/// We must use run_until_completion() to ensure this code will complete even if the parent task -/// has been destroyed. Otherwise ping_node() will become a zombie process if the rest of the p2p -/// network has been shutdown but the handshake it still ongoing. -/// -/// Other parts of the p2p stack have safe shutdown methods built into them due to the ownership -/// structure. Here we are creating a outbound session that is not owned by anything and is not -/// so is not safely cancelled on shutdown. -pub async fn ping_node(addr: Url, p2p: P2pPtr) -> bool { - let ex = p2p.executor(); - run_until_completion(ping_node_impl(addr.clone(), p2p), ex).await -} - -async fn ping_node_impl(addr: Url, p2p: P2pPtr) -> bool { - let session_outbound = p2p.session_outbound(); - let parent = Arc::downgrade(&session_outbound); - let connector = Connector::new(p2p.settings(), parent); - - debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr); - match connector.connect(&addr).await { - Ok((url, channel)) => { - debug!(target: "net::refinery::ping_node()", "Successfully created a channel with {}", url); - // First initialize the version protocol and its Version, Verack subscribers. - let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await; - - debug!(target: "net::refinery::ping_node()", "Performing handshake protocols with {}", url); - // Then run the version exchange, store the channel and subscribe to a stop signal. 
- let handshake_task = session_outbound.perform_handshake_protocols( - proto_ver, - channel.clone(), - p2p.executor(), - ); - - debug!(target: "net::refinery::ping_node()", "Starting channel {}", url); - channel.clone().start(p2p.executor()); - - // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if - // the handshake does not finish channel.stop() will never get called, resulting in - // zombie processes. - let result = timeout(Duration::from_secs(5), handshake_task).await; - - debug!(target: "net::refinery::ping_node()", "Stopping channel {}", url); - channel.stop().await; - - match result { - Ok(_) => { - debug!(target: "net::refinery::ping_node()", "Handshake success!"); - true - } - Err(e) => { - debug!(target: "net::refinery::ping_node()", "Handshake err: {}", e); - false - } - } - } - - Err(e) => { - debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e); - false - } - } -} diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 167d016c8..fe5c986aa 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -29,15 +29,12 @@ use url::Url; use super::{ channel::ChannelPtr, dnet::DnetEvent, - hosts::{ - refinery::{GreylistRefinery, GreylistRefineryPtr}, - store::{Hosts, HostsPtr}, - }, + hosts::{Hosts, HostsPtr}, message::Message, protocol::{protocol_registry::ProtocolRegistry, register_default_protocols}, session::{ InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession, - OutboundSessionPtr, SeedSyncSession, + OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession, }, settings::{Settings, SettingsPtr}, }; @@ -72,14 +69,13 @@ pub struct P2p { session_inbound: InboundSessionPtr, /// Reference to configured [`OutboundSession`] session_outbound: OutboundSessionPtr, + /// Reference to configured [`RefineSession`] + session_refine: RefineSessionPtr, /// Enable network debugging pub dnet_enabled: Mutex, /// The subscriber for which we can give dnet info over 
dnet_subscriber: SubscriberPtr, - - /// Greylist refinery process - greylist_refinery: Arc, } impl P2p { @@ -104,18 +100,17 @@ impl P2p { session_manual: ManualSession::new(), session_inbound: InboundSession::new(), session_outbound: OutboundSession::new(), + session_refine: RefineSession::new(), dnet_enabled: Mutex::new(false), dnet_subscriber: Subscriber::new(), - - greylist_refinery: GreylistRefinery::new(), }); self_.session_manual.p2p.init(self_.clone()); self_.session_inbound.p2p.init(self_.clone()); self_.session_outbound.p2p.init(self_.clone()); + self_.session_refine.p2p.init(self_.clone()); - self_.greylist_refinery.p2p.init(self_.clone()); register_default_protocols(self_.clone()).await; self_ @@ -138,12 +133,12 @@ impl P2p { return Err(err) } - info!(target: "net::p2p::start()", "Starting greylist refinery process"); - self.greylist_refinery.clone().start().await; - // Start the outbound session self.session_outbound().start().await; + // Start the refine session + self.session_refine().start().await; + info!(target: "net::p2p::start()", "[P2P] P2P subsystem started"); Ok(()) } @@ -168,9 +163,7 @@ impl P2p { self.session_manual().stop().await; self.session_inbound().stop().await; self.session_outbound().stop().await; - - // Stop greylist refinery process - self.greylist_refinery().stop().await; + self.session_refine().stop().await; } /// Broadcasts a message concurrently across all active channels. 
@@ -255,9 +248,9 @@ impl P2p { self.session_outbound.clone() } - /// Get pointer to greylist refinery - pub fn greylist_refinery(&self) -> GreylistRefineryPtr { - self.greylist_refinery.clone() + /// Get pointer to refine session + pub fn session_refine(&self) -> RefineSessionPtr { + self.session_refine.clone() } /// Enable network debugging diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index d3b55f727..a078c5b97 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -25,7 +25,7 @@ use smol::Executor; use super::{ super::{ channel::ChannelPtr, - hosts::store::{HostColor, HostsPtr}, + hosts::{HostColor, HostsPtr}, message::{AddrsMessage, GetAddrsMessage}, message_subscriber::MessageSubscription, p2p::P2pPtr, @@ -237,7 +237,7 @@ impl ProtocolAddress { } /// Send our own external addresses over a channel. Get the latest - /// last_seen field from InboundSession, and send it along with our + /// last_seen field from RefineSession, and send it along with our /// external address. /// /// If our external address is misconfigured, send an empty vector. 
@@ -264,8 +264,8 @@ impl ProtocolAddress { } let mut addrs = vec![]; - let inbound = self.p2p.session_inbound(); - for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() { + let refinery = self.p2p.session_refine(); + for (addr, last_seen) in refinery.self_handshake.addrs.lock().await.iter() { addrs.push((addr.clone(), *last_seen)); } diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index 4d420bfd1..50e6048f9 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -25,7 +25,7 @@ use smol::Executor; use super::{ super::{ channel::ChannelPtr, - hosts::store::{HostColor, HostsPtr}, + hosts::{HostColor, HostsPtr}, message::{AddrsMessage, GetAddrsMessage}, message_subscriber::MessageSubscription, p2p::P2pPtr, @@ -78,8 +78,8 @@ impl ProtocolSeed { } let mut addrs = vec![]; - let inbound = self.p2p.session_inbound(); - for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() { + let refinery = self.p2p.session_refine(); + for (addr, last_seen) in refinery.self_handshake.addrs.lock().await.iter() { addrs.push((addr.clone(), *last_seen)); } diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index b9629ac54..bea2d08f7 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -23,10 +23,10 @@ //! an acceptor pointer, and a stoppable task pointer. Using a weak pointer //! to P2P allows us to avoid circular dependencies. 
-use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; +use std::sync::Arc; use async_trait::async_trait; -use log::{debug, error, info, warn}; +use log::{debug, error, info}; use smol::{lock::Mutex, Executor}; use url::Url; @@ -35,13 +35,12 @@ use super::{ acceptor::{Acceptor, AcceptorPtr}, channel::ChannelPtr, dnet::{self, dnetev, DnetEvent}, - hosts::refinery::ping_node, p2p::{P2p, P2pPtr}, }, Session, SessionBitFlag, SESSION_INBOUND, }; use crate::{ - system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr, Subscription}, + system::{LazyWeak, StoppableTask, StoppableTaskPtr, Subscription}, Error, Result, }; @@ -52,21 +51,16 @@ pub struct InboundSession { pub(in crate::net) p2p: LazyWeak, acceptors: Mutex>, accept_tasks: Mutex>, - /// Task that periodically checks our external addresses. - pub(in crate::net) ping_self: Arc, } impl InboundSession { /// Create a new inbound session pub fn new() -> InboundSessionPtr { - let self_ = Arc::new(Self { + Arc::new(Self { p2p: LazyWeak::new(), acceptors: Mutex::new(Vec::new()), accept_tasks: Mutex::new(Vec::new()), - ping_self: PingSelfProcess::new(), - }); - self_.ping_self.session.init(self_.clone()); - self_ + }) } /// Starts the inbound session. Begins by accepting connections and fails @@ -112,9 +106,6 @@ impl InboundSession { .await?; } - debug!(target: "net::inbound_session", "Starting ping_self process"); - self.ping_self.clone().start().await; - Ok(()) } @@ -134,9 +125,6 @@ impl InboundSession { for accept_task in accept_tasks { accept_task.stop().await; } - - debug!(target: "net::inbound_session", "Stopping ping_self process"); - self.ping_self.clone().stop().await; } /// Start accepting connections for inbound session. @@ -225,105 +213,3 @@ impl Session for InboundSession { SESSION_INBOUND } } - -/// Periodically try to do a version exchange with our own external -/// addresses. If the version exchange is successful, take a timestamp and -/// save it along with the external addresses. 
Each address along with its -/// timestamp (the `last_seen` data field) is sent in to other nodes in -/// ProtocolAddr and ProtocolSeed. -/// -/// On first run, PingSelfProcess will immediately conduct a version exchange -/// with our external addresses, and if successful update the last_seen -/// field. The process will wait [TODO: ping_self_interval) before retrying. -/// -/// There are two situations in which this can fail: -/// -/// 1. If our external address is misconfigured -/// 2. If we have reached our inbound connection limit. -/// -/// If our external address is misconfigured, doing a version exchange -/// with ourselves will not work and so the external addresses will not -/// be shared with other nodes. -/// -/// If we have reached our inbound connection limit, the external address -/// will continue to be broadcast with an older `last_seen` (from before -/// our inbound connection was reached). -pub struct PingSelfProcess { - process: StoppableTaskPtr, - session: LazyWeak, - pub(in crate::net) addrs: Mutex>, -} - -impl PingSelfProcess { - fn new() -> Arc { - Arc::new(Self { - process: StoppableTask::new(), - session: LazyWeak::new(), - addrs: Mutex::new(HashMap::new()), - }) - } - - async fn start(self: Arc) { - let ex = self.session().p2p().executor(); - self.process.clone().start( - async move { - self.run().await; - unreachable!(); - }, - // Ignore stop handler - |_| async {}, - Error::NetworkServiceStopped, - ex, - ); - } - - async fn stop(self: Arc) { - self.process.stop().await - } - - async fn run(self: Arc) { - let external_addrs = self.session().p2p().settings().external_addrs.clone(); - let mut current_attempt = 0; - - loop { - if current_attempt >= 1 { - // TODO: make this a configurable interval - sleep(600).await; - } - - // Only proceed if the external address is configured. 
- if external_addrs.is_empty() { - current_attempt += 1; - continue - } - - for addr in external_addrs.iter() { - debug!(target: "net::inbound_session::ping_self", - "Attempting a version exchange addr={}", addr); - - if ping_node(addr.clone(), self.session().p2p()).await { - debug!(target: "net::inbound_session::ping_self", - "Version exchange successful! Updating last seen addr={}", addr); - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - let mut addrs = self.addrs.lock().await; - - if addrs.contains_key(addr) { - let val = addrs.get_mut(addr).unwrap(); - *val = last_seen; - } - addrs.insert(addr.clone(), last_seen); - } else { - // Either our external addr is invalid or our max inbound - // connection count has been reached. - warn!(target: "net::inbound_session::ping_self", - "Version exchange failed! addr={}", addr); - } - } - current_attempt += 1; - } - } - - fn session(&self) -> InboundSessionPtr { - self.session.upgrade() - } -} diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 10fffb524..ef0b997c9 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -29,7 +29,7 @@ //! and insures that no other part of the program uses the slots at the //! same time. 
-use std::{sync::Arc, time::UNIX_EPOCH}; +use std::sync::Arc; use async_trait::async_trait; use log::{debug, error, info, warn}; @@ -44,7 +44,7 @@ use super::{ Session, SessionBitFlag, SESSION_MANUAL, }; use crate::{ - net::hosts::store::{HostColor, HostState}, + net::hosts::HostState, system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr}, Error, Result, }; @@ -134,14 +134,6 @@ impl ManualSession { // Register the new channel self.register_channel(channel.clone(), ex.clone()).await?; - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - - // Add this connection to the anchorlist - self.p2p() - .hosts() - .move_host(&addr, last_seen, HostColor::Gold, Some(channel.clone())) - .await?; - // Wait for channel to close stop_sub.receive().await; diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 34d8287bf..1288e8c64 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use log::debug; use smol::Executor; -use super::{channel::ChannelPtr, hosts::store::HostColor, p2p::P2pPtr, protocol::ProtocolVersion}; +use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion}; use crate::Result; pub mod inbound_session; @@ -33,6 +33,8 @@ pub mod outbound_session; pub use outbound_session::{OutboundSession, OutboundSessionPtr}; pub mod seedsync_session; pub use seedsync_session::{SeedSyncSession, SeedSyncSessionPtr}; +pub mod refine_session; +pub use refine_session::{RefineSession, RefineSessionPtr}; /// Bitwise selectors for the `protocol_registry` pub type SessionBitFlag = u32; @@ -40,13 +42,14 @@ pub const SESSION_INBOUND: SessionBitFlag = 0b0001; pub const SESSION_OUTBOUND: SessionBitFlag = 0b0010; pub const SESSION_MANUAL: SessionBitFlag = 0b0100; pub const SESSION_SEED: SessionBitFlag = 0b1000; +pub const SESSION_REFINE: SessionBitFlag = 0b0000; pub const SESSION_ALL: SessionBitFlag = 0b1111; pub type SessionWeakPtr = Weak; /// Removes channel from the list of connected channels 
when a stop signal /// is received. -pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) { +pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: SessionBitFlag) { debug!(target: "net::session::remove_sub_on_stop()", "[START]"); let addr = channel.address(); @@ -62,17 +65,16 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: Sessi "Received stop event. Removing channel {}", addr, ); - if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { - debug!( - target: "net::session::remove_sub_on_stop()", - "Downgrading {}", addr, - ); + // TODO: downgrade to greylist if outbound or manual session. + //if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { + // debug!( + // target: "net::session::remove_sub_on_stop()", + // "Downgrading {}", addr, + // ); - if !p2p.hosts().is_connection_to_self(addr).await { - let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); - p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap(); - } - } + // let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); + // p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap(); + //} // Remove channel from p2p p2p.hosts().unregister(channel.address()).await; @@ -158,6 +160,11 @@ pub trait Session: Sync { // Perform handshake protocol_version.run(executor.clone()).await?; + // TODO: Upgrade to goldlist if outbound or manual session. + //if let SESSION_OUTBOUND | SESSION_MANUAL = type_id { + // //... 
+ //} + // Attempt to add channel to registry self.p2p().hosts().register_channel(channel.clone()).await; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index cfe75d34c..fb0f533ce 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -44,7 +44,7 @@ use super::{ channel::ChannelPtr, connector::Connector, dnet::{self, dnetev, DnetEvent}, - hosts::store::{HostColor, HostState}, + hosts::{HostColor, HostState}, message::GetAddrsMessage, p2p::{P2p, P2pPtr}, }, @@ -99,6 +99,7 @@ impl OutboundSession { /// Stops the outbound session. pub(crate) async fn stop(&self) { + debug!(target: "net::outbound_session", "Stopping outbound session"); let slots = &*self.slots.lock().await; for slot in slots { @@ -399,12 +400,6 @@ impl Slot { self.channel_id.store(channel.info.id, Ordering::Relaxed); - // Add this connection to the anchorlist - hosts - .move_host(&addr, last_seen, HostColor::Gold, Some(channel.clone())) - .await - .unwrap(); - // Wait for channel to close stop_sub.receive().await; diff --git a/src/net/session/refine_session.rs b/src/net/session/refine_session.rs new file mode 100644 index 000000000..991d32b3e --- /dev/null +++ b/src/net/session/refine_session.rs @@ -0,0 +1,401 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +//! TODO: doc + +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant, UNIX_EPOCH}, +}; + +use async_trait::async_trait; +use log::{debug, warn}; +use smol::lock::Mutex; +use url::Url; + +use super::super::p2p::{P2p, P2pPtr}; + +use crate::{ + net::{ + connector::Connector, + hosts::{HostColor, HostState}, + protocol::ProtocolVersion, + session::{Session, SessionBitFlag, SESSION_REFINE}, + }, + system::{sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr}, + Error, +}; + +pub type RefineSessionPtr = Arc; + +pub struct RefineSession { + /// Weak pointer to parent p2p object + pub(in crate::net) p2p: LazyWeak, + + /// Task that periodically checks entries in the greylist. + pub(in crate::net) refinery: Arc, + + /// Task that periodically checks our external addresses. + pub(in crate::net) self_handshake: Arc, +} + +impl RefineSession { + pub fn new() -> RefineSessionPtr { + let self_ = Arc::new(Self { + p2p: LazyWeak::new(), + refinery: GreylistRefinery::new(), + self_handshake: SelfHandshake::new(), + }); + self_.self_handshake.session.init(self_.clone()); + self_.refinery.session.init(self_.clone()); + self_ + } + + pub(crate) async fn start(self: Arc) { + debug!(target: "net::refine_session", "Starting greylist refinery process"); + self.refinery.clone().start().await; + + debug!(target: "net::refine_session", "Starting self handshake process"); + self.self_handshake.clone().start().await; + } + + pub(crate) async fn stop(&self) { + debug!(target: "net::refine_session", "Stopping refinery process"); + self.refinery.clone().stop().await; + + debug!(target: "net::refine_session", "Stopping self handshake process"); + self.self_handshake.clone().stop().await; + } + + // TODO: doc and explain why it's public + pub async fn handshake_node(self: Arc, addr: Url, p2p: P2pPtr) -> bool { + let self_ = 
Arc::downgrade(&self); + let connector = Connector::new(self.p2p().settings(), self_); + + debug!(target: "net::refinery::handshake_node()", "Attempting to connect to {}", addr); + match connector.connect(&addr).await { + Ok((url, channel)) => { + debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url); + // First initialize the version protocol and its Version, Verack subscribers. + let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await; + + debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url); + // Then run the version exchange, store the channel and subscribe to a stop signal. + let handshake_task = + self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor()); + + debug!(target: "net::refinery::handshake_node()", "Starting channel {}", url); + channel.clone().start(p2p.executor()); + + // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if + // the handshake does not finish channel.stop() will never get called, resulting in + // zombie processes. + let result = timeout(Duration::from_secs(5), handshake_task).await; + + debug!(target: "net::refinery::handshake_node()", "Stopping channel {}", url); + channel.stop().await; + + match result { + Ok(_) => { + debug!(target: "net::refinery::handshake_node()", "Handshake success!"); + true + } + Err(e) => { + debug!(target: "net::refinery::handshake_node()", "Handshake err: {}", e); + false + } + } + } + + Err(e) => { + debug!(target: "net::refinery::handshake_node()", "Failed to connect to {}, ({})", addr, e); + false + } + } + } +} + +#[async_trait] +impl Session for RefineSession { + fn p2p(&self) -> P2pPtr { + self.p2p.upgrade() + } + + fn type_id(&self) -> SessionBitFlag { + SESSION_REFINE + } +} + +/// Periodically probes entries in the greylist. 
+/// +/// Randomly selects a greylist entry and tries to establish a local +/// connection to it using the method handshake_node(), which creates a +/// channel and does a version exchange using `perform_handshake_protocols()`. +/// +/// If successful, the entry is removed from the greylist and added to the +/// whitelist with an updated last_seen timestamp. If non-successful, the +/// entry is removed from the greylist. +pub struct GreylistRefinery { + /// Weak pointer to parent object + session: LazyWeak, + process: StoppableTaskPtr, +} + +impl GreylistRefinery { + pub fn new() -> Arc { + Arc::new(Self { session: LazyWeak::new(), process: StoppableTask::new() }) + } + + pub async fn start(self: Arc) { + match self.p2p().hosts().container.load_all(&self.p2p().settings().hostlist).await { + Ok(()) => { + debug!(target: "net::refinery::start()", "Load hosts successful!"); + } + Err(e) => { + warn!(target: "net::refinery::start()", "Error loading hosts {}", e); + } + } + let ex = self.p2p().executor(); + self.process.clone().start( + async move { + self.run().await; + unreachable!(); + }, + // Ignore stop handler + |_| async {}, + Error::NetworkServiceStopped, + ex, + ); + } + + pub async fn stop(self: Arc) { + debug!(target: "net::refinery", "Stopping refinery"); + self.process.stop().await; + + match self.p2p().hosts().container.save_all(&self.p2p().settings().hostlist).await { + Ok(()) => { + debug!(target: "net::refinery::stop()", "Save hosts successful!"); + } + Err(e) => { + warn!(target: "net::refinery::stop()", "Error saving hosts {}", e); + } + } + } + + // Randomly select a peer on the greylist and probe it. This method will remove from the + // greylist and store on the whitelist providing the peer is responsive. 
+ async fn run(self: Arc) { + let settings = self.p2p().settings(); + let hosts = self.p2p().hosts(); + loop { + sleep(settings.greylist_refinery_interval).await; + + if hosts.container.is_empty(HostColor::Grey).await { + debug!(target: "net::refinery", + "Greylist is empty! Cannot start refinery process"); + + continue + } + + // Pause the refinery if we've had zero connections for longer than the configured + // limit. + let offline_limit = Duration::from_secs(settings.time_with_no_connections); + let offline_timer = Instant::now().duration_since(*hosts.last_connection.read().await); + + if hosts.channels().await.is_empty() && offline_timer >= offline_limit { + warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.", + offline_timer.as_secs()); + + // It is necessary to clear suspended hosts at this point, otherwise these + // hosts cannot be connected to in Outbound Session. Failure to do this could + // result in the refinery being paused forever (since connections could never be + // made). + let suspended_hosts = hosts.suspended().await; + for host in suspended_hosts { + hosts.unregister(&host).await; + } + + continue + } + + // Only attempt to refine peers that match our transports. + match hosts + .container + .fetch_random_with_schemes(HostColor::Grey, &settings.allowed_transports) + .await + { + Some((entry, position)) => { + let url = &entry.0; + + if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await { + debug!(target: "net::refinery", "Unable to refine addr={}, err={}", + url.clone(), e); + continue + } + + if !self.session().handshake_node(url.clone(), self.p2p().clone()).await { + hosts.container.remove(HostColor::Grey, url, position).await; + + debug!( + target: "net::refinery", + "Peer {} is non-responsive. Removed from greylist", url, + ); + + // Remove this entry from HostRegistry to avoid this host getting + // stuck in the Refining state. 
+ // + // It is not necessary to call this when the refinery passes, since the + // state will be changed to Connected. + hosts.unregister(url).await; + + continue + } + + debug!( + target: "net::refinery", + "Peer {} is responsive. Adding to whitelist", url, + ); + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + + // Add to the whitelist and remove from the greylist. + hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap(); + hosts.unregister(url).await; + + debug!(target: "net::refinery", "GreylistRefinery complete!"); + continue + } + None => { + debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery"); + + continue + } + } + } + } + + fn session(&self) -> RefineSessionPtr { + self.session.upgrade() + } + + fn p2p(&self) -> P2pPtr { + self.session().p2p() + } +} + +/// Periodically try to do a version exchange with our own external +/// addresses. If the version exchange is successful, take a timestamp and +/// save it along with the external addresses. Each address along with its +/// timestamp (the `last_seen` data field) is sent to other nodes in +/// ProtocolAddr and ProtocolSeed. +/// +/// On first run, SelfHandshake will immediately conduct a version exchange +/// with our external addresses, and if successful update the last_seen +/// field. The process will wait [TODO: self_handshake_interval] before retrying. +/// +/// There are two situations in which this can fail: +/// +/// 1. If our external address is misconfigured +/// 2. If we have reached our inbound connection limit. +/// +/// If our external address is misconfigured, doing a version exchange +/// with ourselves will not work and so the external addresses will not +/// be shared with other nodes. +/// +/// If we have reached our inbound connection limit, the external address +/// will continue to be broadcast with an older `last_seen` (from before +/// our inbound connection limit was reached). 
+pub struct SelfHandshake { + process: StoppableTaskPtr, + session: LazyWeak, + pub(in crate::net) addrs: Mutex>, +} + +impl SelfHandshake { + fn new() -> Arc { + Arc::new(Self { + process: StoppableTask::new(), + session: LazyWeak::new(), + addrs: Mutex::new(HashMap::new()), + }) + } + + async fn start(self: Arc) { + let ex = self.session().p2p().executor(); + self.process.clone().start( + async move { + self.run().await; + unreachable!(); + }, + // Ignore stop handler + |_| async {}, + Error::NetworkServiceStopped, + ex, + ); + } + + async fn stop(self: Arc) { + self.process.stop().await + } + + async fn run(self: Arc) { + let external_addrs = self.session().p2p().settings().external_addrs.clone(); + let mut current_attempt = 0; + + loop { + if current_attempt >= 1 { + // TODO: make this a configurable interval + sleep(600).await; + } + + // Only proceed if the external address is configured. + if external_addrs.is_empty() { + current_attempt += 1; + continue + } + + for addr in external_addrs.iter() { + debug!(target: "net::refine_session::self_handshake", + "Attempting a version exchange addr={}", addr); + + if self.session().handshake_node(addr.clone(), self.session().p2p()).await { + debug!(target: "net::refine_session::self_handshake", + "Version exchange successful! Updating last seen addr={}", addr); + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + let mut addrs = self.addrs.lock().await; + + if addrs.contains_key(addr) { + let val = addrs.get_mut(addr).unwrap(); + *val = last_seen; + } + addrs.insert(addr.clone(), last_seen); + } else { + // Either our external addr is invalid or our max inbound + // connection count has been reached. + warn!(target: "net::refine_session::self_handshake", + "Version exchange failed! 
addr={}", addr); + } + } + current_attempt += 1; + } + } + + fn session(&self) -> RefineSessionPtr { + self.session.upgrade() + } +} diff --git a/src/net/session/seedsync_session.rs b/src/net/session/seedsync_session.rs index 891de2009..68ae198f9 100644 --- a/src/net/session/seedsync_session.rs +++ b/src/net/session/seedsync_session.rs @@ -51,7 +51,7 @@ use url::Url; use super::{ super::{ connector::Connector, - hosts::store::HostColor, + hosts::HostColor, p2p::{P2p, P2pPtr}, }, Session, SessionBitFlag, SESSION_SEED, diff --git a/src/net/tests.rs b/src/net/tests.rs index 147c5ec4c..4b91e3dd5 100644 --- a/src/net/tests.rs +++ b/src/net/tests.rs @@ -26,7 +26,7 @@ use smol::{channel, future, Executor}; use url::Url; use crate::{ - net::{hosts::store::HostColor, P2p, Settings}, + net::{hosts::HostColor, P2p, Settings}, system::sleep, }; @@ -102,6 +102,7 @@ async fn hostlist_propagation(ex: Arc>) { inbound_addrs: vec![seed_addr.clone()], external_addrs: vec![seed_addr.clone()], outbound_connections: 0, + greylist_refinery_interval: 2, inbound_connections: usize::MAX, seeds: vec![], peers: vec![], @@ -146,7 +147,7 @@ async fn hostlist_propagation(ex: Arc>) { } info!("Waiting until all peers connect"); - sleep(10).await; + sleep(15).await; for p2p in p2p_instances.iter() { let hosts = p2p.hosts();