diff --git a/Cargo.lock b/Cargo.lock
index 8bb5dd0105..48cec8dd50 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4956,6 +4956,7 @@ dependencies = [
  "reth-payload-builder",
  "reth-primitives",
  "reth-provider",
+ "reth-prune",
  "reth-revm",
  "reth-revm-inspectors",
  "reth-rlp",
@@ -5029,6 +5030,7 @@ dependencies = [
  "reth-payload-builder",
  "reth-primitives",
  "reth-provider",
+ "reth-prune",
  "reth-rpc-types",
  "reth-stages",
  "reth-tasks",
@@ -5554,6 +5556,15 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "reth-prune"
+version = "0.1.0-alpha.3"
+dependencies = [
+ "reth-primitives",
+ "thiserror",
+ "tracing",
+]
+
 [[package]]
 name = "reth-revm"
 version = "0.1.0-alpha.3"
diff --git a/Cargo.toml b/Cargo.toml
index c73f2f87ab..94e6ca26fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
     "crates/net/downloaders",
     "crates/payload/basic",
     "crates/primitives",
+    "crates/prune",
     "crates/revm",
     "crates/revm/revm-primitives",
     "crates/revm/revm-inspectors",
diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml
index 1a2fc95912..eb81a41931 100644
--- a/bin/reth/Cargo.toml
+++ b/bin/reth/Cargo.toml
@@ -36,6 +36,7 @@ reth-payload-builder = { workspace = true }
 reth-basic-payload-builder = { path = "../../crates/payload/basic" }
 reth-discv4 = { path = "../../crates/net/discv4" }
 reth-metrics = { workspace = true }
+reth-prune = { path = "../../crates/prune" }
 
 jemallocator = { version = "0.5.0", optional = true }
 jemalloc-ctl = { version = "0.5.0", optional = true }
diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs
index 13fd00d78e..d2ed282561 100644
--- a/bin/reth/src/node/mod.rs
+++ b/bin/reth/src/node/mod.rs
@@ -360,6 +360,11 @@ impl Command {
             None
         };
 
+        let pruner = config.prune.map(|prune_config| {
+            info!(target: "reth::cli", "Pruner initialized");
+            reth_prune::Pruner::new(prune_config.block_interval, tree_config.max_reorg_depth())
+        });
+
         // Configure the consensus engine
         let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
             client,
@@ -374,6 +379,7 @@ impl Command {
             MIN_BLOCKS_FOR_PIPELINE_RUN,
             consensus_engine_tx,
             consensus_engine_rx,
+            pruner,
         )?;
         info!(target: "reth::cli", "Consensus engine initialized");
 
diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs
index 92c821064c..43fe1be6fa 100644
--- a/crates/blockchain-tree/src/blockchain_tree.rs
+++ b/crates/blockchain-tree/src/blockchain_tree.rs
@@ -750,12 +750,21 @@ impl BlockchainTree
     ///
     /// This finalizes `last_finalized_block` prior to reading the canonical hashes (using
     /// [`BlockchainTree::finalize_block`]).
-    pub fn restore_canonical_hashes(
+    pub fn restore_canonical_hashes_and_finalize(
         &mut self,
         last_finalized_block: BlockNumber,
     ) -> Result<(), Error> {
         self.finalize_block(last_finalized_block);
 
+        self.restore_canonical_hashes()
+    }
+
+    /// Reads the last `N` canonical hashes from the database and updates the block indices of the
+    /// tree.
+    ///
+    /// `N` is the `max_reorg_depth` plus the number of block hashes needed to satisfy the
+    /// `BLOCKHASH` opcode in the EVM.
+    pub fn restore_canonical_hashes(&mut self) -> Result<(), Error> {
         let num_of_canonical_hashes =
             self.config.max_reorg_depth() + self.config.num_of_additional_canonical_block_hashes();
 
@@ -1578,7 +1587,7 @@ mod tests {
             .assert(&tree);
 
         // update canonical block to b2, this would make b2a be removed
-        assert_eq!(tree.restore_canonical_hashes(12), Ok(()));
+        assert_eq!(tree.restore_canonical_hashes_and_finalize(12), Ok(()));
 
         assert_eq!(tree.is_block_known(block2.num_hash()).unwrap(), Some(BlockStatus::Valid));
 
diff --git a/crates/blockchain-tree/src/config.rs b/crates/blockchain-tree/src/config.rs
index 681c8f61b3..3c56acc565 100644
--- a/crates/blockchain-tree/src/config.rs
+++ b/crates/blockchain-tree/src/config.rs
@@ -1,7 +1,7 @@
 //! Blockchain tree configuration
 
 /// The configuration for the blockchain tree.
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
 pub struct BlockchainTreeConfig {
     /// Number of blocks after the last finalized block that we are storing.
     ///
diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs
index a63b18dd26..d99901dc8e 100644
--- a/crates/blockchain-tree/src/shareable.rs
+++ b/crates/blockchain-tree/src/shareable.rs
@@ -66,10 +66,21 @@ impl BlockchainTreeEngine
         tree.update_chains_metrics();
     }
 
-    fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error> {
+    fn restore_canonical_hashes_and_finalize(
+        &self,
+        last_finalized_block: BlockNumber,
+    ) -> Result<(), Error> {
         trace!(target: "blockchain_tree", ?last_finalized_block, "Restoring canonical hashes for last finalized block");
         let mut tree = self.tree.write();
-        let res = tree.restore_canonical_hashes(last_finalized_block);
+        let res = tree.restore_canonical_hashes_and_finalize(last_finalized_block);
+        tree.update_chains_metrics();
+        res
+    }
+
+    fn restore_canonical_hashes(&self) -> Result<(), Error> {
+        trace!(target: "blockchain_tree", "Restoring canonical hashes");
+        let mut tree = self.tree.write();
+        let res = tree.restore_canonical_hashes();
         tree.update_chains_metrics();
         res
     }
 
diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml
index 64845e971d..5f7ba35dd2 100644
--- a/crates/consensus/beacon/Cargo.toml
+++ b/crates/consensus/beacon/Cargo.toml
@@ -19,6 +19,7 @@ reth-rpc-types = { workspace = true }
 reth-tasks = { workspace = true }
 reth-payload-builder = { workspace = true }
 reth-metrics = { workspace = true }
+reth-prune = { path = "../../prune" }
 
 # async
 tokio = { workspace = true, features = ["sync"] }
diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs
index 355197576a..b78b3828bd 100644
--- a/crates/consensus/beacon/src/engine/error.rs
+++ b/crates/consensus/beacon/src/engine/error.rs
@@ -1,3 +1,4 @@
+use reth_prune::PrunerError;
 use reth_rpc_types::engine::ForkchoiceUpdateError;
 use reth_stages::PipelineError;
 
@@ -16,6 +17,12 @@ pub enum BeaconConsensusEngineError {
     /// Pipeline error.
     #[error(transparent)]
     Pipeline(#[from] Box<PipelineError>),
+    /// Pruner channel closed.
+    #[error("Pruner channel closed")]
+    PrunerChannelClosed,
+    /// Pruner error.
+    #[error(transparent)]
+    Pruner(#[from] PrunerError),
     /// Common error. Wrapper around [reth_interfaces::Error].
     #[error(transparent)]
     Common(#[from] reth_interfaces::Error),
diff --git a/crates/consensus/beacon/src/engine/metrics.rs b/crates/consensus/beacon/src/engine/metrics.rs
index 14d68f4dd7..04080e93be 100644
--- a/crates/consensus/beacon/src/engine/metrics.rs
+++ b/crates/consensus/beacon/src/engine/metrics.rs
@@ -13,6 +13,8 @@ pub(crate) struct EngineMetrics {
     pub(crate) forkchoice_updated_messages: Counter,
     /// The total count of new payload messages received.
     pub(crate) new_payload_messages: Counter,
+    /// The number of times the pruner was run.
+    pub(crate) pruner_runs: Counter,
 }
 
 /// Metrics for the `EngineSyncController`.
diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs
index 10e5332329..9403597278 100644
--- a/crates/consensus/beacon/src/engine/mod.rs
+++ b/crates/consensus/beacon/src/engine/mod.rs
@@ -3,6 +3,7 @@ use crate::{
         forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker},
         message::OnForkChoiceUpdated,
         metrics::EngineMetrics,
+        prune::{EnginePruneController, EnginePruneEvent},
     },
     sync::{EngineSyncController, EngineSyncEvent},
 };
@@ -28,6 +29,7 @@ use reth_provider::{
     BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ProviderError,
     StageCheckpointReader,
 };
+use reth_prune::Pruner;
 use reth_rpc_types::engine::{
     ExecutionPayload, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
 };
@@ -60,13 +62,15 @@ use invalid_headers::InvalidHeaderCache;
 mod event;
 pub use event::BeaconConsensusEngineEvent;
 
-mod forkchoice;
-mod metrics;
-pub(crate) mod sync;
-
 mod handle;
 pub use handle::BeaconConsensusEngineHandle;
 
+mod forkchoice;
+mod metrics;
+pub(crate) mod prune;
+pub(crate) mod sync;
+
 /// The maximum number of invalid headers that can be tracked by the engine.
 const MAX_INVALID_HEADERS: u32 = 512u32;
 
@@ -187,6 +191,8 @@ where
     /// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
     /// be used to download and execute the missing blocks.
     pipeline_run_threshold: u64,
+    /// Controls pruning triggered by engine updates.
+    prune: Option<EnginePruneController>,
 }
 
 impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
 where
@@ -213,7 +219,8 @@ where
         payload_builder: PayloadBuilderHandle,
         target: Option<H256>,
         pipeline_run_threshold: u64,
-    ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
+        pruner: Option<Pruner>,
+    ) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
         let (to_engine, rx) = mpsc::unbounded_channel();
         Self::with_channel(
             client,
@@ -228,6 +235,7 @@ where
             pipeline_run_threshold,
             to_engine,
             rx,
+            pruner,
         )
     }
 
@@ -257,15 +265,17 @@ where
         pipeline_run_threshold: u64,
         to_engine: UnboundedSender<BeaconEngineMessage>,
         rx: UnboundedReceiver<BeaconEngineMessage>,
-    ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
+        pruner: Option<Pruner>,
+    ) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
         let handle = BeaconConsensusEngineHandle { to_engine };
         let sync = EngineSyncController::new(
             pipeline,
             client,
-            task_spawner,
+            task_spawner.clone(),
             run_pipeline_continuously,
             max_block,
         );
+        let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner));
         let mut this = Self {
             sync,
             blockchain,
@@ -278,6 +288,7 @@ where
             invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
             metrics: EngineMetrics::default(),
             pipeline_run_threshold,
+            prune,
         };
 
         let maybe_pipeline_target = match target {
@@ -304,7 +315,7 @@
     /// # Returns
     ///
     /// A target block hash if the pipeline is inconsistent, otherwise `None`.
-    fn check_pipeline_consistency(&self) -> Result<Option<H256>, reth_interfaces::Error> {
+    fn check_pipeline_consistency(&self) -> Result<Option<H256>, Error> {
         // If no target was provided, check if the stages are congruent - check if the
         // checkpoint of the last stage matches the checkpoint of the first.
         let first_stage_checkpoint = self
@@ -532,7 +543,7 @@
         &mut self,
         state: ForkchoiceState,
         attrs: Option<PayloadAttributes>,
-        tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
+        tx: oneshot::Sender<Result<OnForkChoiceUpdated, Error>>,
     ) -> bool {
         self.metrics.forkchoice_updated_messages.increment(1);
         self.blockchain.on_forkchoice_update_received(&state);
@@ -583,7 +594,7 @@
         &mut self,
         state: ForkchoiceState,
         attrs: Option<PayloadAttributes>,
-    ) -> Result<OnForkChoiceUpdated, reth_interfaces::Error> {
+    ) -> Result<OnForkChoiceUpdated, Error> {
         trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
         if state.head_block_hash.is_zero() {
             return Ok(OnForkChoiceUpdated::invalid_state())
@@ -602,6 +613,17 @@
             return Ok(OnForkChoiceUpdated::syncing())
         }
 
+        if self.is_prune_active() {
+            // We can only process new forkchoice updates if the pruner is idle, since it requires
+            // exclusive access to the database
+            warn!(
+                target: "consensus::engine",
+                "Pruning is in progress, skipping forkchoice update. \
+                 This may affect the performance of your node as a validator."
+            );
+            return Ok(OnForkChoiceUpdated::syncing())
+        }
+
         let status = match self.blockchain.make_canonical(&state.head_block_hash) {
             Ok(outcome) => {
                 if !outcome.is_already_canonical() {
@@ -654,7 +676,7 @@
         &self,
         head: SealedHeader,
         update: &ForkchoiceState,
-    ) -> Result<(), reth_interfaces::Error> {
+    ) -> Result<(), Error> {
         let mut head_block = Head {
             number: head.number,
             hash: head.hash,
@@ -899,11 +921,14 @@
             return Ok(status)
         }
 
-        let res = if self.sync.is_pipeline_idle() {
-            // we can only insert new payloads if the pipeline is _not_ running, because it holds
-            // exclusive access to the database
+        let res = if self.sync.is_pipeline_idle() && self.is_prune_idle() {
+            // we can only insert new payloads if the pipeline and the pruner are _not_ running,
+            // because they hold exclusive access to the database
            self.try_insert_new_payload(block)
         } else {
+            if self.is_prune_active() {
+                warn!(target: "consensus::engine", "Pruning is in progress, buffering new payload.");
+            }
             self.try_buffer_payload(block)
         };
 
@@ -964,12 +989,12 @@
         Ok(block)
     }
 
-    /// When the pipeline is actively syncing the tree is unable to commit any additional blocks
-    /// since the pipeline holds exclusive access to the database.
+    /// When the pipeline or the pruner is active, the tree is unable to commit any additional
+    /// blocks since they hold exclusive access to the database.
     ///
     /// In this scenario we buffer the payload in the tree if the payload is valid, once the
-    /// pipeline finished syncing the tree is then able to also use the buffered payloads to commit
-    /// to a (newer) canonical chain.
+    /// pipeline or pruner is finished, the tree is then able to also use the buffered payloads to
+    /// commit to a (newer) canonical chain.
     ///
     /// This will return `SYNCING` if the block was buffered successfully, and an error if an error
     /// occurred while buffering the block.
@@ -984,7 +1009,7 @@
 
     /// Attempts to insert a new payload into the tree.
     ///
-    /// Caution: This expects that the pipeline is idle.
+    /// Caution: This expects that the pipeline and the pruner are idle.
     #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
     fn try_insert_new_payload(
         &mut self,
@@ -1063,14 +1088,11 @@
     ///
     /// If the given block is missing from the database, this will return `false`. Otherwise, `true`
     /// is returned: the database contains the hash and the tree was updated.
-    fn update_tree_on_finished_pipeline(
-        &mut self,
-        block_hash: H256,
-    ) -> Result<bool, reth_interfaces::Error> {
+    fn update_tree_on_finished_pipeline(&mut self, block_hash: H256) -> Result<bool, Error> {
         let synced_to_finalized = match self.blockchain.block_number(block_hash)? {
             Some(number) => {
                 // Attempt to restore the tree.
-                self.blockchain.restore_canonical_hashes(number)?;
+                self.blockchain.restore_canonical_hashes_and_finalize(number)?;
                 true
             }
             None => false,
@@ -1078,6 +1100,14 @@
         Ok(synced_to_finalized)
     }
 
+    /// Attempt to restore the tree.
+    ///
+    /// This is invoked after a pruner run to update the tree with the most recent canonical
+    /// hashes.
+    fn update_tree_on_finished_pruner(&mut self) -> Result<(), Error> {
+        self.blockchain.restore_canonical_hashes()
+    }
+
     /// Invoked if we successfully downloaded a new block from the network.
     ///
     /// This will attempt to insert the block into the tree.
@@ -1226,9 +1256,7 @@
                 // it's part of the canonical chain: if it's the safe or the finalized block
                 if matches!(
                     err,
-                    reth_interfaces::Error::Execution(
-                        BlockExecutionError::BlockHashNotFoundInChain { .. }
-                    )
+                    Error::Execution(BlockExecutionError::BlockHashNotFoundInChain { .. })
                 ) {
                     // if the inserted block is the currently targeted `finalized` or `safe`
                     // block, we will attempt to make them canonical,
@@ -1250,9 +1278,9 @@
     /// This returns a result to indicate whether the engine future should resolve (fatal error).
     fn on_sync_event(
         &mut self,
-        ev: EngineSyncEvent,
+        event: EngineSyncEvent,
     ) -> Option<Result<(), BeaconConsensusEngineError>> {
-        match ev {
+        match event {
             EngineSyncEvent::FetchedFullBlock(block) => {
                 self.on_downloaded_block(block);
             }
@@ -1416,6 +1444,55 @@
 
         None
     }
+
+    /// Event handler for events emitted by the [EnginePruneController].
+    ///
+    /// This returns a result to indicate whether the engine future should resolve (fatal error).
+    fn on_prune_event(
+        &mut self,
+        event: EnginePruneEvent,
+    ) -> Option<Result<(), BeaconConsensusEngineError>> {
+        match event {
+            EnginePruneEvent::NotReady => {}
+            EnginePruneEvent::Started(tip_block_number) => {
+                trace!(target: "consensus::engine", %tip_block_number, "Pruner started");
+                self.metrics.pruner_runs.increment(1);
+            }
+            EnginePruneEvent::TaskDropped => {
+                error!(target: "consensus::engine", "Failed to receive spawned pruner");
+                return Some(Err(BeaconConsensusEngineError::PrunerChannelClosed))
+            }
+            EnginePruneEvent::Finished { result } => {
+                trace!(target: "consensus::engine", ?result, "Pruner finished");
+                match result {
+                    Ok(_) => {
+                        // Update the state and hashes of the blockchain tree if possible.
+                        match self.update_tree_on_finished_pruner() {
+                            Ok(()) => {}
+                            Err(error) => {
+                                error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
+                                return Some(Err(error.into()))
+                            }
+                        };
+                    }
+                    // Any pruner error at this point is fatal.
+                    Err(error) => return Some(Err(error.into())),
+                };
+            }
+        };
+
+        None
+    }
+
+    /// Returns `true` if the prune controller's pruner is idle.
+    fn is_prune_idle(&self) -> bool {
+        self.prune.as_ref().map(|prune| prune.is_pruner_idle()).unwrap_or(true)
+    }
+
+    /// Returns `true` if the prune controller's pruner is active.
+    fn is_prune_active(&self) -> bool {
+        !self.is_prune_idle()
+    }
 }
 
 /// On initialization, the consensus engine will poll the message receiver and return
@@ -1446,6 +1523,7 @@
         // SyncController, hence they are polled first, and they're also time sensitive.
         loop {
             let mut engine_messages_pending = false;
+            let mut sync_pending = false;
 
             // handle next engine message
             match this.engine_message_rx.poll_next_unpin(cx) {
@@ -1484,10 +1562,28 @@
                     }
                 }
                 Poll::Pending => {
-                    if engine_messages_pending {
-                        // both the sync and the engine message receiver are pending
-                        return Poll::Pending
+                    // no more sync events to process
+                    sync_pending = true;
+                }
+            }
+
+            // check prune events if pipeline is idle AND (pruning is running and we need to
+            // prioritize checking its events OR no engine and sync messages are pending and we may
+            // start pruning)
+            if this.sync.is_pipeline_idle() &&
+                (this.is_prune_active() || engine_messages_pending && sync_pending)
+            {
+                if let Some(ref mut prune) = this.prune {
+                    match prune.poll(cx, this.blockchain.canonical_tip().number) {
+                        Poll::Ready(prune_event) => {
+                            if let Some(res) = this.on_prune_event(prune_event) {
+                                return Poll::Ready(res)
+                            }
+                        }
+                        Poll::Pending => return Poll::Pending,
                     }
+                } else {
+                    return Poll::Pending
                 }
             }
         }
@@ -1680,6 +1776,9 @@ mod tests {
             let shareable_db = ProviderFactory::new(db.clone(), self.chain_spec.clone());
             let latest = self.chain_spec.genesis_header().seal_slow();
             let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
+
+            let pruner = Pruner::new(5, 0);
+
             let (mut engine, handle) = BeaconConsensusEngine::new(
                 NoopFullBlockClient::default(),
                 pipeline,
@@ -1691,6 +1790,7 @@ mod tests {
                 payload_builder,
                 None,
                 self.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
+                Some(pruner),
             )
             .expect("failed to create consensus engine");
 
@@ -1767,21 +1867,41 @@ mod tests {
         std::thread::sleep(Duration::from_millis(100));
         assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
 
-        // consensus engine is still idle
+        // consensus engine is still idle because no FCUs were received
         let _ = env.send_new_payload(SealedBlock::default().into()).await;
         assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
 
-        // consensus engine receives a forkchoice state and triggers the pipeline
+        // consensus engine is still idle because pruning is running
         let _ = env
             .send_forkchoice_updated(ForkchoiceState {
                 head_block_hash: H256::random(),
                 ..Default::default()
             })
             .await;
-        assert_matches!(
-            rx.await,
-            Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
-        );
+        assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
+
+        // consensus engine receives a forkchoice state and triggers the pipeline when pruning is
+        // finished
+        loop {
+            match rx.try_recv() {
+                Ok(result) => {
+                    assert_matches!(
+                        result,
+                        Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
+                    );
+                    break
+                }
+                Err(TryRecvError::Empty) => {
+                    let _ = env
+                        .send_forkchoice_updated(ForkchoiceState {
+                            head_block_hash: H256::random(),
+                            ..Default::default()
+                        })
+                        .await;
+                }
+                Err(err) => panic!("receive error: {err}"),
+            }
+        }
     }
 
     // Test that the consensus engine runs the pipeline again if the tree cannot be restored.
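Note: the file added below hands the `Pruner` off to a spawned blocking task and reclaims it through a oneshot channel. A minimal, standalone sketch of that ownership round-trip, using plain tokio rather than reth's `TaskSpawner` (the `Pruner` stub and its error type here are illustrative, not reth's):

```rust
use tokio::sync::oneshot;

// Illustrative stand-in for reth_prune::Pruner.
struct Pruner;

impl Pruner {
    fn run(&mut self, _tip: u64) -> Result<(), &'static str> {
        Ok(()) // pruning work would happen here
    }
}

#[tokio::main]
async fn main() {
    let mut pruner = Pruner;
    let (tx, rx) = oneshot::channel();

    // Counterpart of `spawn_critical_blocking`: run the pruner off the async
    // executor, then send it back to the caller together with its result.
    tokio::task::spawn_blocking(move || {
        let result = pruner.run(42);
        let _ = tx.send((pruner, result));
    });

    // The controller keeps `rx` in `PrunerState::Running` and polls it; a
    // dropped sender (e.g. a panicked task) surfaces as `TaskDropped`.
    match rx.await {
        Ok((_pruner, result)) => println!("pruner finished: {result:?}"),
        Err(_) => println!("pruner task dropped"),
    }
}
```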
diff --git a/crates/consensus/beacon/src/engine/prune.rs b/crates/consensus/beacon/src/engine/prune.rs
new file mode 100644
index 0000000000..855456f354
--- /dev/null
+++ b/crates/consensus/beacon/src/engine/prune.rs
@@ -0,0 +1,146 @@
+//! Prune management for the engine implementation.
+
+use futures::FutureExt;
+use reth_primitives::BlockNumber;
+use reth_prune::{Pruner, PrunerError, PrunerWithResult};
+use reth_tasks::TaskSpawner;
+use std::task::{ready, Context, Poll};
+use tokio::sync::oneshot;
+
+/// Manages pruning under the control of the engine.
+///
+/// This type controls the [Pruner].
+pub(crate) struct EnginePruneController {
+    /// The current state of the pruner.
+    pruner_state: PrunerState,
+    /// The type that can spawn the pruner task.
+    pruner_task_spawner: Box<dyn TaskSpawner>,
+}
+
+impl EnginePruneController {
+    /// Create a new instance
+    pub(crate) fn new(pruner: Pruner, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
+        Self { pruner_state: PrunerState::Idle(Some(pruner)), pruner_task_spawner }
+    }
+
+    /// Returns `true` if the pruner is idle.
+    pub(crate) fn is_pruner_idle(&self) -> bool {
+        self.pruner_state.is_idle()
+    }
+
+    /// Advances the pruner state.
+    ///
+    /// This checks for the result in the channel, or returns pending if the pruner is idle.
+    fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<EnginePruneEvent> {
+        let res = match self.pruner_state {
+            PrunerState::Idle(_) => return Poll::Pending,
+            PrunerState::Running(ref mut fut) => {
+                ready!(fut.poll_unpin(cx))
+            }
+        };
+        let ev = match res {
+            Ok((pruner, result)) => {
+                self.pruner_state = PrunerState::Idle(Some(pruner));
+                EnginePruneEvent::Finished { result }
+            }
+            Err(_) => {
+                // failed to receive the pruner
+                EnginePruneEvent::TaskDropped
+            }
+        };
+        Poll::Ready(ev)
+    }
+
+    /// This will try to spawn the pruner if it is idle:
+    /// 1. Check if pruning is needed through [Pruner::is_pruning_needed].
+    /// 2a. If pruning is needed, pass the tip block number to [Pruner::run] and spawn it in a
+    ///    separate task. Set pruner state to [PrunerState::Running].
+    /// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle].
+    ///
+    /// If the pruner is already running, do nothing.
+    fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EnginePruneEvent> {
+        match &mut self.pruner_state {
+            PrunerState::Idle(pruner) => {
+                let mut pruner = pruner.take()?;
+
+                // Check tip for pruning
+                if pruner.is_pruning_needed(tip_block_number) {
+                    let (tx, rx) = oneshot::channel();
+                    self.pruner_task_spawner.spawn_critical_blocking(
+                        "pruner task",
+                        Box::pin(async move {
+                            let result = pruner.run(tip_block_number);
+                            let _ = tx.send((pruner, result));
+                        }),
+                    );
+                    self.pruner_state = PrunerState::Running(rx);
+
+                    Some(EnginePruneEvent::Started(tip_block_number))
+                } else {
+                    self.pruner_state = PrunerState::Idle(Some(pruner));
+                    Some(EnginePruneEvent::NotReady)
+                }
+            }
+            PrunerState::Running(_) => None,
+        }
+    }
+
+    /// Advances the prune process with the tip block number.
+    pub(crate) fn poll(
+        &mut self,
+        cx: &mut Context<'_>,
+        tip_block_number: BlockNumber,
+    ) -> Poll<EnginePruneEvent> {
+        // Try to spawn a pruner
+        match self.try_spawn_pruner(tip_block_number) {
+            Some(EnginePruneEvent::NotReady) => return Poll::Pending,
+            Some(event) => return Poll::Ready(event),
+            None => (),
+        }
+
+        // Poll pruner and check its status
+        self.poll_pruner(cx)
+    }
+}
+
+/// The event type emitted by the [EnginePruneController].
+#[derive(Debug)]
+pub(crate) enum EnginePruneEvent {
+    /// Pruner is not ready
+    NotReady,
+    /// Pruner started with tip block number
+    Started(BlockNumber),
+    /// Pruner finished
+    ///
+    /// If this is returned, the pruner is idle.
+    Finished {
+        /// Final result of the pruner run.
+        result: Result<(), PrunerError>,
+    },
+    /// Pruner task was dropped after it was started, and cannot be received because the channel
+    /// closed. This would indicate a panicked pruner task.
+    TaskDropped,
+}
+
+/// The possible pruner states within the prune controller.
+///
+/// [PrunerState::Idle] means that the pruner is currently idle.
+/// [PrunerState::Running] means that the pruner is currently running.
+///
+/// NOTE: The differentiation between these two states is important, because when the pruner is
+/// running, it acquires the write lock over the database. This means that we cannot forward to the
+/// blockchain tree any messages that would result in database writes, since it would result in a
+/// deadlock.
+enum PrunerState {
+    /// Pruner is idle.
+    Idle(Option<Pruner>),
+    /// Pruner is running and waiting for a response
+    Running(oneshot::Receiver<PrunerWithResult>),
+}
+
+impl PrunerState {
+    /// Returns `true` if the state matches idle.
+    fn is_idle(&self) -> bool {
+        matches!(self, PrunerState::Idle(_))
+    }
+}
diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs
index b422583f2d..02da63d5ae 100644
--- a/crates/consensus/beacon/src/engine/sync.rs
+++ b/crates/consensus/beacon/src/engine/sync.rs
@@ -166,9 +166,9 @@ where
             return false
         }
         trace!(
-            target: "consensus::engine",
+            target: "consensus::engine::sync",
             ?hash,
-            "start downloading full block."
+            "Start downloading full block"
         );
         let request = self.full_block_client.get_full_block(hash);
         self.inflight_full_block_requests.push(request);
@@ -191,10 +191,10 @@ where
             self.max_block.map(|target| progress >= target).unwrap_or_default();
         if has_reached_max_block {
             trace!(
-                target: "consensus::engine",
+                target: "consensus::engine::sync",
                 ?progress,
                 max_block = ?self.max_block,
-                "Consensus engine reached max block."
+                "Consensus engine reached max block"
             );
         }
         has_reached_max_block
diff --git a/crates/interfaces/src/blockchain_tree/mod.rs b/crates/interfaces/src/blockchain_tree/mod.rs
index 63f628c49d..ce844ee0b3 100644
--- a/crates/interfaces/src/blockchain_tree/mod.rs
+++ b/crates/interfaces/src/blockchain_tree/mod.rs
@@ -62,7 +62,17 @@ pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync {
     ///
     /// This finalizes `last_finalized_block` prior to reading the canonical hashes (using
     /// [`BlockchainTreeEngine::finalize_block`]).
-    fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error>;
+    fn restore_canonical_hashes_and_finalize(
+        &self,
+        last_finalized_block: BlockNumber,
+    ) -> Result<(), Error>;
+
+    /// Reads the last `N` canonical hashes from the database and updates the block indices of the
+    /// tree.
+    ///
+    /// `N` is the `max_reorg_depth` plus the number of block hashes needed to satisfy the
+    /// `BLOCKHASH` opcode in the EVM.
+    fn restore_canonical_hashes(&self) -> Result<(), Error>;
 
     /// Make a block and its parent chain part of the canonical chain by committing it to the
     /// database.
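Note: taken together with the tree-side change in blockchain_tree.rs above, the old entry point is now a thin wrapper: finalize first, then run the new finalize-free restore that the pruner path calls directly. A dependency-free sketch of that relationship (plain types stand in for reth's `Error` and the `&self` receiver; the trait name is made up for illustration):

```rust
type BlockNumber = u64;

// Sketch of the split introduced above: `restore_canonical_hashes` refreshes
// the tree's canonical index without touching finalization, and the
// `_and_finalize` variant composes the two steps.
trait RestoreSketch {
    fn finalize_block(&mut self, last_finalized_block: BlockNumber);
    fn restore_canonical_hashes(&mut self) -> Result<(), String>;

    fn restore_canonical_hashes_and_finalize(
        &mut self,
        last_finalized_block: BlockNumber,
    ) -> Result<(), String> {
        self.finalize_block(last_finalized_block);
        self.restore_canonical_hashes()
    }
}
```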
diff --git a/crates/primitives/src/stage/checkpoints.rs b/crates/primitives/src/stage/checkpoints.rs
index c6de28ff63..4d7311dd06 100644
--- a/crates/primitives/src/stage/checkpoints.rs
+++ b/crates/primitives/src/stage/checkpoints.rs
@@ -220,7 +220,7 @@ impl StageCheckpoint {
     /// Get the underlying [`EntitiesCheckpoint`], if any, to determine the number of entities
     /// processed, and the number of total entities to process.
     pub fn entities(&self) -> Option<EntitiesCheckpoint> {
-        let Some(stage_checkpoint) = self.stage_checkpoint else { return None };
+        let stage_checkpoint = self.stage_checkpoint?;
 
         match stage_checkpoint {
             StageUnitCheckpoint::Account(AccountHashingCheckpoint {
diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml
new file mode 100644
index 0000000000..56b0c49a01
--- /dev/null
+++ b/crates/prune/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "reth-prune"
+version.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description = """
+Pruning implementation
+"""
+
+[dependencies]
+# reth
+reth-primitives = { workspace = true }
+
+# misc
+tracing = { workspace = true }
+thiserror = { workspace = true }
+
diff --git a/crates/prune/src/error.rs b/crates/prune/src/error.rs
new file mode 100644
index 0000000000..96f0a25b7d
--- /dev/null
+++ b/crates/prune/src/error.rs
@@ -0,0 +1,4 @@
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum PrunerError {}
diff --git a/crates/prune/src/lib.rs b/crates/prune/src/lib.rs
new file mode 100644
index 0000000000..6d133030fc
--- /dev/null
+++ b/crates/prune/src/lib.rs
@@ -0,0 +1,5 @@
+mod error;
+mod pruner;
+
+pub use error::PrunerError;
+pub use pruner::{Pruner, PrunerResult, PrunerWithResult};
diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs
new file mode 100644
index 0000000000..9f3a60b86c
--- /dev/null
+++ b/crates/prune/src/pruner.rs
@@ -0,0 +1,83 @@
+//! Support for pruning.
+
+use crate::PrunerError;
+use reth_primitives::BlockNumber;
+use tracing::debug;
+
+/// Result of [Pruner::run] execution
+pub type PrunerResult = Result<(), PrunerError>;
+
+/// The pruner type itself with the result of [Pruner::run]
+pub type PrunerWithResult = (Pruner, PrunerResult);
+
+/// Pruning routine. Main pruning logic happens in [Pruner::run].
+pub struct Pruner {
+    /// Minimum pruning interval measured in blocks. All prune parts are checked and, if needed,
+    /// pruned, when the chain advances by the specified number of blocks.
+    min_block_interval: u64,
+    /// Maximum prune depth. Used to determine the pruning target for parts that are needed during
+    /// the reorg, e.g. changesets.
+    #[allow(dead_code)]
+    max_prune_depth: u64,
+    /// Last pruned block number. Used in conjunction with `min_block_interval` to determine
+    /// when the pruning needs to be initiated.
+    last_pruned_block_number: Option<BlockNumber>,
+}
+
+impl Pruner {
+    /// Creates a new [Pruner].
+    pub fn new(min_block_interval: u64, max_prune_depth: u64) -> Self {
+        Self { min_block_interval, max_prune_depth, last_pruned_block_number: None }
+    }
+
+    /// Run the pruner
+    pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
+        // Pruning logic
+
+        self.last_pruned_block_number = Some(tip_block_number);
+        Ok(())
+    }
+
+    /// Returns `true` if the pruning is needed at the provided tip block number.
+    /// This is determined by the check against the minimum pruning interval and the last pruned
+    /// block number.
+    pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
+        if self.last_pruned_block_number.map_or(true, |last_pruned_block_number| {
+            // Saturating subtraction is needed for the case when the chain was reverted, meaning
+            // current block number might be less than the previously pruned block number. If
+            // that's the case, no pruning is needed as outdated data is also reverted.
+            tip_block_number.saturating_sub(last_pruned_block_number) >= self.min_block_interval
+        }) {
+            debug!(
+                target: "pruner",
+                last_pruned_block_number = ?self.last_pruned_block_number,
+                %tip_block_number,
+                "Minimum pruning interval reached"
+            );
+            true
+        } else {
+            false
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::Pruner;
+
+    #[test]
+    fn pruner_is_pruning_needed() {
+        let mut pruner = Pruner::new(5, 0);
+
+        // No last pruned block number was set before
+        let first_block_number = 1;
+        assert!(pruner.is_pruning_needed(first_block_number));
+        pruner.run(first_block_number).expect("pruner run");
+
+        // Delta is not less than min block interval
+        let second_block_number = first_block_number + pruner.min_block_interval;
+        assert!(pruner.is_pruning_needed(second_block_number));
+        pruner.run(second_block_number).expect("pruner run");
+
+        // Delta is less than min block interval
+        let third_block_number = second_block_number;
+        assert!(!pruner.is_pruning_needed(third_block_number));
+    }
+}
diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs
index 81c123323f..a7c3ecbe4f 100644
--- a/crates/storage/provider/src/providers/mod.rs
+++ b/crates/storage/provider/src/providers/mod.rs
@@ -594,8 +594,15 @@ where
         self.tree.finalize_block(finalized_block)
     }
 
-    fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<()> {
-        self.tree.restore_canonical_hashes(last_finalized_block)
+    fn restore_canonical_hashes_and_finalize(
+        &self,
+        last_finalized_block: BlockNumber,
+    ) -> Result<()> {
+        self.tree.restore_canonical_hashes_and_finalize(last_finalized_block)
+    }
+
+    fn restore_canonical_hashes(&self) -> Result<()> {
+        self.tree.restore_canonical_hashes()
+    }
 
     fn make_canonical(&self, block_hash: &BlockHash) -> Result<CanonicalOutcome> {
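Note: outside the engine, the new `Pruner` API reduces to an interval check plus a run that records the tip. A hypothetical driver, assuming only the `reth-prune` crate added in this diff (`on_new_canonical_tip` is illustrative, not a reth function):

```rust
use reth_prune::Pruner;

// Hypothetical helper: prune only when the chain has advanced by at least
// `min_block_interval` blocks since the last run.
fn on_new_canonical_tip(pruner: &mut Pruner, tip: u64) {
    if pruner.is_pruning_needed(tip) {
        // `run` records `tip` as the last pruned block number, so another call
        // within `min_block_interval` blocks becomes a no-op.
        if let Err(err) = pruner.run(tip) {
            eprintln!("pruner failed: {err}");
        }
    }
}

fn main() {
    // Prune at most once every 5 blocks; keep 10 blocks of reorg depth.
    let mut pruner = Pruner::new(5, 10);
    for tip in [1, 3, 6, 7, 11] {
        on_new_canonical_tip(&mut pruner, tip); // prunes at tips 1, 6, and 11
    }
}
```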