diff --git a/bin/reth/src/commands/node/mod.rs b/bin/reth/src/commands/node/mod.rs index ed48766ae9..cb616090bf 100644 --- a/bin/reth/src/commands/node/mod.rs +++ b/bin/reth/src/commands/node/mod.rs @@ -223,8 +223,6 @@ impl NodeCommand { // launch the node let handle = node_config.launch::(ext, executor).await?; - - // wait for node exit handle.wait_for_node_exit().await } diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 3d486bc8a7..9956624b76 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -20,9 +20,10 @@ use crate::{ utils::{get_single_header, write_peers_to_file}, version::SHORT_VERSION, }; -use eyre::Context; +use core::future::Future; +use eyre::WrapErr; use fdlimit::raise_fd_limit; -use futures::{future::Either, stream, stream_select, StreamExt}; +use futures::{future::Either, stream, stream_select, FutureExt, StreamExt}; use metrics_exporter_prometheus::PrometheusHandle; use once_cell::sync::Lazy; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; @@ -62,6 +63,7 @@ use reth_network_api::{NetworkInfo, PeersInfo}; use reth_node_builder::EthEngineTypes; #[cfg(feature = "optimism")] use reth_node_builder::OptimismEngineTypes; +use std::task::ready; use reth_payload_builder::PayloadBuilderHandle; use reth_primitives::{ @@ -98,7 +100,9 @@ use secp256k1::SecretKey; use std::{ net::{SocketAddr, SocketAddrV4}, path::PathBuf, + pin::Pin, sync::Arc, + task::{Context, Poll}, }; use tokio::sync::{ mpsc::{unbounded_channel, Receiver, UnboundedSender}, @@ -1311,12 +1315,11 @@ impl NodeBuilderWit .await?; } + // wait for node exit future + let node_exit_future = NodeExitFuture::new(rx, self.config.debug.terminate); + // construct node handle and return - let node_handle = NodeHandle { - rpc_server_handles, - consensus_engine_rx: rx, - terminate: self.config.debug.terminate, - }; + let node_handle = NodeHandle { rpc_server_handles, node_exit_future }; Ok(node_handle) } @@ -1355,12 +1358,9 @@ pub struct NodeHandle { /// The handles to the RPC servers rpc_server_handles: RethRpcServerHandles, - /// The receiver half of the channel for the consensus engine. - /// This can be used to wait for the consensus engine to exit. - consensus_engine_rx: oneshot::Receiver>, - - /// Flag indicating whether the node should be terminated after the pipeline sync. - terminate: bool, + /// A Future which waits node exit + /// See [`NodeExitFuture`] + node_exit_future: NodeExitFuture, } impl NodeHandle { @@ -1369,16 +1369,52 @@ impl NodeHandle { &self.rpc_server_handles } - /// Waits for the node to exit, if it was configured to exit. + /// Waits for the node to exit. Uses [`NodeExitFuture`] pub async fn wait_for_node_exit(self) -> eyre::Result<()> { - self.consensus_engine_rx.await??; + self.node_exit_future.await + } +} - if self.terminate { - Ok(()) +/// A Future which resolves when the node exits +#[derive(Debug)] +pub struct NodeExitFuture { + /// The receiver half of the channel for the consensus engine. + /// This can be used to wait for the consensus engine to exit. + consensus_engine_rx: Option>>, + + /// Flag indicating whether the node should be terminated after the pipeline sync. + terminate: bool, +} + +impl NodeExitFuture { + fn new( + consensus_engine_rx: oneshot::Receiver>, + terminate: bool, + ) -> Self { + Self { consensus_engine_rx: Some(consensus_engine_rx), terminate } + } +} + +impl Future for NodeExitFuture { + type Output = eyre::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + if let Some(rx) = this.consensus_engine_rx.as_mut() { + match ready!(rx.poll_unpin(cx)) { + Ok(res) => { + this.consensus_engine_rx.take(); + res?; + if this.terminate { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + Err(err) => Poll::Ready(Err(err.into())), + } } else { - // The pipeline has finished downloading blocks up to `--debug.tip` or - // `--debug.max-block`. Keep other node components alive for further usage. - futures::future::pending().await + Poll::Pending } } } @@ -1411,6 +1447,7 @@ pub async fn spawn_node(config: NodeConfig) -> eyre::Result<(NodeHandle, TaskMan #[cfg(test)] mod tests { use super::*; + use futures::future::poll_fn; use reth_primitives::U256; use reth_rpc_api::EthApiClient; @@ -1452,6 +1489,33 @@ mod tests { } } + #[tokio::test] + async fn test_node_exit_future_terminate_true() { + let (tx, rx) = oneshot::channel::>(); + + let _ = tx.send(Ok(())); + + let node_exit_future = NodeExitFuture::new(rx, true); + + let res = node_exit_future.await; + + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_node_exit_future_terminate_false() { + let (tx, rx) = oneshot::channel::>(); + + let _ = tx.send(Ok(())); + + let mut node_exit_future = NodeExitFuture::new(rx, false); + poll_fn(|cx| { + assert!(node_exit_future.poll_unpin(cx).is_pending()); + Poll::Ready(()) + }) + .await; + } + #[cfg(feature = "optimism")] #[tokio::test] async fn optimism_pre_canyon_no_withdrawals_valid() {