net: create system::run_until_completion() to ensure ping_node() does not create zombie process

This commit refactors the ping_node method to use a system function
called run_until_completion(), which ensures a task will safely complete even
if it's parent task has been cancelled.

This happens in ping_node() in the case the handshake is ongoing but the
p2p network has been destroyed.

We also introduce a timeout for ping_node() to prevent
perform_handshake_protocols from running forever and blocking
channel.stop() from being safely invoked.
This commit is contained in:
draoi
2024-01-26 11:06:48 +01:00
parent 715f6c7a86
commit cecf284cef
6 changed files with 79 additions and 14 deletions

View File

@@ -155,7 +155,7 @@ impl Lilith {
let (entry, position) = hosts.whitelist_fetch_last().await;
let url = &entry.0;
if !ping_node(url, p2p.clone()).await {
if !ping_node(url.clone(), p2p.clone()).await {
let (_addr, last_seen) = hosts.get_whitelist_entry_at_addr(url).await.unwrap();
hosts.greylist_store_or_update(&[(url.clone(), last_seen)]).await;

View File

@@ -16,7 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use std::{
sync::Arc,
time::{Duration, UNIX_EPOCH},
};
use log::{debug, warn};
use url::Url;
@@ -24,7 +27,9 @@ use url::Url;
use super::super::p2p::{P2p, P2pPtr};
use crate::{
net::{connector::Connector, protocol::ProtocolVersion, session::Session},
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr},
system::{
run_until_completion, sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr,
},
Error,
};
@@ -104,7 +109,7 @@ impl GreylistRefinery {
}
let mut greylist = hosts.greylist.write().await;
if !ping_node(url, self.p2p().clone()).await {
if !ping_node(url.clone(), self.p2p().clone()).await {
greylist.remove(position);
debug!(
target: "net::refinery",
@@ -137,14 +142,28 @@ impl GreylistRefinery {
}
}
// Ping a node to check it's online.
pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
/// Check a node is online by establishing a channel with it and conducting a handshake with a
/// version exchange.
///
/// We must use run_until_completion() to ensure this code will complete even if the parent task
/// has been destroyed. Otherwise ping_node() will become a zombie process if the rest of the p2p
/// network has been shutdown but the handshake it still ongoing.
///
/// Other parts of the p2p stack have safe shutdown methods built into them due to the ownership
/// structure. Here we are creating a outbound session that is not owned by anything and is not
/// so is not safely cancelled on shutdown.
pub async fn ping_node(addr: Url, p2p: P2pPtr) -> bool {
let ex = p2p.executor();
run_until_completion(ping_node_impl(addr.clone(), p2p), ex).await
}
async fn ping_node_impl(addr: Url, p2p: P2pPtr) -> bool {
let session_outbound = p2p.session_outbound();
let parent = Arc::downgrade(&session_outbound);
let connector = Connector::new(p2p.settings(), parent);
debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr);
match connector.connect(addr).await {
match connector.connect(&addr).await {
Ok((_url, channel)) => {
debug!(target: "net::refinery::ping_node()", "Connected successfully!");
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
@@ -155,16 +174,21 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
p2p.executor(),
);
// Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
// the handshake does not finish channel.stop() will never get called, resulting in
// zombie processes.
let result = timeout(Duration::from_secs(5), handshake_task).await;
channel.clone().start(p2p.executor());
match handshake_task.await {
Ok(()) => {
match result {
Ok(_) => {
debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel.");
channel.stop().await;
true
}
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e);
debug!(target: "net::refinery::ping_node()", "Handshake timed out! {}", e);
channel.stop().await;
false
}

View File

@@ -1235,10 +1235,33 @@ mod tests {
super::super::{settings::Settings, P2p},
*,
};
use crate::system::sleep;
use crate::{net::hosts::refinery::ping_node, system::sleep};
use smol::Executor;
use std::{sync::Arc, time::UNIX_EPOCH};
#[test]
fn test_ping_node() {
smol::block_on(async {
let settings = Settings {
localnet: false,
external_addrs: vec![
Url::parse("tcp://foo.bar:123").unwrap(),
Url::parse("tcp://lol.cat:321").unwrap(),
],
..Default::default()
};
let ex = Arc::new(Executor::new());
let p2p = P2p::new(settings, ex.clone()).await;
let url = Url::parse("tcp://xeno.systems.wtf").unwrap();
println!("Pinging node...");
let task = ex.spawn(ping_node(url.clone(), p2p));
ex.run(task).await;
println!("Ping node complete!");
});
}
#[test]
fn test_is_local_host() {
smol::block_on(async {

View File

@@ -197,7 +197,7 @@ impl ProtocolAddress {
debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(&addr, self.p2p.clone()).await {
if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));

View File

@@ -77,7 +77,7 @@ impl ProtocolSeed {
debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(&addr, self.p2p.clone()).await {
if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));

View File

@@ -18,7 +18,7 @@
use std::{sync::Arc, time::Duration};
use smol::{Executor, Timer};
use smol::{future::Future, Executor, Timer};
/// Condition variable which allows a task to block until woken up
pub mod condvar;
@@ -58,3 +58,21 @@ pub async fn sleep_forever() {
pub async fn msleep(millis: u64) {
Timer::after(Duration::from_millis(millis)).await;
}
/// Run a task until it has fully completed, irrespective of whether the parent task still exists.
pub async fn run_until_completion<'a, R: Send + 'a, F: Future<Output = R> + Send + 'a>(
func: F,
executor: Arc<Executor<'a>>,
) -> R {
let (sender, recv_queue) = smol::channel::bounded::<R>(1);
executor
.spawn(async move {
let result = func.await;
// We ignore this result: an error would mean the parent task has been cancelled,
// which is valid behavior.
let _ = sender.send(result).await;
})
.detach();
// This should never panic because it would mean the detached task has not completed.
recv_queue.recv().await.expect("Run until completion task failed")
}