diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index 405d66fc3..70db20b0b 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use std::{sync::Arc, time::UNIX_EPOCH}; +use std::sync::Arc; use async_trait::async_trait; use log::{debug, warn}; @@ -25,7 +25,7 @@ use smol::Executor; use super::{ super::{ channel::ChannelPtr, - hosts::{refinery::ping_node, store::HostsPtr}, + hosts::store::HostsPtr, message::{AddrsMessage, GetAddrsMessage}, message_subscriber::MessageSubscription, p2p::P2pPtr, @@ -37,9 +37,24 @@ use super::{ }; use crate::Result; -/// Defines address and get-address messages. -/// On receiving GetAddr, nodes send an AddrMessage containing whitelisted nodes. -/// On receiving an AddrMessage, nodes enter the info into their greylists. +/// Defines address and get-address messages. On receiving GetAddr, nodes +/// reply an AddrMessage containing nodes from their hostlist. On receiving +/// an AddrMessage, nodes enter the info into their greylists. +/// +/// The node selection logic for creating an AddrMessage is as follows: +/// +/// 1. First select nodes matching the requested transports from the +/// whitelist. These nodes have a high guarantee of being reachable, so we +/// prioritize them first. +/// +/// 2. Then select whitelist nodes that don't match our transports. We do +/// this so that nodes share and propagate nodes of different transports, +/// even if they can't connect to them themselves. +/// +/// 3. Finally, if there's still space available, fill the remaining vector +/// space with greylist entries. This is necessary in case this node does +/// not support the transports of the requesting node (non-supported +/// transports are stored on the greylist). pub struct ProtocolAddress { channel: ChannelPtr, addrs_sub: MessageSubscription, @@ -104,8 +119,9 @@ impl ProtocolAddress { } } - /// Handles receiving the get-address message. Continually receives get-address - /// messages on the get-address subscription. Then replies with an address message. + /// Handles receiving the get-address message. Continually receives + /// get-address messages on the get-address subscription. Then replies + /// with an address message. async fn handle_receive_get_addrs(self: Arc) -> Result<()> { debug!( target: "net::protocol_address::handle_receive_get_addrs()", @@ -170,47 +186,45 @@ impl ProtocolAddress { } } - // If it's an outbound session, we have an extern_addr, and address advertising - // is enabled, send our address. + /// Send our own external addresses over a channel. Get the latest + /// last_seen field from InboundSession, and send it along with our + /// external address. + /// + /// If our external address is misconfigured, send an empty vector. + /// If we have reached our inbound connection limit, send our external + /// address with a `last_seen` field that corresponds to the last time + /// we could receive inbound connections. async fn send_my_addrs(self: Arc) -> Result<()> { - debug!(target: "net::protocol_address::send_my_addrs()", "[START]"); - let type_id = self.channel.session_type_id(); + debug!( + target: "net::protocol_address::send_my_addrs()", + "[START] channel address={}", self.channel.address(), + ); + let type_id = self.channel.session_type_id(); if type_id != SESSION_OUTBOUND { - debug!(target: "net::protocol_address::send_my_addrs()", "Not an outbound session. Stopping"); + debug!(target: "net::protocol_address::send_my_addrs()", + "Not an outbound session. Stopping"); return Ok(()) } if self.settings.external_addrs.is_empty() { - debug!(target: "net::protocol_address::send_my_addrs()", "External addr not configured. Stopping"); + debug!(target: "net::protocol_address::send_my_addrs()", + "External addr not configured. Stopping"); return Ok(()) } - debug!( - target: "net::protocol_address::send_my_addrs()", - "[START] address={}", self.channel.address(), - ); - let mut addrs = vec![]; - for addr in self.settings.external_addrs.clone() { - //addrs.push((addr, 0)); - debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self"); - - // See if we can do a version exchange with ourself. - if ping_node(addr.clone(), self.p2p.clone()).await { - // We're online. Update last_seen and broadcast our address. - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - addrs.push((addr, last_seen)); - } else { - // Our external addr is invalid. If every external addr in the list is invalid - // we will just broadcast an empty AddrsMessage. - debug!(target: "net::protocol_address::send_my_addrs()", "Ping self failed!"); - } + let inbound = self.p2p.session_inbound(); + for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() { + addrs.push((addr.clone(), last_seen.clone())); } - debug!(target: "net::protocol_address::send_my_addrs()", "Broadcasting {} addresses", addrs.len()); + + debug!(target: "net::protocol_address::send_my_addrs()", + "Broadcasting {} addresses", addrs.len()); let ext_addr_msg = AddrsMessage { addrs }; self.channel.send(&ext_addr_msg).await?; - debug!(target: "net::protocol_address::send_my_addrs()", "[END]"); + debug!(target: "net::protocol_address::send_my_addrs()", + "[END] channel address={}", self.channel.address()); Ok(()) } @@ -218,12 +232,13 @@ impl ProtocolAddress { #[async_trait] impl ProtocolBase for ProtocolAddress { - /// Starts the address protocol. If it's an outbound session, has an external address - /// is set to advertise, pings our external address and sends it if everything is fine. - /// Runs receive address and get address protocols on the protocol task manager. - /// Then sends get-address msg. + /// Start the address protocol. If it's an outbound session and has an + /// external address, send our external address. Run receive address + /// and get address protocols on the protocol task manager. Then send + /// get-address msg. async fn start(self: Arc, ex: Arc>) -> Result<()> { - debug!(target: "net::protocol_address::start()", "START => address={}", self.channel.address()); + debug!(target: "net::protocol_address::start()", + "START => address={}", self.channel.address()); self.jobsman.clone().start(ex.clone()); @@ -240,7 +255,9 @@ impl ProtocolBase for ProtocolAddress { }; self.channel.send(&get_addrs).await?; - debug!(target: "net::protocol_address::start()", "END => address={}", self.channel.address()); + debug!(target: "net::protocol_address::start()", + "END => address={}", self.channel.address()); + Ok(()) } fn name(&self) -> &'static str { diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index a405024fd..120225139 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use std::{sync::Arc, time::UNIX_EPOCH}; +use std::sync::Arc; use async_trait::async_trait; use log::debug; @@ -33,7 +33,7 @@ use super::{ }, protocol_base::{ProtocolBase, ProtocolBasePtr}, }; -use crate::{net::hosts::refinery::ping_node, Result}; +use crate::Result; /// Implements the seed protocol pub struct ProtocolSeed { @@ -59,12 +59,18 @@ impl ProtocolSeed { Arc::new(Self { channel, hosts, settings, addr_sub, p2p }) } - /// Sends own external addresses over a channel. Imports own external addresses - /// from settings, then adds those addresses to an addrs message and sends it - /// out over the channel. + /// Send our own external addresses over a channel. Get the latest + /// last_seen field from InboundSession, and send it along with our + /// external address. + /// + /// If our external address is misconfigured, send an empty vector. + /// If we have reached our inbound connection limit, send our external + /// address with a `last_seen` field that corresponds to the last time + /// we could receive inbound connections. pub async fn send_my_addrs(&self) -> Result<()> { - debug!(target: "net::protocol_seed::send_my_addrs()", "[START]"); - // Do nothing if external addresses are not configured + debug!(target: "net::protocol_seed::send_my_addrs()", + "[START] channel address={}", self.channel.address()); + if self.settings.external_addrs.is_empty() { debug!(target: "net::protocol_seed::send_my_addrs()", "External address is not configured. Stopping"); @@ -72,25 +78,17 @@ impl ProtocolSeed { } let mut addrs = vec![]; - for addr in self.settings.external_addrs.clone() { - //addrs.push((addr, 0)); - debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self"); - - // See if we can do a version exchange with ourself. - if ping_node(addr.clone(), self.p2p.clone()).await { - // We're online. Update last_seen and broadcast our address. - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - addrs.push((addr, last_seen)); - } else { - // Our external addr is invalid. If every external addr in the list is invalid - // we will just broadcast an empty AddrsMessage. - debug!(target: "net::protocol_seed::send_my_addrs()", "Ping self failed!"); - } + let inbound = self.p2p.session_inbound(); + for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() { + addrs.push((addr.clone(), last_seen.clone())); } - debug!(target: "net::protocol_seed::send_my_addrs()", "Broadcasting {} addresses", addrs.len()); + + debug!(target: "net::protocol_seed::send_my_addrs()", + "Broadcasting {} addresses", addrs.len()); let ext_addr_msg = AddrsMessage { addrs }; self.channel.send(&ext_addr_msg).await?; - debug!(target: "net::protocol_seed::send_my_addrs()", "[END]"); + debug!(target: "net::protocol_seed::send_my_addrs()", + "[END] channel address={}", self.channel.address()); Ok(()) } @@ -98,9 +96,10 @@ impl ProtocolSeed { #[async_trait] impl ProtocolBase for ProtocolSeed { - /// Starts the seed protocol. Creates a subscription to the address message, - /// then sends our address to the seed server. Sends a get-address message - /// and receives an address messsage. + /// Starts the seed protocol. Creates a subscription to the address + /// message. If our external address is enabled, then send our address + /// to the seed server. Sends a get-address message and receives an + /// address messsage. async fn start(self: Arc, _ex: Arc>) -> Result<()> { debug!(target: "net::protocol_seed::start()", "START => address={}", self.channel.address()); diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index 539cf636d..f382474a1 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -23,10 +23,10 @@ //! an acceptor pointer, and a stoppable task pointer. Using a weak pointer //! to P2P allows us to avoid circular dependencies. -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; use async_trait::async_trait; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use smol::{lock::Mutex, Executor}; use url::Url; @@ -35,12 +35,13 @@ use super::{ acceptor::{Acceptor, AcceptorPtr}, channel::ChannelPtr, dnet::{self, dnetev, DnetEvent}, + hosts::refinery::ping_node, p2p::{P2p, P2pPtr}, }, Session, SessionBitFlag, SESSION_INBOUND, }; use crate::{ - system::{LazyWeak, StoppableTask, StoppableTaskPtr, Subscription}, + system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr, Subscription}, Error, Result, }; @@ -51,16 +52,21 @@ pub struct InboundSession { pub(in crate::net) p2p: LazyWeak, acceptors: Mutex>, accept_tasks: Mutex>, + /// Task that periodically checks our external addresses. + pub(in crate::net) ping_self: Arc, } impl InboundSession { /// Create a new inbound session pub fn new() -> InboundSessionPtr { - Arc::new(Self { + let self_ = Arc::new(Self { p2p: LazyWeak::new(), acceptors: Mutex::new(Vec::new()), accept_tasks: Mutex::new(Vec::new()), - }) + ping_self: PingSelfProcess::new(), + }); + self_.ping_self.session.init(self_.clone()); + self_ } /// Starts the inbound session. Begins by accepting connections and fails @@ -106,6 +112,8 @@ impl InboundSession { .await?; } + self.ping_self.clone().start().await; + Ok(()) } @@ -120,6 +128,8 @@ impl InboundSession { for accept_task in accept_tasks { accept_task.stop().await; } + + self.ping_self.clone().stop().await; } /// Start accepting connections for inbound session. @@ -208,3 +218,105 @@ impl Session for InboundSession { SESSION_INBOUND } } + +/// Periodically try to do a version exchange with our own external +/// addresses. If the version exchange is successful, take a timestamp and +/// save it along with the external addresses. Each address along with its +/// timestamp (the `last_seen` data field) is sent in to other nodes in +/// ProtocolAddr and ProtocolSeed. +/// +/// On first run, PingSelfProcess will immediately conduct a version exchange +/// with our external addresses, and if successful update the last_seen +/// field. The process will wait [TODO: ping_self_interval) before retrying. +/// +/// There are two situations in which this can fail: +/// +/// 1. If our external address is misconfigured +/// 2. If we have reached our inbound connection limit. +/// +/// If our external address is misconfigured, doing a version exchange +/// with ourselves will not work and so the external addresses will not +/// be shared with other nodes. +/// +/// If we have reached our inbound connection limit, the external address +/// will continue to be broadcast with an older `last_seen` (from before +/// our inbound connection was reached). +pub struct PingSelfProcess { + process: StoppableTaskPtr, + session: LazyWeak, + pub(in crate::net) addrs: Mutex>, +} + +impl PingSelfProcess { + fn new() -> Arc { + Arc::new(Self { + process: StoppableTask::new(), + session: LazyWeak::new(), + addrs: Mutex::new(HashMap::new()), + }) + } + + async fn start(self: Arc) { + let ex = self.session().p2p().executor(); + self.process.clone().start( + async move { + self.run().await; + unreachable!(); + }, + // Ignore stop handler + |_| async {}, + Error::NetworkServiceStopped, + ex, + ); + } + + async fn stop(self: Arc) { + self.process.stop().await + } + + async fn run(self: Arc) { + let external_addrs = self.session().p2p().settings().external_addrs.clone(); + let mut current_attempt = 0; + + loop { + if current_attempt >= 1 { + // TODO: make this a configurable interval + sleep(600).await; + } + + // Only proceed if the external address is not configured. + if external_addrs.is_empty() { + current_attempt += 1; + continue + } + + for addr in external_addrs.iter() { + debug!(target: "net::inbound_session::ping_self", + "Attempting a version exchange addr={}", addr); + + if ping_node(addr.clone(), self.session().p2p()).await { + debug!(target: "net::inbound_session::ping_self", + "Version exchange successful! Updating last seen addr={}", addr); + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + let mut addrs = self.addrs.lock().await; + + if addrs.contains_key(addr) { + let val = addrs.get_mut(&addr).unwrap(); + *val = last_seen; + } + addrs.insert(addr.clone(), last_seen); + } else { + // Either our external addr is invalid or our max inbound + // connection count has been reached. + warn!(target: "net::inbound_session::ping_self", + "Version exchange failed! addr={}", addr); + } + } + current_attempt += 1; + } + } + + fn session(&self) -> InboundSessionPtr { + self.session.upgrade() + } +}