feat: NodeExit future core implementation (#6166)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Nil Medvedev
2024-01-25 12:11:47 +01:00
committed by GitHub
parent dad9ebd0aa
commit 2901b3cb33
2 changed files with 84 additions and 22 deletions

View File

@@ -223,8 +223,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// launch the node
let handle = node_config.launch::<Ext>(ext, executor).await?;
// wait for node exit
handle.wait_for_node_exit().await
}

View File

@@ -20,9 +20,10 @@ use crate::{
utils::{get_single_header, write_peers_to_file},
version::SHORT_VERSION,
};
use eyre::Context;
use core::future::Future;
use eyre::WrapErr;
use fdlimit::raise_fd_limit;
use futures::{future::Either, stream, stream_select, StreamExt};
use futures::{future::Either, stream, stream_select, FutureExt, StreamExt};
use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
@@ -62,6 +63,7 @@ use reth_network_api::{NetworkInfo, PeersInfo};
use reth_node_builder::EthEngineTypes;
#[cfg(feature = "optimism")]
use reth_node_builder::OptimismEngineTypes;
use std::task::ready;
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives::{
@@ -98,7 +100,9 @@ use secp256k1::SecretKey;
use std::{
net::{SocketAddr, SocketAddrV4},
path::PathBuf,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, Receiver, UnboundedSender},
@@ -1311,12 +1315,11 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
.await?;
}
// wait for node exit future
let node_exit_future = NodeExitFuture::new(rx, self.config.debug.terminate);
// construct node handle and return
let node_handle = NodeHandle {
rpc_server_handles,
consensus_engine_rx: rx,
terminate: self.config.debug.terminate,
};
let node_handle = NodeHandle { rpc_server_handles, node_exit_future };
Ok(node_handle)
}
@@ -1355,12 +1358,9 @@ pub struct NodeHandle {
/// The handles to the RPC servers
rpc_server_handles: RethRpcServerHandles,
/// The receiver half of the channel for the consensus engine.
/// This can be used to wait for the consensus engine to exit.
consensus_engine_rx: oneshot::Receiver<Result<(), BeaconConsensusEngineError>>,
/// Flag indicating whether the node should be terminated after the pipeline sync.
terminate: bool,
/// A Future which waits node exit
/// See [`NodeExitFuture`]
node_exit_future: NodeExitFuture,
}
impl NodeHandle {
@@ -1369,16 +1369,52 @@ impl NodeHandle {
&self.rpc_server_handles
}
/// Waits for the node to exit, if it was configured to exit.
/// Waits for the node to exit. Uses [`NodeExitFuture`]
pub async fn wait_for_node_exit(self) -> eyre::Result<()> {
self.consensus_engine_rx.await??;
self.node_exit_future.await
}
}
if self.terminate {
Ok(())
/// A Future which resolves when the node exits
#[derive(Debug)]
pub struct NodeExitFuture {
/// The receiver half of the channel for the consensus engine.
/// This can be used to wait for the consensus engine to exit.
consensus_engine_rx: Option<oneshot::Receiver<Result<(), BeaconConsensusEngineError>>>,
/// Flag indicating whether the node should be terminated after the pipeline sync.
terminate: bool,
}
impl NodeExitFuture {
fn new(
consensus_engine_rx: oneshot::Receiver<Result<(), BeaconConsensusEngineError>>,
terminate: bool,
) -> Self {
Self { consensus_engine_rx: Some(consensus_engine_rx), terminate }
}
}
impl Future for NodeExitFuture {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if let Some(rx) = this.consensus_engine_rx.as_mut() {
match ready!(rx.poll_unpin(cx)) {
Ok(res) => {
this.consensus_engine_rx.take();
res?;
if this.terminate {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Err(err) => Poll::Ready(Err(err.into())),
}
} else {
// The pipeline has finished downloading blocks up to `--debug.tip` or
// `--debug.max-block`. Keep other node components alive for further usage.
futures::future::pending().await
Poll::Pending
}
}
}
@@ -1411,6 +1447,7 @@ pub async fn spawn_node(config: NodeConfig) -> eyre::Result<(NodeHandle, TaskMan
#[cfg(test)]
mod tests {
use super::*;
use futures::future::poll_fn;
use reth_primitives::U256;
use reth_rpc_api::EthApiClient;
@@ -1452,6 +1489,33 @@ mod tests {
}
}
#[tokio::test]
async fn test_node_exit_future_terminate_true() {
let (tx, rx) = oneshot::channel::<Result<(), BeaconConsensusEngineError>>();
let _ = tx.send(Ok(()));
let node_exit_future = NodeExitFuture::new(rx, true);
let res = node_exit_future.await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_node_exit_future_terminate_false() {
let (tx, rx) = oneshot::channel::<Result<(), BeaconConsensusEngineError>>();
let _ = tx.send(Ok(()));
let mut node_exit_future = NodeExitFuture::new(rx, false);
poll_fn(|cx| {
assert!(node_exit_future.poll_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[cfg(feature = "optimism")]
#[tokio::test]
async fn optimism_pre_canyon_no_withdrawals_valid() {