diff --git a/bin/lilith/src/main.rs b/bin/lilith/src/main.rs
index 3bfa8b5b7..e64165ffc 100644
--- a/bin/lilith/src/main.rs
+++ b/bin/lilith/src/main.rs
@@ -155,7 +155,7 @@ impl Lilith {
let (entry, position) = hosts.whitelist_fetch_last().await;
let url = &entry.0;
- if !ping_node(url, p2p.clone()).await {
+ if !ping_node(url.clone(), p2p.clone()).await {
let (_addr, last_seen) = hosts.get_whitelist_entry_at_addr(url).await.unwrap();
hosts.greylist_store_or_update(&[(url.clone(), last_seen)]).await;
diff --git a/src/net/hosts/refinery.rs b/src/net/hosts/refinery.rs
index 3283ffa8a..e3f5d61d8 100644
--- a/src/net/hosts/refinery.rs
+++ b/src/net/hosts/refinery.rs
@@ -16,7 +16,10 @@
* along with this program. If not, see .
*/
-use std::{sync::Arc, time::UNIX_EPOCH};
+use std::{
+ sync::Arc,
+ time::{Duration, UNIX_EPOCH},
+};
use log::{debug, warn};
use url::Url;
@@ -24,7 +27,9 @@ use url::Url;
use super::super::p2p::{P2p, P2pPtr};
use crate::{
net::{connector::Connector, protocol::ProtocolVersion, session::Session},
- system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr},
+ system::{
+ run_until_completion, sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr,
+ },
Error,
};
@@ -104,7 +109,7 @@ impl GreylistRefinery {
}
let mut greylist = hosts.greylist.write().await;
- if !ping_node(url, self.p2p().clone()).await {
+ if !ping_node(url.clone(), self.p2p().clone()).await {
greylist.remove(position);
debug!(
target: "net::refinery",
@@ -137,14 +142,28 @@ impl GreylistRefinery {
}
}
-// Ping a node to check it's online.
-pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
+/// Check a node is online by establishing a channel with it and conducting a handshake with a
+/// version exchange.
+///
+/// We must use run_until_completion() to ensure this code will complete even if the parent task
+/// has been destroyed. Otherwise ping_node() will become a zombie process if the rest of the p2p
+/// network has been shutdown but the handshake it still ongoing.
+///
+/// Other parts of the p2p stack have safe shutdown methods built into them due to the ownership
+/// structure. Here we are creating a outbound session that is not owned by anything and is not
+/// so is not safely cancelled on shutdown.
+pub async fn ping_node(addr: Url, p2p: P2pPtr) -> bool {
+ let ex = p2p.executor();
+ run_until_completion(ping_node_impl(addr.clone(), p2p), ex).await
+}
+
+async fn ping_node_impl(addr: Url, p2p: P2pPtr) -> bool {
let session_outbound = p2p.session_outbound();
let parent = Arc::downgrade(&session_outbound);
let connector = Connector::new(p2p.settings(), parent);
debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr);
- match connector.connect(addr).await {
+ match connector.connect(&addr).await {
Ok((_url, channel)) => {
debug!(target: "net::refinery::ping_node()", "Connected successfully!");
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
@@ -155,16 +174,21 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
p2p.executor(),
);
+ // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
+ // the handshake does not finish channel.stop() will never get called, resulting in
+ // zombie processes.
+ let result = timeout(Duration::from_secs(5), handshake_task).await;
+
channel.clone().start(p2p.executor());
- match handshake_task.await {
- Ok(()) => {
+ match result {
+ Ok(_) => {
debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel.");
channel.stop().await;
true
}
Err(e) => {
- debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e);
+ debug!(target: "net::refinery::ping_node()", "Handshake timed out! {}", e);
channel.stop().await;
false
}
diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs
index a09d3127d..2b13f7b03 100644
--- a/src/net/hosts/store.rs
+++ b/src/net/hosts/store.rs
@@ -1235,10 +1235,33 @@ mod tests {
super::super::{settings::Settings, P2p},
*,
};
- use crate::system::sleep;
+ use crate::{net::hosts::refinery::ping_node, system::sleep};
use smol::Executor;
use std::{sync::Arc, time::UNIX_EPOCH};
+ #[test]
+ fn test_ping_node() {
+ smol::block_on(async {
+ let settings = Settings {
+ localnet: false,
+ external_addrs: vec![
+ Url::parse("tcp://foo.bar:123").unwrap(),
+ Url::parse("tcp://lol.cat:321").unwrap(),
+ ],
+ ..Default::default()
+ };
+
+ let ex = Arc::new(Executor::new());
+ let p2p = P2p::new(settings, ex.clone()).await;
+
+ let url = Url::parse("tcp://xeno.systems.wtf").unwrap();
+ println!("Pinging node...");
+ let task = ex.spawn(ping_node(url.clone(), p2p));
+ ex.run(task).await;
+ println!("Ping node complete!");
+ });
+ }
+
#[test]
fn test_is_local_host() {
smol::block_on(async {
diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs
index fe29e76e7..405d66fc3 100644
--- a/src/net/protocol/protocol_address.rs
+++ b/src/net/protocol/protocol_address.rs
@@ -197,7 +197,7 @@ impl ProtocolAddress {
debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
- if ping_node(&addr, self.p2p.clone()).await {
+ if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs
index 58d97c6b6..a405024fd 100644
--- a/src/net/protocol/protocol_seed.rs
+++ b/src/net/protocol/protocol_seed.rs
@@ -77,7 +77,7 @@ impl ProtocolSeed {
debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
- if ping_node(&addr, self.p2p.clone()).await {
+ if ping_node(addr.clone(), self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
diff --git a/src/system/mod.rs b/src/system/mod.rs
index dce93f448..763394257 100644
--- a/src/system/mod.rs
+++ b/src/system/mod.rs
@@ -18,7 +18,7 @@
use std::{sync::Arc, time::Duration};
-use smol::{Executor, Timer};
+use smol::{future::Future, Executor, Timer};
/// Condition variable which allows a task to block until woken up
pub mod condvar;
@@ -58,3 +58,21 @@ pub async fn sleep_forever() {
pub async fn msleep(millis: u64) {
Timer::after(Duration::from_millis(millis)).await;
}
+
+/// Run a task until it has fully completed, irrespective of whether the parent task still exists.
+pub async fn run_until_completion<'a, R: Send + 'a, F: Future