chore: add engine terminate (#20420)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2025-12-18 10:01:36 +01:00
committed by GitHub
parent 8e00e81af4
commit 701e5ec455
8 changed files with 275 additions and 26 deletions

View File

@@ -3,7 +3,7 @@
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
setup::build_networked_pipeline,
AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
@@ -13,6 +13,7 @@ use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
chain::FromOrchestrator,
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
@@ -260,8 +261,16 @@ impl EngineNodeLauncher {
)),
);
let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
add_ons.launch_add_ons(add_ons_ctx).await?;
let RpcHandle {
rpc_server_handles,
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown: _,
} = add_ons.launch_add_ons(add_ons_ctx).await?;
// Create engine shutdown handle
let (engine_shutdown, mut shutdown_rx) = EngineShutdown::new();
// Run consensus engine to completion
let initial_target = ctx.initial_backfill_target()?;
@@ -295,6 +304,14 @@ impl EngineNodeLauncher {
// advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
loop {
tokio::select! {
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}
}
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
@@ -366,6 +383,7 @@ impl EngineNodeLauncher {
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown,
},
};
// Notify on node started

View File

@@ -11,6 +11,7 @@ use crate::{
use alloy_rpc_types::engine::ClientVersionV1;
use alloy_rpc_types_engine::ExecutionData;
use jsonrpsee::{core::middleware::layer::Either, RpcModule};
use parking_lot::Mutex;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks};
use reth_node_api::{
@@ -41,7 +42,9 @@ use std::{
fmt::{self, Debug},
future::Future,
ops::{Deref, DerefMut},
sync::Arc,
};
use tokio::sync::oneshot;
/// Contains the handles to the spawned RPC servers.
///
@@ -332,6 +335,8 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
/// Handle to the beacon consensus engine.
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
/// Handle to trigger engine shutdown.
pub engine_shutdown: EngineShutdown,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
@@ -341,6 +346,7 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
rpc_registry: self.rpc_registry.clone(),
engine_events: self.engine_events.clone(),
beacon_engine_handle: self.beacon_engine_handle.clone(),
engine_shutdown: self.engine_shutdown.clone(),
}
}
}
@@ -361,6 +367,7 @@ where
f.debug_struct("RpcHandle")
.field("rpc_server_handles", &self.rpc_server_handles)
.field("rpc_registry", &self.rpc_registry)
.field("engine_shutdown", &self.engine_shutdown)
.finish()
}
}
@@ -956,6 +963,7 @@ where
rpc_registry: registry,
engine_events,
beacon_engine_handle: engine_handle,
engine_shutdown: EngineShutdown::default(),
})
}
@@ -1428,3 +1436,48 @@ impl IntoEngineApiRpcModule for NoopEngineApi {
RpcModule::new(())
}
}
/// Handle to trigger graceful engine shutdown.
///
/// This handle can be used to request a graceful shutdown of the engine,
/// which will persist all remaining in-memory blocks before terminating.
#[derive(Clone, Debug)]
pub struct EngineShutdown {
/// Channel to send shutdown signal.
tx: Arc<Mutex<Option<oneshot::Sender<EngineShutdownRequest>>>>,
}
impl EngineShutdown {
/// Creates a new [`EngineShutdown`] handle and returns the receiver.
pub fn new() -> (Self, oneshot::Receiver<EngineShutdownRequest>) {
let (tx, rx) = oneshot::channel();
(Self { tx: Arc::new(Mutex::new(Some(tx))) }, rx)
}
/// Requests a graceful engine shutdown.
///
/// All remaining in-memory blocks will be persisted before the engine terminates.
///
/// Returns a receiver that resolves when shutdown is complete.
/// Returns `None` if shutdown was already triggered.
pub fn shutdown(&self) -> Option<oneshot::Receiver<()>> {
let mut guard = self.tx.lock();
let tx = guard.take()?;
let (done_tx, done_rx) = oneshot::channel();
let _ = tx.send(EngineShutdownRequest { done_tx });
Some(done_rx)
}
}
impl Default for EngineShutdown {
fn default() -> Self {
Self { tx: Arc::new(Mutex::new(None)) }
}
}
/// Request to shutdown the engine.
#[derive(Debug)]
pub struct EngineShutdownRequest {
/// Channel to signal shutdown completion.
pub done_tx: oneshot::Sender<()>,
}