Compare commits

...

5 Commits

Author SHA1 Message Date
joshieDo
c5aa6782fa docs: fix build_engine_orchestrator doc comments
Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 16:10:20 +01:00
joshieDo
bb7276bdea refactor: remove RpcHandle convenience methods, restore doc comment
Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 16:08:28 +01:00
joshieDo
b545cded62 feat(engine): add PersistenceGate and expose on RpcHandle
Introduces PersistenceGate, a shared gate that controls whether the
engine tree handler starts new persistence cycles. Exposed on RpcHandle
so external consumers (e.g. Tempo) can pause/resume persistence via
set_persistence_enabled()/is_persistence_enabled().

Also refactors build_engine_orchestrator to accept a PersistenceHandle
directly instead of spawning it internally, removing three parameters
and simplifying the return type.

Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 16:05:57 +01:00
Derek Cofausper
d3c4589790 rm spawn_new doc comment per review
Co-Authored-By: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-Authored-By: Georgios Konstantopoulos <17802178+gakonst@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d961d-12d6-73da-9a80-f732efbbd314
2026-04-16 12:32:38 +00:00
Derek Cofausper
8f18164660 feat(engine): add persistence gate to PersistenceHandle
Adds a persistence gate to PersistenceHandle so external callers can
suppress new persistence cycles during latency-critical windows (e.g.
block proposal on fast-block-time chains).

PersistenceHandle::set_persistence_enabled(false) causes
should_persist() to return false, preventing new SaveBlocks from being
dispatched. In-flight persistence is unaffected. Defaults to true.

Co-Authored-By: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-Authored-By: Georgios Konstantopoulos <17802178+gakonst@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d961d-12d6-73da-9a80-f732efbbd314
2026-04-16 12:25:22 +00:00
5 changed files with 78 additions and 22 deletions

View File

@@ -19,12 +19,8 @@ use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_stages_api::Pipeline;
use reth_tasks::Runtime;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
@@ -34,8 +30,8 @@ use std::sync::Arc;
/// This spawns and wires together the following components:
///
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
/// blocks and performing pruning outside the critical consensus path.
/// - **[`PersistenceHandle`]** — handle to the persistence service for writing blocks and
/// performing pruning outside the critical consensus path.
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
@@ -55,13 +51,11 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
incoming_requests: S,
pipeline: Pipeline<N>,
pipeline_task_spawner: Runtime,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
persistence_handle: PersistenceHandle<N::Primitives>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
runtime: Runtime,
@@ -82,9 +76,6 @@ where
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(

View File

@@ -14,6 +14,7 @@ use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, SendError, Sender},
Arc,
},
@@ -247,6 +248,8 @@ pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
/// Guard that joins the service thread when all handles are dropped.
/// Uses `Arc` so the handle remains `Clone`.
_service_guard: Arc<ServiceGuard>,
/// External gate to suppress persistence.
persistence_gate: PersistenceGate,
}
impl<T: NodePrimitives> PersistenceHandle<T> {
@@ -255,7 +258,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// This is intended for testing purposes where you want to mock the persistence service.
/// For production use, prefer [`spawn_service`](Self::spawn_service).
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
Self {
sender,
_service_guard: Arc::new(ServiceGuard(None)),
persistence_gate: PersistenceGate::new(),
}
}
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
@@ -286,6 +293,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
PersistenceHandle {
sender: db_service_tx,
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
persistence_gate: PersistenceGate::new(),
}
}
@@ -298,6 +306,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
self.sender.send(action)
}
/// Returns a clone of the [`PersistenceGate`].
pub fn persistence_gate(&self) -> PersistenceGate {
self.persistence_gate.clone()
}
/// Tells the persistence service to save a certain list of finalized blocks. The blocks are
/// assumed to be ordered by block number.
///
@@ -371,6 +384,41 @@ impl Drop for ServiceGuard {
}
}
/// Shared gate that controls whether the engine tree handler starts new persistence cycles.
///
/// When disabled,
/// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist)
/// returns `false`, preventing new persistence cycles from starting. An in-flight persistence
/// task is unaffected.
#[derive(Clone, Debug)]
pub struct PersistenceGate(Arc<AtomicBool>);
impl PersistenceGate {
/// Creates a new gate that is enabled by default.
pub fn new() -> Self {
Self(Arc::new(AtomicBool::new(true)))
}
/// Returns `true` if persistence cycles are enabled.
pub fn is_enabled(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
/// Sets the persistence gate.
///
/// When set to `false`, no new persistence cycles will be started by the tree handler.
/// An in-flight persistence task is unaffected. Set back to `true` to resume.
pub fn set_enabled(&self, enabled: bool) {
self.0.store(enabled, Ordering::Relaxed);
}
}
impl Default for PersistenceGate {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -2010,9 +2010,13 @@ where
}
/// Returns true if the canonical chain length minus the last persisted
/// block is greater than or equal to the persistence threshold and
/// backfill is not running.
pub const fn should_persist(&self) -> bool {
/// block is greater than or equal to the persistence threshold,
/// backfill is not running, and the external persistence gate is open.
pub fn should_persist(&self) -> bool {
if !self.persistence.persistence_gate().is_enabled() {
return false
}
if !self.backfill_sync_state.is_idle() {
// can't persist if backfill is running
return false

View File

@@ -15,6 +15,7 @@ use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
persistence::PersistenceHandle,
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
@@ -226,6 +227,14 @@ impl EngineNodeLauncher {
EngineApiKind::Ethereum
};
let persistence_handle =
PersistenceHandle::<<T::Types as NodeTypes>::Primitives>::spawn_service(
ctx.provider_factory().clone(),
pruner,
ctx.sync_metrics_tx(),
);
let persistence_gate = persistence_handle.persistence_gate();
let mut orchestrator = build_engine_orchestrator(
engine_kind,
consensus.clone(),
@@ -233,13 +242,11 @@ impl EngineNodeLauncher {
Box::pin(consensus_engine_stream),
pipeline,
ctx.task_executor().clone(),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
persistence_handle,
ctx.components().payload_builder_handle().clone(),
engine_validator,
engine_tree_config,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
changeset_cache,
ctx.task_executor().clone(),
@@ -271,6 +278,7 @@ impl EngineNodeLauncher {
engine_events,
beacon_engine_handle,
engine_shutdown: _,
persistence_gate: _,
} = add_ons.launch_add_ons(add_ons_ctx).await?;
// Create engine shutdown handle
@@ -409,6 +417,7 @@ impl EngineNodeLauncher {
engine_events,
beacon_engine_handle,
engine_shutdown,
persistence_gate,
},
};
// Notify on node started

View File

@@ -4,8 +4,8 @@ pub use jsonrpsee::{
core::middleware::layer::Either,
server::middleware::rpc::{RpcService, RpcServiceBuilder},
};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
use reth_engine_tree::{persistence::PersistenceGate, tree::WaitForCaches};
pub use reth_rpc_builder::{
middleware::{RethAuthHttpMiddleware, RethRpcMiddleware},
Identity, Stack,
@@ -345,6 +345,8 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
/// Handle to trigger engine shutdown.
pub engine_shutdown: EngineShutdown,
/// Gate to suppress engine persistence cycles.
pub persistence_gate: PersistenceGate,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
@@ -355,6 +357,7 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
engine_events: self.engine_events.clone(),
beacon_engine_handle: self.beacon_engine_handle.clone(),
engine_shutdown: self.engine_shutdown.clone(),
persistence_gate: self.persistence_gate.clone(),
}
}
}
@@ -1084,6 +1087,7 @@ where
engine_events,
beacon_engine_handle: engine_handle,
engine_shutdown: EngineShutdown::default(),
persistence_gate: PersistenceGate::default(),
})
}