mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
snapv2
...
joshie/per
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c5aa6782fa | ||
|
|
bb7276bdea | ||
|
|
b545cded62 | ||
|
|
d3c4589790 | ||
|
|
8f18164660 |
@@ -19,12 +19,8 @@ use reth_evm::ConfigureEvm;
|
||||
use reth_network_p2p::BlockClient;
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
|
||||
use reth_stages_api::Pipeline;
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use std::sync::Arc;
|
||||
@@ -34,8 +30,8 @@ use std::sync::Arc;
|
||||
/// This spawns and wires together the following components:
|
||||
///
|
||||
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
|
||||
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
|
||||
/// blocks and performing pruning outside the critical consensus path.
|
||||
/// - **[`PersistenceHandle`]** — handle to the persistence service for writing blocks and
|
||||
/// performing pruning outside the critical consensus path.
|
||||
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
|
||||
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
|
||||
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
|
||||
@@ -55,13 +51,11 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
|
||||
incoming_requests: S,
|
||||
pipeline: Pipeline<N>,
|
||||
pipeline_task_spawner: Runtime,
|
||||
provider: ProviderFactory<N>,
|
||||
blockchain_db: BlockchainProvider<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
persistence_handle: PersistenceHandle<N::Primitives>,
|
||||
payload_builder: PayloadBuilderHandle<N::Payload>,
|
||||
payload_validator: V,
|
||||
tree_config: TreeConfig,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: Runtime,
|
||||
@@ -82,9 +76,6 @@ where
|
||||
{
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
|
||||
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
|
||||
|
||||
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
|
||||
|
||||
@@ -14,6 +14,7 @@ use reth_stages_api::{MetricEvent, MetricEventsSender};
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{Receiver, SendError, Sender},
|
||||
Arc,
|
||||
},
|
||||
@@ -247,6 +248,8 @@ pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
|
||||
/// Guard that joins the service thread when all handles are dropped.
|
||||
/// Uses `Arc` so the handle remains `Clone`.
|
||||
_service_guard: Arc<ServiceGuard>,
|
||||
/// External gate to suppress persistence.
|
||||
persistence_gate: PersistenceGate,
|
||||
}
|
||||
|
||||
impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
@@ -255,7 +258,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
/// This is intended for testing purposes where you want to mock the persistence service.
|
||||
/// For production use, prefer [`spawn_service`](Self::spawn_service).
|
||||
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
|
||||
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
|
||||
Self {
|
||||
sender,
|
||||
_service_guard: Arc::new(ServiceGuard(None)),
|
||||
persistence_gate: PersistenceGate::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
|
||||
@@ -286,6 +293,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
PersistenceHandle {
|
||||
sender: db_service_tx,
|
||||
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
|
||||
persistence_gate: PersistenceGate::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,6 +306,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
self.sender.send(action)
|
||||
}
|
||||
|
||||
/// Returns a clone of the [`PersistenceGate`].
|
||||
pub fn persistence_gate(&self) -> PersistenceGate {
|
||||
self.persistence_gate.clone()
|
||||
}
|
||||
|
||||
/// Tells the persistence service to save a certain list of finalized blocks. The blocks are
|
||||
/// assumed to be ordered by block number.
|
||||
///
|
||||
@@ -371,6 +384,41 @@ impl Drop for ServiceGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared gate that controls whether the engine tree handler starts new persistence cycles.
|
||||
///
|
||||
/// When disabled,
|
||||
/// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist)
|
||||
/// returns `false`, preventing new persistence cycles from starting. An in-flight persistence
|
||||
/// task is unaffected.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PersistenceGate(Arc<AtomicBool>);
|
||||
|
||||
impl PersistenceGate {
|
||||
/// Creates a new gate that is enabled by default.
|
||||
pub fn new() -> Self {
|
||||
Self(Arc::new(AtomicBool::new(true)))
|
||||
}
|
||||
|
||||
/// Returns `true` if persistence cycles are enabled.
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
self.0.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Sets the persistence gate.
|
||||
///
|
||||
/// When set to `false`, no new persistence cycles will be started by the tree handler.
|
||||
/// An in-flight persistence task is unaffected. Set back to `true` to resume.
|
||||
pub fn set_enabled(&self, enabled: bool) {
|
||||
self.0.store(enabled, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PersistenceGate {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -2010,9 +2010,13 @@ where
|
||||
}
|
||||
|
||||
/// Returns true if the canonical chain length minus the last persisted
|
||||
/// block is greater than or equal to the persistence threshold and
|
||||
/// backfill is not running.
|
||||
pub const fn should_persist(&self) -> bool {
|
||||
/// block is greater than or equal to the persistence threshold,
|
||||
/// backfill is not running, and the external persistence gate is open.
|
||||
pub fn should_persist(&self) -> bool {
|
||||
if !self.persistence.persistence_gate().is_enabled() {
|
||||
return false
|
||||
}
|
||||
|
||||
if !self.backfill_sync_state.is_idle() {
|
||||
// can't persist if backfill is running
|
||||
return false
|
||||
|
||||
@@ -15,6 +15,7 @@ use reth_engine_tree::{
|
||||
chain::{ChainEvent, FromOrchestrator},
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
|
||||
launch::build_engine_orchestrator,
|
||||
persistence::PersistenceHandle,
|
||||
tree::TreeConfig,
|
||||
};
|
||||
use reth_engine_util::EngineMessageStreamExt;
|
||||
@@ -226,6 +227,14 @@ impl EngineNodeLauncher {
|
||||
EngineApiKind::Ethereum
|
||||
};
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<<T::Types as NodeTypes>::Primitives>::spawn_service(
|
||||
ctx.provider_factory().clone(),
|
||||
pruner,
|
||||
ctx.sync_metrics_tx(),
|
||||
);
|
||||
let persistence_gate = persistence_handle.persistence_gate();
|
||||
|
||||
let mut orchestrator = build_engine_orchestrator(
|
||||
engine_kind,
|
||||
consensus.clone(),
|
||||
@@ -233,13 +242,11 @@ impl EngineNodeLauncher {
|
||||
Box::pin(consensus_engine_stream),
|
||||
pipeline,
|
||||
ctx.task_executor().clone(),
|
||||
ctx.provider_factory().clone(),
|
||||
ctx.blockchain_db().clone(),
|
||||
pruner,
|
||||
persistence_handle,
|
||||
ctx.components().payload_builder_handle().clone(),
|
||||
engine_validator,
|
||||
engine_tree_config,
|
||||
ctx.sync_metrics_tx(),
|
||||
ctx.components().evm_config().clone(),
|
||||
changeset_cache,
|
||||
ctx.task_executor().clone(),
|
||||
@@ -271,6 +278,7 @@ impl EngineNodeLauncher {
|
||||
engine_events,
|
||||
beacon_engine_handle,
|
||||
engine_shutdown: _,
|
||||
persistence_gate: _,
|
||||
} = add_ons.launch_add_ons(add_ons_ctx).await?;
|
||||
|
||||
// Create engine shutdown handle
|
||||
@@ -409,6 +417,7 @@ impl EngineNodeLauncher {
|
||||
engine_events,
|
||||
beacon_engine_handle,
|
||||
engine_shutdown,
|
||||
persistence_gate,
|
||||
},
|
||||
};
|
||||
// Notify on node started
|
||||
|
||||
@@ -4,8 +4,8 @@ pub use jsonrpsee::{
|
||||
core::middleware::layer::Either,
|
||||
server::middleware::rpc::{RpcService, RpcServiceBuilder},
|
||||
};
|
||||
use reth_engine_tree::tree::WaitForCaches;
|
||||
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
|
||||
use reth_engine_tree::{persistence::PersistenceGate, tree::WaitForCaches};
|
||||
pub use reth_rpc_builder::{
|
||||
middleware::{RethAuthHttpMiddleware, RethRpcMiddleware},
|
||||
Identity, Stack,
|
||||
@@ -345,6 +345,8 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
|
||||
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
|
||||
/// Handle to trigger engine shutdown.
|
||||
pub engine_shutdown: EngineShutdown,
|
||||
/// Gate to suppress engine persistence cycles.
|
||||
pub persistence_gate: PersistenceGate,
|
||||
}
|
||||
|
||||
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
|
||||
@@ -355,6 +357,7 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
|
||||
engine_events: self.engine_events.clone(),
|
||||
beacon_engine_handle: self.beacon_engine_handle.clone(),
|
||||
engine_shutdown: self.engine_shutdown.clone(),
|
||||
persistence_gate: self.persistence_gate.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1084,6 +1087,7 @@ where
|
||||
engine_events,
|
||||
beacon_engine_handle: engine_handle,
|
||||
engine_shutdown: EngineShutdown::default(),
|
||||
persistence_gate: PersistenceGate::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user