net: create RefineSession

* Move greylist refinery and ping self process into Session.
* Remove hosts/ submodule and return store to net/hosts.rs
* Temporarily disable Gold/ Grey list upgrade and downgrade (we will move into RefineSession)
This commit is contained in:
draoi
2024-04-01 11:52:26 +02:00
parent c794507458
commit 1cd330b798
17 changed files with 491 additions and 518 deletions

View File

@@ -39,11 +39,7 @@ use url::Url;
use darkfi::{
async_daemonize, cli_desc,
net::{
self,
hosts::{refinery::ping_node, store::HostColor},
P2p, P2pPtr,
},
net::{self, hosts::HostColor, P2p, P2pPtr},
rpc::{
jsonrpc::*,
server::{listen_and_serve, RequestHandler},
@@ -184,7 +180,7 @@ impl Lilith {
let url = &entry.0;
let last_seen = &entry.1;
if !ping_node(url.clone(), p2p.clone()).await {
if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url);
hosts.move_host(url, *last_seen, HostColor::Grey, None).await?;
hosts.unregister(url).await;

View File

@@ -30,7 +30,7 @@ use url::Url;
use super::{
channel::{Channel, ChannelPtr},
hosts::store::HostColor,
hosts::HostColor,
session::SessionWeakPtr,
transport::{Listener, PtListener},
};

View File

@@ -37,7 +37,7 @@ use url::Url;
use super::{
dnet::{self, dnetev, DnetEvent},
hosts::store::HostColor,
hosts::HostColor,
message,
message::Packet,
message_subscriber::{MessageSubscription, MessageSubsystem},

View File

@@ -23,7 +23,7 @@ use url::Url;
use super::{
channel::{Channel, ChannelPtr},
hosts::store::HostColor,
hosts::HostColor,
session::SessionWeakPtr,
settings::SettingsPtr,
transport::Dialer,

View File

@@ -23,7 +23,7 @@ use rand::{prelude::IteratorRandom, rngs::OsRng, Rng};
use smol::lock::RwLock;
use url::Url;
use super::super::{settings::SettingsPtr, ChannelPtr};
use super::{settings::SettingsPtr, ChannelPtr};
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
util::{
@@ -33,6 +33,29 @@ use crate::{
Error, Result,
};
/// The main interface for interacting with the hostlist. Contains the following:
///
/// `Hosts`: the main parent class that manages HostRegistry and HostContainer. It is also
/// responsible for filtering addresses before writing to the hostlist.
///
/// `HostRegistry`: A locked HashMap that maps peer addresses onto mutually exclusive
/// states (`HostState`). Prevents race conditions by dictating a strict flow of logically
/// acceptable states.
///
/// `HostContainer`: A wrapper for the hostlists. Each hostlist is represented by a `HostColor`,
/// which can be Grey, White, Gold or Black. Exposes a common interface for hostlist queries and
/// utilities.
///
/// `HostColor`: White hosts have been seen recently. Gold hosts we have been able to establish
/// a connection to. Grey hosts are recently received hosts that are periodically refreshed
/// using the greylist refinery. Black hosts are considered hostile and are strictly avoided
/// for the duration of the program. Dark hosts are hosts that do not match our transports, but
/// that we continue to share with other peers. They are otherwise ignored.
///
/// `HostState`: a set of mutually exclusive states that can be Insert, Refine, Connect, Suspend
/// or Connected. The state is `None` when the corresponding host has been removed from the
/// HostRegistry.
// An array containing all possible local host strings
// TODO: This could perhaps be more exhaustive?
pub const LOCAL_HOST_STRS: [&str; 2] = ["localhost", "localhost.localdomain"];
@@ -108,6 +131,7 @@ pub enum HostState {
Suspend,
/// Hosts that have been successfully connected to.
Connected(ChannelPtr),
/// Host that are moving between hostlists, implemented in
/// store::move_host(). Move takes a ChannelPtr so that Channels that
/// are being promoted to the Gold list can be re-inserted into the
@@ -838,6 +862,9 @@ impl Hosts {
pub async fn try_register(&self, addr: Url, new_state: HostState) -> Result<HostState> {
let mut registry = self.registry.write().await;
debug!(target: "net::hosts::try_update_registry()", "Try register addr={}, state={}",
addr, &new_state);
if registry.contains_key(&addr) {
let current_state = registry.get(&addr).unwrap().clone();
@@ -854,6 +881,8 @@ impl Hosts {
registry.insert(addr.clone(), state.clone());
}
debug!(target: "net::hosts::try_update_registry()", "Returning result {:?}", result);
result
} else {
// We don't know this peer. We can safely update the state.
@@ -904,6 +933,7 @@ impl Hosts {
pub async fn unregister(&self, addr: &Url) {
debug!(target: "net::hosts::unregister()", "Removing {} from HostRegistry", addr);
self.registry.write().await.remove(addr);
debug!(target: "net::hosts::unregister()", "Removed {} from HostRegistry", addr);
}
/// Returns the list of connected channels.
@@ -1218,37 +1248,10 @@ impl Hosts {
#[cfg(test)]
mod tests {
use smol::Executor;
use std::time::UNIX_EPOCH;
use super::{
super::super::{settings::Settings, P2p},
*,
};
use crate::{net::hosts::refinery::ping_node, system::sleep};
#[test]
fn test_ping_node() {
smol::block_on(async {
let settings = Settings {
localnet: false,
external_addrs: vec![
Url::parse("tcp://foo.bar:123").unwrap(),
Url::parse("tcp://lol.cat:321").unwrap(),
],
..Default::default()
};
let ex = Arc::new(Executor::new());
let p2p = P2p::new(settings, ex.clone()).await;
let url = Url::parse("tcp://xeno.systems.wtf").unwrap();
println!("Pinging node...");
let task = ex.spawn(ping_node(url.clone(), p2p));
ex.run(task).await;
println!("Ping node complete!");
});
}
use super::{super::settings::Settings, *};
use crate::system::sleep;
#[test]
fn test_is_local_host() {

View File

@@ -1,57 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/// Periodically probes entries in the greylist.
///
/// Randomly selects a greylist entry and tries to
/// establish a local connection to it using the method probe_node(), which creates
/// a channel and does a version exchange using `perform_handshake_protocols()`.
///
/// If successful, the entry is removed from the greylist and added to the whitelist
/// with an updated last_seen timestamp. If non-successful, the entry is removed from
/// the greylist.
///
/// The method `probe_node()` is also used by `ProtocolSeed` and `ProtocolAddress`.
/// We try to establish local connections to our own external addresses using
/// `probe_node()` to ensure the address is valid before propagating in `ProtocolSeed`
/// and `ProtocolAddress`.
pub mod refinery;
/// The main interface for interacting with the hostlist. Contains the following:
///
/// `Hosts`: the main parent class that manages HostRegistry and HostContainer. It is also
/// responsible for filtering addresses before writing to the hostlist.
///
/// `HostRegistry`: A locked HashMap that maps peer addresses onto mutually exclusive
/// states (`HostState`). Prevents race conditions by dictating a strict flow of logically
/// acceptable states.
///
/// `HostContainer`: A wrapper for the hostlists. Each hostlist is represented by a `HostColor`,
/// which can be Grey, White, Gold or Black. Exposes a common interface for hostlist queries and
/// utilities.
///
/// `HostColor`: White hosts have been seen recently. Gold hosts we have been able to establish
/// a connection to. Grey hosts are recently received hosts that are periodically refreshed
/// using the greylist refinery. Black hosts are considered hostile and are strictly avoided
/// for the duration of the program. Dark hosts are hosts that do not match our transports, but
/// that we continue to share with other peers. They are otherwise ignored.
///
/// `HostState`: a set of mutually exclusive states that can be Insert, Refine, Connect, Suspend
/// or Connected. The state is `None` when the corresponding host has been removed from the
/// HostRegistry.
pub mod store;

View File

@@ -1,244 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
sync::Arc,
time::{Duration, Instant, UNIX_EPOCH},
};
use log::{debug, warn};
use url::Url;
use super::{
super::p2p::{P2p, P2pPtr},
store::HostColor,
};
use crate::{
net::{
connector::Connector, hosts::store::HostState, protocol::ProtocolVersion, session::Session,
},
system::{
run_until_completion, sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr,
},
Error,
};
pub type GreylistRefineryPtr = Arc<GreylistRefinery>;
/// Probe random peers on the greylist. If a peer is responsive, update the last_seen field and
/// add it to the whitelist. If a node does not respond, remove it from the greylist.
/// Called periodically.
pub struct GreylistRefinery {
/// Weak pointer to parent p2p object
pub(in crate::net) p2p: LazyWeak<P2p>,
process: StoppableTaskPtr,
}
impl GreylistRefinery {
pub fn new() -> Arc<Self> {
Arc::new(Self { p2p: LazyWeak::new(), process: StoppableTask::new() })
}
pub async fn start(self: Arc<Self>) {
match self.p2p().hosts().container.load_all(&self.p2p().settings().hostlist).await {
Ok(()) => {
debug!(target: "net::refinery::start()", "Load hosts successful!");
}
Err(e) => {
warn!(target: "net::refinery::start()", "Error loading hosts {}", e);
}
}
let ex = self.p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
pub async fn stop(self: Arc<Self>) {
self.process.stop().await;
match self.p2p().hosts().container.save_all(&self.p2p().settings().hostlist).await {
Ok(()) => {
debug!(target: "net::refinery::stop()", "Save hosts successful!");
}
Err(e) => {
warn!(target: "net::refinery::stop()", "Error saving hosts {}", e);
}
}
}
// Randomly select a peer on the greylist and probe it. This method will remove from the
// greylist and store on the whitelist providing the peer is responsive.
async fn run(self: Arc<Self>) {
let settings = self.p2p().settings();
let hosts = self.p2p().hosts();
loop {
sleep(settings.greylist_refinery_interval).await;
if hosts.container.is_empty(HostColor::Grey).await {
debug!(target: "net::refinery",
"Greylist is empty! Cannot start refinery process");
continue
}
// Pause the refinery if we've had zero connections for longer than the configured
// limit.
let offline_limit = Duration::from_secs(settings.time_with_no_connections);
let offline_timer = Instant::now().duration_since(*hosts.last_connection.read().await);
if hosts.channels().await.is_empty() && offline_timer >= offline_limit {
warn!(target: "net::refinery", "No connections for {}s. Refinery paused.",
offline_timer.as_secs());
// It is neccessary to clear suspended hosts at this point, otherwise these
// hosts cannot be connected to in Outbound Session. Failure to do this could
// result in the refinery being paused forver (since connections could never be
// made).
let suspended_hosts = hosts.suspended().await;
for host in suspended_hosts {
hosts.unregister(&host).await;
}
continue
}
// Only attempt to refine peers that match our transports.
match hosts
.container
.fetch_random_with_schemes(HostColor::Grey, &settings.allowed_transports)
.await
{
Some((entry, position)) => {
let url = &entry.0;
if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await {
debug!(target: "net::refinery", "Unable to refine addr={}, err={}",
url.clone(), e);
continue
}
if !ping_node(url.clone(), self.p2p().clone()).await {
hosts.container.remove(HostColor::Grey, url, position).await;
debug!(
target: "net::refinery",
"Peer {} is non-responsive. Removed from greylist", url,
);
// Remove this entry from HostRegistry to avoid this host getting
// stuck in the Refining state.
//
// It is not necessary to call this when the refinery passes, since the
// state will be changed to Connected.
hosts.unregister(url).await;
continue
}
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Add to the whitelist and remove from the greylist.
hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap();
hosts.unregister(url).await;
}
None => {
debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
continue
}
}
}
}
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade()
}
}
/// Check a node is online by establishing a channel with it and conducting a handshake with a
/// version exchange.
///
/// We must use run_until_completion() to ensure this code will complete even if the parent task
/// has been destroyed. Otherwise ping_node() will become a zombie process if the rest of the p2p
/// network has been shutdown but the handshake it still ongoing.
///
/// Other parts of the p2p stack have safe shutdown methods built into them due to the ownership
/// structure. Here we are creating a outbound session that is not owned by anything and is not
/// so is not safely cancelled on shutdown.
pub async fn ping_node(addr: Url, p2p: P2pPtr) -> bool {
let ex = p2p.executor();
run_until_completion(ping_node_impl(addr.clone(), p2p), ex).await
}
async fn ping_node_impl(addr: Url, p2p: P2pPtr) -> bool {
let session_outbound = p2p.session_outbound();
let parent = Arc::downgrade(&session_outbound);
let connector = Connector::new(p2p.settings(), parent);
debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr);
match connector.connect(&addr).await {
Ok((url, channel)) => {
debug!(target: "net::refinery::ping_node()", "Successfully created a channel with {}", url);
// First initialize the version protocol and its Version, Verack subscribers.
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
debug!(target: "net::refinery::ping_node()", "Performing handshake protocols with {}", url);
// Then run the version exchange, store the channel and subscribe to a stop signal.
let handshake_task = session_outbound.perform_handshake_protocols(
proto_ver,
channel.clone(),
p2p.executor(),
);
debug!(target: "net::refinery::ping_node()", "Starting channel {}", url);
channel.clone().start(p2p.executor());
// Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
// the handshake does not finish channel.stop() will never get called, resulting in
// zombie processes.
let result = timeout(Duration::from_secs(5), handshake_task).await;
debug!(target: "net::refinery::ping_node()", "Stopping channel {}", url);
channel.stop().await;
match result {
Ok(_) => {
debug!(target: "net::refinery::ping_node()", "Handshake success!");
true
}
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Handshake err: {}", e);
false
}
}
}
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e);
false
}
}
}

View File

@@ -29,15 +29,12 @@ use url::Url;
use super::{
channel::ChannelPtr,
dnet::DnetEvent,
hosts::{
refinery::{GreylistRefinery, GreylistRefineryPtr},
store::{Hosts, HostsPtr},
},
hosts::{Hosts, HostsPtr},
message::Message,
protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
session::{
InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession,
OutboundSessionPtr, SeedSyncSession,
OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession,
},
settings::{Settings, SettingsPtr},
};
@@ -72,14 +69,13 @@ pub struct P2p {
session_inbound: InboundSessionPtr,
/// Reference to configured [`OutboundSession`]
session_outbound: OutboundSessionPtr,
/// Reference to configured [`RefineSession`]
session_refine: RefineSessionPtr,
/// Enable network debugging
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
dnet_subscriber: SubscriberPtr<DnetEvent>,
/// Greylist refinery process
greylist_refinery: Arc<GreylistRefinery>,
}
impl P2p {
@@ -104,18 +100,17 @@ impl P2p {
session_manual: ManualSession::new(),
session_inbound: InboundSession::new(),
session_outbound: OutboundSession::new(),
session_refine: RefineSession::new(),
dnet_enabled: Mutex::new(false),
dnet_subscriber: Subscriber::new(),
greylist_refinery: GreylistRefinery::new(),
});
self_.session_manual.p2p.init(self_.clone());
self_.session_inbound.p2p.init(self_.clone());
self_.session_outbound.p2p.init(self_.clone());
self_.session_refine.p2p.init(self_.clone());
self_.greylist_refinery.p2p.init(self_.clone());
register_default_protocols(self_.clone()).await;
self_
@@ -138,12 +133,12 @@ impl P2p {
return Err(err)
}
info!(target: "net::p2p::start()", "Starting greylist refinery process");
self.greylist_refinery.clone().start().await;
// Start the outbound session
self.session_outbound().start().await;
// Start the refine session
self.session_refine().start().await;
info!(target: "net::p2p::start()", "[P2P] P2P subsystem started");
Ok(())
}
@@ -168,9 +163,7 @@ impl P2p {
self.session_manual().stop().await;
self.session_inbound().stop().await;
self.session_outbound().stop().await;
// Stop greylist refinery process
self.greylist_refinery().stop().await;
self.session_refine().stop().await;
}
/// Broadcasts a message concurrently across all active channels.
@@ -255,9 +248,9 @@ impl P2p {
self.session_outbound.clone()
}
/// Get pointer to greylist refinery
pub fn greylist_refinery(&self) -> GreylistRefineryPtr {
self.greylist_refinery.clone()
/// Get pointer to refine session
pub fn session_refine(&self) -> RefineSessionPtr {
self.session_refine.clone()
}
/// Enable network debugging

View File

@@ -25,7 +25,7 @@ use smol::Executor;
use super::{
super::{
channel::ChannelPtr,
hosts::store::{HostColor, HostsPtr},
hosts::{HostColor, HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
@@ -237,7 +237,7 @@ impl ProtocolAddress {
}
/// Send our own external addresses over a channel. Get the latest
/// last_seen field from InboundSession, and send it along with our
/// last_seen field from RefineSession, and send it along with our
/// external address.
///
/// If our external address is misconfigured, send an empty vector.
@@ -264,8 +264,8 @@ impl ProtocolAddress {
}
let mut addrs = vec![];
let inbound = self.p2p.session_inbound();
for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() {
let refinery = self.p2p.session_refine();
for (addr, last_seen) in refinery.self_handshake.addrs.lock().await.iter() {
addrs.push((addr.clone(), *last_seen));
}

View File

@@ -25,7 +25,7 @@ use smol::Executor;
use super::{
super::{
channel::ChannelPtr,
hosts::store::{HostColor, HostsPtr},
hosts::{HostColor, HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
@@ -78,8 +78,8 @@ impl ProtocolSeed {
}
let mut addrs = vec![];
let inbound = self.p2p.session_inbound();
for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() {
let refinery = self.p2p.session_refine();
for (addr, last_seen) in refinery.self_handshake.addrs.lock().await.iter() {
addrs.push((addr.clone(), *last_seen));
}

View File

@@ -23,10 +23,10 @@
//! an acceptor pointer, and a stoppable task pointer. Using a weak pointer
//! to P2P allows us to avoid circular dependencies.
use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH};
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, info, warn};
use log::{debug, error, info};
use smol::{lock::Mutex, Executor};
use url::Url;
@@ -35,13 +35,12 @@ use super::{
acceptor::{Acceptor, AcceptorPtr},
channel::ChannelPtr,
dnet::{self, dnetev, DnetEvent},
hosts::refinery::ping_node,
p2p::{P2p, P2pPtr},
},
Session, SessionBitFlag, SESSION_INBOUND,
};
use crate::{
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr, Subscription},
system::{LazyWeak, StoppableTask, StoppableTaskPtr, Subscription},
Error, Result,
};
@@ -52,21 +51,16 @@ pub struct InboundSession {
pub(in crate::net) p2p: LazyWeak<P2p>,
acceptors: Mutex<Vec<AcceptorPtr>>,
accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
/// Task that periodically checks our external addresses.
pub(in crate::net) ping_self: Arc<PingSelfProcess>,
}
impl InboundSession {
/// Create a new inbound session
pub fn new() -> InboundSessionPtr {
let self_ = Arc::new(Self {
Arc::new(Self {
p2p: LazyWeak::new(),
acceptors: Mutex::new(Vec::new()),
accept_tasks: Mutex::new(Vec::new()),
ping_self: PingSelfProcess::new(),
});
self_.ping_self.session.init(self_.clone());
self_
})
}
/// Starts the inbound session. Begins by accepting connections and fails
@@ -112,9 +106,6 @@ impl InboundSession {
.await?;
}
debug!(target: "net::inbound_session", "Starting ping_self process");
self.ping_self.clone().start().await;
Ok(())
}
@@ -134,9 +125,6 @@ impl InboundSession {
for accept_task in accept_tasks {
accept_task.stop().await;
}
debug!(target: "net::inbound_session", "Stopping ping_self process");
self.ping_self.clone().stop().await;
}
/// Start accepting connections for inbound session.
@@ -225,105 +213,3 @@ impl Session for InboundSession {
SESSION_INBOUND
}
}
/// Periodically try to do a version exchange with our own external
/// addresses. If the version exchange is successful, take a timestamp and
/// save it along with the external addresses. Each address along with its
/// timestamp (the `last_seen` data field) is sent in to other nodes in
/// ProtocolAddr and ProtocolSeed.
///
/// On first run, PingSelfProcess will immediately conduct a version exchange
/// with our external addresses, and if successful update the last_seen
/// field. The process will wait [TODO: ping_self_interval) before retrying.
///
/// There are two situations in which this can fail:
///
/// 1. If our external address is misconfigured
/// 2. If we have reached our inbound connection limit.
///
/// If our external address is misconfigured, doing a version exchange
/// with ourselves will not work and so the external addresses will not
/// be shared with other nodes.
///
/// If we have reached our inbound connection limit, the external address
/// will continue to be broadcast with an older `last_seen` (from before
/// our inbound connection was reached).
pub struct PingSelfProcess {
process: StoppableTaskPtr,
session: LazyWeak<InboundSession>,
pub(in crate::net) addrs: Mutex<HashMap<Url, u64>>,
}
impl PingSelfProcess {
fn new() -> Arc<Self> {
Arc::new(Self {
process: StoppableTask::new(),
session: LazyWeak::new(),
addrs: Mutex::new(HashMap::new()),
})
}
async fn start(self: Arc<Self>) {
let ex = self.session().p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
}
async fn run(self: Arc<Self>) {
let external_addrs = self.session().p2p().settings().external_addrs.clone();
let mut current_attempt = 0;
loop {
if current_attempt >= 1 {
// TODO: make this a configurable interval
sleep(600).await;
}
// Only proceed if the external address is configured.
if external_addrs.is_empty() {
current_attempt += 1;
continue
}
for addr in external_addrs.iter() {
debug!(target: "net::inbound_session::ping_self",
"Attempting a version exchange addr={}", addr);
if ping_node(addr.clone(), self.session().p2p()).await {
debug!(target: "net::inbound_session::ping_self",
"Version exchange successful! Updating last seen addr={}", addr);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
let mut addrs = self.addrs.lock().await;
if addrs.contains_key(addr) {
let val = addrs.get_mut(addr).unwrap();
*val = last_seen;
}
addrs.insert(addr.clone(), last_seen);
} else {
// Either our external addr is invalid or our max inbound
// connection count has been reached.
warn!(target: "net::inbound_session::ping_self",
"Version exchange failed! addr={}", addr);
}
}
current_attempt += 1;
}
}
fn session(&self) -> InboundSessionPtr {
self.session.upgrade()
}
}

View File

@@ -29,7 +29,7 @@
//! and insures that no other part of the program uses the slots at the
//! same time.
use std::{sync::Arc, time::UNIX_EPOCH};
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, info, warn};
@@ -44,7 +44,7 @@ use super::{
Session, SessionBitFlag, SESSION_MANUAL,
};
use crate::{
net::hosts::store::{HostColor, HostState},
net::hosts::HostState,
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr},
Error, Result,
};
@@ -134,14 +134,6 @@ impl ManualSession {
// Register the new channel
self.register_channel(channel.clone(), ex.clone()).await?;
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Add this connection to the anchorlist
self.p2p()
.hosts()
.move_host(&addr, last_seen, HostColor::Gold, Some(channel.clone()))
.await?;
// Wait for channel to close
stop_sub.receive().await;

View File

@@ -22,7 +22,7 @@ use async_trait::async_trait;
use log::debug;
use smol::Executor;
use super::{channel::ChannelPtr, hosts::store::HostColor, p2p::P2pPtr, protocol::ProtocolVersion};
use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion};
use crate::Result;
pub mod inbound_session;
@@ -33,6 +33,8 @@ pub mod outbound_session;
pub use outbound_session::{OutboundSession, OutboundSessionPtr};
pub mod seedsync_session;
pub use seedsync_session::{SeedSyncSession, SeedSyncSessionPtr};
pub mod refine_session;
pub use refine_session::{RefineSession, RefineSessionPtr};
/// Bitwise selectors for the `protocol_registry`
pub type SessionBitFlag = u32;
@@ -40,13 +42,14 @@ pub const SESSION_INBOUND: SessionBitFlag = 0b0001;
pub const SESSION_OUTBOUND: SessionBitFlag = 0b0010;
pub const SESSION_MANUAL: SessionBitFlag = 0b0100;
pub const SESSION_SEED: SessionBitFlag = 0b1000;
pub const SESSION_REFINE: SessionBitFlag = 0b0000;
pub const SESSION_ALL: SessionBitFlag = 0b1111;
pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
/// Removes channel from the list of connected channels when a stop signal
/// is received.
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) {
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, _type_id: SessionBitFlag) {
debug!(target: "net::session::remove_sub_on_stop()", "[START]");
let addr = channel.address();
@@ -62,17 +65,16 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: Sessi
"Received stop event. Removing channel {}", addr,
);
if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
debug!(
target: "net::session::remove_sub_on_stop()",
"Downgrading {}", addr,
);
// TODO: downgrade to greylist if outbound or manual session.
//if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
// debug!(
// target: "net::session::remove_sub_on_stop()",
// "Downgrading {}", addr,
// );
if !p2p.hosts().is_connection_to_self(addr).await {
let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap();
}
}
// let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
// p2p.hosts().move_host(addr, last_seen, HostColor::Grey, None).await.unwrap();
//}
// Remove channel from p2p
p2p.hosts().unregister(channel.address()).await;
@@ -158,6 +160,11 @@ pub trait Session: Sync {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// TODO: Upgrade to goldlist if outbound or manual session.
//if let SESSION_OUTBOUND | SESSION_MANUAL = type_id {
// //...
//}
// Attempt to add channel to registry
self.p2p().hosts().register_channel(channel.clone()).await;

View File

@@ -44,7 +44,7 @@ use super::{
channel::ChannelPtr,
connector::Connector,
dnet::{self, dnetev, DnetEvent},
hosts::store::{HostColor, HostState},
hosts::{HostColor, HostState},
message::GetAddrsMessage,
p2p::{P2p, P2pPtr},
},
@@ -99,6 +99,7 @@ impl OutboundSession {
/// Stops the outbound session.
pub(crate) async fn stop(&self) {
debug!(target: "net::outbound_session", "Stopping outbound session");
let slots = &*self.slots.lock().await;
for slot in slots {
@@ -399,12 +400,6 @@ impl Slot {
self.channel_id.store(channel.info.id, Ordering::Relaxed);
// Add this connection to the anchorlist
hosts
.move_host(&addr, last_seen, HostColor::Gold, Some(channel.clone()))
.await
.unwrap();
// Wait for channel to close
stop_sub.receive().await;

View File

@@ -0,0 +1,401 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//! TODO: doc
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant, UNIX_EPOCH},
};
use async_trait::async_trait;
use log::{debug, warn};
use smol::lock::Mutex;
use url::Url;
use super::super::p2p::{P2p, P2pPtr};
use crate::{
net::{
connector::Connector,
hosts::{HostColor, HostState},
protocol::ProtocolVersion,
session::{Session, SessionBitFlag, SESSION_REFINE},
},
system::{sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr},
Error,
};
pub type RefineSessionPtr = Arc<RefineSession>;
pub struct RefineSession {
/// Weak pointer to parent p2p object
pub(in crate::net) p2p: LazyWeak<P2p>,
/// Task that periodically checks entries in the greylist.
pub(in crate::net) refinery: Arc<GreylistRefinery>,
/// Task that periodically checks our external addresses.
pub(in crate::net) self_handshake: Arc<SelfHandshake>,
}
impl RefineSession {
pub fn new() -> RefineSessionPtr {
let self_ = Arc::new(Self {
p2p: LazyWeak::new(),
refinery: GreylistRefinery::new(),
self_handshake: SelfHandshake::new(),
});
self_.self_handshake.session.init(self_.clone());
self_.refinery.session.init(self_.clone());
self_
}
pub(crate) async fn start(self: Arc<Self>) {
debug!(target: "net::refine_session", "Starting greylist refinery process");
self.refinery.clone().start().await;
debug!(target: "net::refine_session", "Starting self handshake process");
self.self_handshake.clone().start().await;
}
pub(crate) async fn stop(&self) {
debug!(target: "net::refine_session", "Stopping refinery process");
self.refinery.clone().stop().await;
debug!(target: "net::refine_session", "Stopping self handshake process");
self.self_handshake.clone().stop().await;
}
// TODO: doc and explain why it's public
pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
let self_ = Arc::downgrade(&self);
let connector = Connector::new(self.p2p().settings(), self_);
debug!(target: "net::refinery::handshake_node()", "Attempting to connect to {}", addr);
match connector.connect(&addr).await {
Ok((url, channel)) => {
debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url);
// First initialize the version protocol and its Version, Verack subscribers.
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url);
// Then run the version exchange, store the channel and subscribe to a stop signal.
let handshake_task =
self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
debug!(target: "net::refinery::handshake_node()", "Starting channel {}", url);
channel.clone().start(p2p.executor());
// Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
// the handshake does not finish channel.stop() will never get called, resulting in
// zombie processes.
let result = timeout(Duration::from_secs(5), handshake_task).await;
debug!(target: "net::refinery::handshake_node()", "Stopping channel {}", url);
channel.stop().await;
match result {
Ok(_) => {
debug!(target: "net::refinery::handshake_node()", "Handshake success!");
true
}
Err(e) => {
debug!(target: "net::refinery::handshake_node()", "Handshake err: {}", e);
false
}
}
}
Err(e) => {
debug!(target: "net::refinery::handshake_node()", "Failed to connect to {}, ({})", addr, e);
false
}
}
}
}
#[async_trait]
impl Session for RefineSession {
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade()
}
fn type_id(&self) -> SessionBitFlag {
SESSION_REFINE
}
}
/// Periodically probes entries in the greylist.
///
/// Randomly selects a greylist entry and tries to establish a local
/// connection to it using the method handshake_node(), which creates a
/// channel and does a version exchange using `perform_handshake_protocols()`.
///
/// If successful, the entry is removed from the greylist and added to the
/// whitelist with an updated last_seen timestamp. If non-successful, the
/// entry is removed from the greylist.
pub struct GreylistRefinery {
/// Weak pointer to parent object
session: LazyWeak<RefineSession>,
process: StoppableTaskPtr,
}
impl GreylistRefinery {
pub fn new() -> Arc<Self> {
Arc::new(Self { session: LazyWeak::new(), process: StoppableTask::new() })
}
pub async fn start(self: Arc<Self>) {
match self.p2p().hosts().container.load_all(&self.p2p().settings().hostlist).await {
Ok(()) => {
debug!(target: "net::refinery::start()", "Load hosts successful!");
}
Err(e) => {
warn!(target: "net::refinery::start()", "Error loading hosts {}", e);
}
}
let ex = self.p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
pub async fn stop(self: Arc<Self>) {
debug!(target: "net::refinery", "Stopping refinery");
self.process.stop().await;
match self.p2p().hosts().container.save_all(&self.p2p().settings().hostlist).await {
Ok(()) => {
debug!(target: "net::refinery::stop()", "Save hosts successful!");
}
Err(e) => {
warn!(target: "net::refinery::stop()", "Error saving hosts {}", e);
}
}
}
// Randomly select a peer on the greylist and probe it. This method will remove from the
// greylist and store on the whitelist providing the peer is responsive.
async fn run(self: Arc<Self>) {
let settings = self.p2p().settings();
let hosts = self.p2p().hosts();
loop {
sleep(settings.greylist_refinery_interval).await;
if hosts.container.is_empty(HostColor::Grey).await {
debug!(target: "net::refinery",
"Greylist is empty! Cannot start refinery process");
continue
}
// Pause the refinery if we've had zero connections for longer than the configured
// limit.
let offline_limit = Duration::from_secs(settings.time_with_no_connections);
let offline_timer = Instant::now().duration_since(*hosts.last_connection.read().await);
if hosts.channels().await.is_empty() && offline_timer >= offline_limit {
warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
offline_timer.as_secs());
// It is neccessary to clear suspended hosts at this point, otherwise these
// hosts cannot be connected to in Outbound Session. Failure to do this could
// result in the refinery being paused forver (since connections could never be
// made).
let suspended_hosts = hosts.suspended().await;
for host in suspended_hosts {
hosts.unregister(&host).await;
}
continue
}
// Only attempt to refine peers that match our transports.
match hosts
.container
.fetch_random_with_schemes(HostColor::Grey, &settings.allowed_transports)
.await
{
Some((entry, position)) => {
let url = &entry.0;
if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await {
debug!(target: "net::refinery", "Unable to refine addr={}, err={}",
url.clone(), e);
continue
}
if !self.session().handshake_node(url.clone(), self.p2p().clone()).await {
hosts.container.remove(HostColor::Grey, url, position).await;
debug!(
target: "net::refinery",
"Peer {} is non-responsive. Removed from greylist", url,
);
// Remove this entry from HostRegistry to avoid this host getting
// stuck in the Refining state.
//
// It is not necessary to call this when the refinery passes, since the
// state will be changed to Connected.
hosts.unregister(url).await;
continue
}
debug!(
target: "net::refinery",
"Peer {} is responsive. Adding to whitelist", url,
);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Add to the whitelist and remove from the greylist.
hosts.move_host(url, last_seen, HostColor::White, None).await.unwrap();
hosts.unregister(url).await;
debug!(target: "net::refinery", "GreylistRefinery complete!");
continue
}
None => {
debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
continue
}
}
}
}
fn session(&self) -> RefineSessionPtr {
self.session.upgrade()
}
fn p2p(&self) -> P2pPtr {
self.session().p2p()
}
}
/// Periodically try to do a version exchange with our own external
/// addresses. If the version exchange is successful, take a timestamp and
/// save it along with the external addresses. Each address along with its
/// timestamp (the `last_seen` data field) is sent in to other nodes in
/// ProtocolAddr and ProtocolSeed.
///
/// On first run, SelfHandshake will immediately conduct a version exchange
/// with our external addresses, and if successful update the last_seen
/// field. The process will wait [TODO: self_handshake_interval) before retrying.
///
/// There are two situations in which this can fail:
///
/// 1. If our external address is misconfigured
/// 2. If we have reached our inbound connection limit.
///
/// If our external address is misconfigured, doing a version exchange
/// with ourselves will not work and so the external addresses will not
/// be shared with other nodes.
///
/// If we have reached our inbound connection limit, the external address
/// will continue to be broadcast with an older `last_seen` (from before
/// our inbound connection was reached).
pub struct SelfHandshake {
process: StoppableTaskPtr,
session: LazyWeak<RefineSession>,
pub(in crate::net) addrs: Mutex<HashMap<Url, u64>>,
}
impl SelfHandshake {
fn new() -> Arc<Self> {
Arc::new(Self {
process: StoppableTask::new(),
session: LazyWeak::new(),
addrs: Mutex::new(HashMap::new()),
})
}
async fn start(self: Arc<Self>) {
let ex = self.session().p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
}
async fn run(self: Arc<Self>) {
let external_addrs = self.session().p2p().settings().external_addrs.clone();
let mut current_attempt = 0;
loop {
if current_attempt >= 1 {
// TODO: make this a configurable interval
sleep(600).await;
}
// Only proceed if the external address is configured.
if external_addrs.is_empty() {
current_attempt += 1;
continue
}
for addr in external_addrs.iter() {
debug!(target: "net::refine_session::self_handshake",
"Attempting a version exchange addr={}", addr);
if self.session().handshake_node(addr.clone(), self.session().p2p()).await {
debug!(target: "net::refine_session::self_handshake",
"Version exchange successful! Updating last seen addr={}", addr);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
let mut addrs = self.addrs.lock().await;
if addrs.contains_key(addr) {
let val = addrs.get_mut(addr).unwrap();
*val = last_seen;
}
addrs.insert(addr.clone(), last_seen);
} else {
// Either our external addr is invalid or our max inbound
// connection count has been reached.
warn!(target: "net::refine_session::self_handshake",
"Version exchange failed! addr={}", addr);
}
}
current_attempt += 1;
}
}
fn session(&self) -> RefineSessionPtr {
self.session.upgrade()
}
}

View File

@@ -51,7 +51,7 @@ use url::Url;
use super::{
super::{
connector::Connector,
hosts::store::HostColor,
hosts::HostColor,
p2p::{P2p, P2pPtr},
},
Session, SessionBitFlag, SESSION_SEED,

View File

@@ -26,7 +26,7 @@ use smol::{channel, future, Executor};
use url::Url;
use crate::{
net::{hosts::store::HostColor, P2p, Settings},
net::{hosts::HostColor, P2p, Settings},
system::sleep,
};
@@ -102,6 +102,7 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
inbound_addrs: vec![seed_addr.clone()],
external_addrs: vec![seed_addr.clone()],
outbound_connections: 0,
greylist_refinery_interval: 2,
inbound_connections: usize::MAX,
seeds: vec![],
peers: vec![],
@@ -146,7 +147,7 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
}
info!("Waiting until all peers connect");
sleep(10).await;
sleep(15).await;
for p2p in p2p_instances.iter() {
let hosts = p2p.hosts();