From cecf284cef1ec45cb5a15276ec7679585b0cf828 Mon Sep 17 00:00:00 2001 From: draoi Date: Fri, 26 Jan 2024 11:06:48 +0100 Subject: [PATCH] net: create system::run_until_completion() to ensure ping_node() does not create zombie process This commit refactors the ping_node method to use a system function called run_until_completion(), which ensures a task will safely complete even if it's parent task has been cancelled. This happens in ping_node() in the case the handshake is ongoing but the p2p network has been destroyed. We also introduce a timeout for ping_node() to prevent perform_handshake_protocols from running forever and blocking channel.stop() from being safely invoked. --- bin/lilith/src/main.rs | 2 +- src/net/hosts/refinery.rs | 42 ++++++++++++++++++++++------ src/net/hosts/store.rs | 25 ++++++++++++++++- src/net/protocol/protocol_address.rs | 2 +- src/net/protocol/protocol_seed.rs | 2 +- src/system/mod.rs | 20 ++++++++++++- 6 files changed, 79 insertions(+), 14 deletions(-) diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs index 3bfa8b5b7..e64165ffc 100644 --- a/bin/lilith/src/main.rs +++ b/bin/lilith/src/main.rs @@ -155,7 +155,7 @@ impl Lilith { let (entry, position) = hosts.whitelist_fetch_last().await; let url = &entry.0; - if !ping_node(url, p2p.clone()).await { + if !ping_node(url.clone(), p2p.clone()).await { let (_addr, last_seen) = hosts.get_whitelist_entry_at_addr(url).await.unwrap(); hosts.greylist_store_or_update(&[(url.clone(), last_seen)]).await; diff --git a/src/net/hosts/refinery.rs b/src/net/hosts/refinery.rs index 3283ffa8a..e3f5d61d8 100644 --- a/src/net/hosts/refinery.rs +++ b/src/net/hosts/refinery.rs @@ -16,7 +16,10 @@ * along with this program. If not, see . */ -use std::{sync::Arc, time::UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Duration, UNIX_EPOCH}, +}; use log::{debug, warn}; use url::Url; @@ -24,7 +27,9 @@ use url::Url; use super::super::p2p::{P2p, P2pPtr}; use crate::{ net::{connector::Connector, protocol::ProtocolVersion, session::Session}, - system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr}, + system::{ + run_until_completion, sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr, + }, Error, }; @@ -104,7 +109,7 @@ impl GreylistRefinery { } let mut greylist = hosts.greylist.write().await; - if !ping_node(url, self.p2p().clone()).await { + if !ping_node(url.clone(), self.p2p().clone()).await { greylist.remove(position); debug!( target: "net::refinery", @@ -137,14 +142,28 @@ impl GreylistRefinery { } } -// Ping a node to check it's online. -pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool { +/// Check a node is online by establishing a channel with it and conducting a handshake with a +/// version exchange. +/// +/// We must use run_until_completion() to ensure this code will complete even if the parent task +/// has been destroyed. Otherwise ping_node() will become a zombie process if the rest of the p2p +/// network has been shutdown but the handshake it still ongoing. +/// +/// Other parts of the p2p stack have safe shutdown methods built into them due to the ownership +/// structure. Here we are creating a outbound session that is not owned by anything and is not +/// so is not safely cancelled on shutdown. +pub async fn ping_node(addr: Url, p2p: P2pPtr) -> bool { + let ex = p2p.executor(); + run_until_completion(ping_node_impl(addr.clone(), p2p), ex).await +} + +async fn ping_node_impl(addr: Url, p2p: P2pPtr) -> bool { let session_outbound = p2p.session_outbound(); let parent = Arc::downgrade(&session_outbound); let connector = Connector::new(p2p.settings(), parent); debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr); - match connector.connect(addr).await { + match connector.connect(&addr).await { Ok((_url, channel)) => { debug!(target: "net::refinery::ping_node()", "Connected successfully!"); let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await; @@ -155,16 +174,21 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool { p2p.executor(), ); + // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if + // the handshake does not finish channel.stop() will never get called, resulting in + // zombie processes. + let result = timeout(Duration::from_secs(5), handshake_task).await; + channel.clone().start(p2p.executor()); - match handshake_task.await { - Ok(()) => { + match result { + Ok(_) => { debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel."); channel.stop().await; true } Err(e) => { - debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e); + debug!(target: "net::refinery::ping_node()", "Handshake timed out! {}", e); channel.stop().await; false } diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index a09d3127d..2b13f7b03 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -1235,10 +1235,33 @@ mod tests { super::super::{settings::Settings, P2p}, *, }; - use crate::system::sleep; + use crate::{net::hosts::refinery::ping_node, system::sleep}; use smol::Executor; use std::{sync::Arc, time::UNIX_EPOCH}; + #[test] + fn test_ping_node() { + smol::block_on(async { + let settings = Settings { + localnet: false, + external_addrs: vec![ + Url::parse("tcp://foo.bar:123").unwrap(), + Url::parse("tcp://lol.cat:321").unwrap(), + ], + ..Default::default() + }; + + let ex = Arc::new(Executor::new()); + let p2p = P2p::new(settings, ex.clone()).await; + + let url = Url::parse("tcp://xeno.systems.wtf").unwrap(); + println!("Pinging node..."); + let task = ex.spawn(ping_node(url.clone(), p2p)); + ex.run(task).await; + println!("Ping node complete!"); + }); + } + #[test] fn test_is_local_host() { smol::block_on(async { diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index fe29e76e7..405d66fc3 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -197,7 +197,7 @@ impl ProtocolAddress { debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self"); // See if we can do a version exchange with ourself. - if ping_node(&addr, self.p2p.clone()).await { + if ping_node(addr.clone(), self.p2p.clone()).await { // We're online. Update last_seen and broadcast our address. let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); addrs.push((addr, last_seen)); diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index 58d97c6b6..a405024fd 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -77,7 +77,7 @@ impl ProtocolSeed { debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self"); // See if we can do a version exchange with ourself. - if ping_node(&addr, self.p2p.clone()).await { + if ping_node(addr.clone(), self.p2p.clone()).await { // We're online. Update last_seen and broadcast our address. let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); addrs.push((addr, last_seen)); diff --git a/src/system/mod.rs b/src/system/mod.rs index dce93f448..763394257 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration}; -use smol::{Executor, Timer}; +use smol::{future::Future, Executor, Timer}; /// Condition variable which allows a task to block until woken up pub mod condvar; @@ -58,3 +58,21 @@ pub async fn sleep_forever() { pub async fn msleep(millis: u64) { Timer::after(Duration::from_millis(millis)).await; } + +/// Run a task until it has fully completed, irrespective of whether the parent task still exists. +pub async fn run_until_completion<'a, R: Send + 'a, F: Future + Send + 'a>( + func: F, + executor: Arc>, +) -> R { + let (sender, recv_queue) = smol::channel::bounded::(1); + executor + .spawn(async move { + let result = func.await; + // We ignore this result: an error would mean the parent task has been cancelled, + // which is valid behavior. + let _ = sender.send(result).await; + }) + .detach(); + // This should never panic because it would mean the detached task has not completed. + recv_queue.recv().await.expect("Run until completion task failed") +}