inbound_session: implement PingSelfProcess

Move the process that pings ourselves and creates a last_seen field
for each of our external addresses into a custom child process of
InboundSession, and remove from ProtocolSeed and ProtocolAddr.
This commit is contained in:
draoi
2024-02-01 12:56:13 +01:00
parent 79512e6dce
commit ccc6b25754
3 changed files with 199 additions and 71 deletions

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, warn};
@@ -25,7 +25,7 @@ use smol::Executor;
use super::{
super::{
channel::ChannelPtr,
hosts::{refinery::ping_node, store::HostsPtr},
hosts::store::HostsPtr,
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
@@ -37,9 +37,24 @@ use super::{
};
use crate::Result;
/// Defines address and get-address messages.
/// On receiving GetAddr, nodes send an AddrMessage containing whitelisted nodes.
/// On receiving an AddrMessage, nodes enter the info into their greylists.
/// Defines address and get-address messages. On receiving GetAddr, nodes
/// reply an AddrMessage containing nodes from their hostlist. On receiving
/// an AddrMessage, nodes enter the info into their greylists.
///
/// The node selection logic for creating an AddrMessage is as follows:
///
/// 1. First select nodes matching the requested transports from the
/// whitelist. These nodes have a high guarantee of being reachable, so we
/// prioritize them first.
///
/// 2. Then select whitelist nodes that don't match our transports. We do
/// this so that nodes share and propagate nodes of different transports,
/// even if they can't connect to them themselves.
///
/// 3. Finally, if there's still space available, fill the remaining vector
/// space with greylist entries. This is necessary in case this node does
/// not support the transports of the requesting node (non-supported
/// transports are stored on the greylist).
pub struct ProtocolAddress {
channel: ChannelPtr,
addrs_sub: MessageSubscription<AddrsMessage>,
@@ -104,8 +119,9 @@ impl ProtocolAddress {
}
}
/// Handles receiving the get-address message. Continually receives get-address
/// messages on the get-address subscription. Then replies with an address message.
/// Handles receiving the get-address message. Continually receives
/// get-address messages on the get-address subscription. Then replies
/// with an address message.
async fn handle_receive_get_addrs(self: Arc<Self>) -> Result<()> {
debug!(
target: "net::protocol_address::handle_receive_get_addrs()",
@@ -170,47 +186,45 @@ impl ProtocolAddress {
}
}
// If it's an outbound session, we have an extern_addr, and address advertising
// is enabled, send our address.
/// Send our own external addresses over a channel. Get the latest
/// last_seen field from InboundSession, and send it along with our
/// external address.
///
/// If our external address is misconfigured, send an empty vector.
/// If we have reached our inbound connection limit, send our external
/// address with a `last_seen` field that corresponds to the last time
/// we could receive inbound connections.
async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
debug!(target: "net::protocol_address::send_my_addrs()", "[START]");
let type_id = self.channel.session_type_id();
debug!(
target: "net::protocol_address::send_my_addrs()",
"[START] channel address={}", self.channel.address(),
);
let type_id = self.channel.session_type_id();
if type_id != SESSION_OUTBOUND {
debug!(target: "net::protocol_address::send_my_addrs()", "Not an outbound session. Stopping");
debug!(target: "net::protocol_address::send_my_addrs()",
"Not an outbound session. Stopping");
return Ok(())
}
if self.settings.external_addrs.is_empty() {
debug!(target: "net::protocol_address::send_my_addrs()", "External addr not configured. Stopping");
debug!(target: "net::protocol_address::send_my_addrs()",
"External addr not configured. Stopping");
return Ok(())
}
debug!(
target: "net::protocol_address::send_my_addrs()",
"[START] address={}", self.channel.address(),
);
let mut addrs = vec![];
for addr in self.settings.external_addrs.clone() {
//addrs.push((addr, 0));
debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
} else {
// Our external addr is invalid. If every external addr in the list is invalid
// we will just broadcast an empty AddrsMessage.
debug!(target: "net::protocol_address::send_my_addrs()", "Ping self failed!");
}
let inbound = self.p2p.session_inbound();
for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() {
addrs.push((addr.clone(), last_seen.clone()));
}
debug!(target: "net::protocol_address::send_my_addrs()", "Broadcasting {} addresses", addrs.len());
debug!(target: "net::protocol_address::send_my_addrs()",
"Broadcasting {} addresses", addrs.len());
let ext_addr_msg = AddrsMessage { addrs };
self.channel.send(&ext_addr_msg).await?;
debug!(target: "net::protocol_address::send_my_addrs()", "[END]");
debug!(target: "net::protocol_address::send_my_addrs()",
"[END] channel address={}", self.channel.address());
Ok(())
}
@@ -218,12 +232,13 @@ impl ProtocolAddress {
#[async_trait]
impl ProtocolBase for ProtocolAddress {
/// Starts the address protocol. If it's an outbound session, has an external address
/// is set to advertise, pings our external address and sends it if everything is fine.
/// Runs receive address and get address protocols on the protocol task manager.
/// Then sends get-address msg.
/// Start the address protocol. If it's an outbound session and has an
/// external address, send our external address. Run receive address
/// and get address protocols on the protocol task manager. Then send
/// get-address msg.
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_address::start()", "START => address={}", self.channel.address());
debug!(target: "net::protocol_address::start()",
"START => address={}", self.channel.address());
self.jobsman.clone().start(ex.clone());
@@ -240,7 +255,9 @@ impl ProtocolBase for ProtocolAddress {
};
self.channel.send(&get_addrs).await?;
debug!(target: "net::protocol_address::start()", "END => address={}", self.channel.address());
debug!(target: "net::protocol_address::start()",
"END => address={}", self.channel.address());
Ok(())
}
fn name(&self) -> &'static str {

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
@@ -33,7 +33,7 @@ use super::{
},
protocol_base::{ProtocolBase, ProtocolBasePtr},
};
use crate::{net::hosts::refinery::ping_node, Result};
use crate::Result;
/// Implements the seed protocol
pub struct ProtocolSeed {
@@ -59,12 +59,18 @@ impl ProtocolSeed {
Arc::new(Self { channel, hosts, settings, addr_sub, p2p })
}
/// Sends own external addresses over a channel. Imports own external addresses
/// from settings, then adds those addresses to an addrs message and sends it
/// out over the channel.
/// Send our own external addresses over a channel. Get the latest
/// last_seen field from InboundSession, and send it along with our
/// external address.
///
/// If our external address is misconfigured, send an empty vector.
/// If we have reached our inbound connection limit, send our external
/// address with a `last_seen` field that corresponds to the last time
/// we could receive inbound connections.
pub async fn send_my_addrs(&self) -> Result<()> {
debug!(target: "net::protocol_seed::send_my_addrs()", "[START]");
// Do nothing if external addresses are not configured
debug!(target: "net::protocol_seed::send_my_addrs()",
"[START] channel address={}", self.channel.address());
if self.settings.external_addrs.is_empty() {
debug!(target: "net::protocol_seed::send_my_addrs()",
"External address is not configured. Stopping");
@@ -72,25 +78,17 @@ impl ProtocolSeed {
}
let mut addrs = vec![];
for addr in self.settings.external_addrs.clone() {
//addrs.push((addr, 0));
debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
} else {
// Our external addr is invalid. If every external addr in the list is invalid
// we will just broadcast an empty AddrsMessage.
debug!(target: "net::protocol_seed::send_my_addrs()", "Ping self failed!");
}
let inbound = self.p2p.session_inbound();
for (addr, last_seen) in inbound.ping_self.addrs.lock().await.iter() {
addrs.push((addr.clone(), last_seen.clone()));
}
debug!(target: "net::protocol_seed::send_my_addrs()", "Broadcasting {} addresses", addrs.len());
debug!(target: "net::protocol_seed::send_my_addrs()",
"Broadcasting {} addresses", addrs.len());
let ext_addr_msg = AddrsMessage { addrs };
self.channel.send(&ext_addr_msg).await?;
debug!(target: "net::protocol_seed::send_my_addrs()", "[END]");
debug!(target: "net::protocol_seed::send_my_addrs()",
"[END] channel address={}", self.channel.address());
Ok(())
}
@@ -98,9 +96,10 @@ impl ProtocolSeed {
#[async_trait]
impl ProtocolBase for ProtocolSeed {
/// Starts the seed protocol. Creates a subscription to the address message,
/// then sends our address to the seed server. Sends a get-address message
/// and receives an address messsage.
/// Starts the seed protocol. Creates a subscription to the address
/// message. If our external address is enabled, then send our address
/// to the seed server. Sends a get-address message and receives an
/// address messsage.
async fn start(self: Arc<Self>, _ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_seed::start()", "START => address={}", self.channel.address());

View File

@@ -23,10 +23,10 @@
//! an acceptor pointer, and a stoppable task pointer. Using a weak pointer
//! to P2P allows us to avoid circular dependencies.
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH};
use async_trait::async_trait;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use smol::{lock::Mutex, Executor};
use url::Url;
@@ -35,12 +35,13 @@ use super::{
acceptor::{Acceptor, AcceptorPtr},
channel::ChannelPtr,
dnet::{self, dnetev, DnetEvent},
hosts::refinery::ping_node,
p2p::{P2p, P2pPtr},
},
Session, SessionBitFlag, SESSION_INBOUND,
};
use crate::{
system::{LazyWeak, StoppableTask, StoppableTaskPtr, Subscription},
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr, Subscription},
Error, Result,
};
@@ -51,16 +52,21 @@ pub struct InboundSession {
pub(in crate::net) p2p: LazyWeak<P2p>,
acceptors: Mutex<Vec<AcceptorPtr>>,
accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
/// Task that periodically checks our external addresses.
pub(in crate::net) ping_self: Arc<PingSelfProcess>,
}
impl InboundSession {
/// Create a new inbound session
pub fn new() -> InboundSessionPtr {
Arc::new(Self {
let self_ = Arc::new(Self {
p2p: LazyWeak::new(),
acceptors: Mutex::new(Vec::new()),
accept_tasks: Mutex::new(Vec::new()),
})
ping_self: PingSelfProcess::new(),
});
self_.ping_self.session.init(self_.clone());
self_
}
/// Starts the inbound session. Begins by accepting connections and fails
@@ -106,6 +112,8 @@ impl InboundSession {
.await?;
}
self.ping_self.clone().start().await;
Ok(())
}
@@ -120,6 +128,8 @@ impl InboundSession {
for accept_task in accept_tasks {
accept_task.stop().await;
}
self.ping_self.clone().stop().await;
}
/// Start accepting connections for inbound session.
@@ -208,3 +218,105 @@ impl Session for InboundSession {
SESSION_INBOUND
}
}
/// Periodically try to do a version exchange with our own external
/// addresses. If the version exchange is successful, take a timestamp and
/// save it along with the external addresses. Each address along with its
/// timestamp (the `last_seen` data field) is sent in to other nodes in
/// ProtocolAddr and ProtocolSeed.
///
/// On first run, PingSelfProcess will immediately conduct a version exchange
/// with our external addresses, and if successful update the last_seen
/// field. The process will wait [TODO: ping_self_interval) before retrying.
///
/// There are two situations in which this can fail:
///
/// 1. If our external address is misconfigured
/// 2. If we have reached our inbound connection limit.
///
/// If our external address is misconfigured, doing a version exchange
/// with ourselves will not work and so the external addresses will not
/// be shared with other nodes.
///
/// If we have reached our inbound connection limit, the external address
/// will continue to be broadcast with an older `last_seen` (from before
/// our inbound connection was reached).
pub struct PingSelfProcess {
process: StoppableTaskPtr,
session: LazyWeak<InboundSession>,
pub(in crate::net) addrs: Mutex<HashMap<Url, u64>>,
}
impl PingSelfProcess {
fn new() -> Arc<Self> {
Arc::new(Self {
process: StoppableTask::new(),
session: LazyWeak::new(),
addrs: Mutex::new(HashMap::new()),
})
}
async fn start(self: Arc<Self>) {
let ex = self.session().p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
}
async fn run(self: Arc<Self>) {
let external_addrs = self.session().p2p().settings().external_addrs.clone();
let mut current_attempt = 0;
loop {
if current_attempt >= 1 {
// TODO: make this a configurable interval
sleep(600).await;
}
// Only proceed if the external address is not configured.
if external_addrs.is_empty() {
current_attempt += 1;
continue
}
for addr in external_addrs.iter() {
debug!(target: "net::inbound_session::ping_self",
"Attempting a version exchange addr={}", addr);
if ping_node(addr.clone(), self.session().p2p()).await {
debug!(target: "net::inbound_session::ping_self",
"Version exchange successful! Updating last seen addr={}", addr);
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
let mut addrs = self.addrs.lock().await;
if addrs.contains_key(addr) {
let val = addrs.get_mut(&addr).unwrap();
*val = last_seen;
}
addrs.insert(addr.clone(), last_seen);
} else {
// Either our external addr is invalid or our max inbound
// connection count has been reached.
warn!(target: "net::inbound_session::ping_self",
"Version exchange failed! addr={}", addr);
}
}
current_attempt += 1;
}
}
fn session(&self) -> InboundSessionPtr {
self.session.upgrade()
}
}