From 352430cd84e1e256f081a9f3170193ca228f21f4 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 6 Feb 2026 21:22:40 +0000 Subject: [PATCH] fix: skip sender recovery stage when senders fully pruned (#21918) --- crates/cli/commands/src/stage/run.rs | 7 ++- crates/stages/stages/benches/criterion.rs | 2 +- crates/stages/stages/src/sets.rs | 51 +++++++++------- .../stages/src/stages/sender_recovery.rs | 58 ++++++++++++++++--- .../src/providers/database/provider.rs | 25 +++++++- 5 files changed, 109 insertions(+), 34 deletions(-) diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index eaf2328df2..0aa2e0fd16 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -248,9 +248,10 @@ impl (Box::new(stage), None) } StageEnum::Senders => ( - Box::new(SenderRecoveryStage::new(SenderRecoveryConfig { - commit_threshold: batch_size, - })), + Box::new(SenderRecoveryStage::new( + SenderRecoveryConfig { commit_threshold: batch_size }, + None, + )), None, ), StageEnum::Execution => ( diff --git a/crates/stages/stages/benches/criterion.rs b/crates/stages/stages/benches/criterion.rs index 655b990f25..e43c2e3c59 100644 --- a/crates/stages/stages/benches/criterion.rs +++ b/crates/stages/stages/benches/criterion.rs @@ -72,7 +72,7 @@ fn senders(c: &mut Criterion, runtime: &Runtime) { let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); - let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS }; + let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS, prune_mode: None }; measure_stage( runtime, diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index 2c1948307e..0edb14e0c8 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -52,7 +52,7 @@ use reth_evm::ConfigureEvm; use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}; use reth_primitives_traits::{Block, NodePrimitives}; use reth_provider::HeaderSyncGapProvider; -use reth_prune_types::PruneModes; +use reth_prune_types::{PruneMode, PruneModes}; use reth_stages_api::Stage; use std::sync::Arc; use tokio::sync::watch; @@ -332,23 +332,28 @@ where PruneStage: Stage, { fn builder(self) -> StageSetBuilder { - ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone()) - .builder() - // If sender recovery prune mode is set, add the prune sender recovery stage. - .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| { - PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold) - })) - .add_set(HashingStages { stages_config: self.stages_config.clone() }) - .add_set(HistoryIndexingStages { - stages_config: self.stages_config.clone(), - prune_modes: self.prune_modes.clone(), - }) - // Prune stage should be added after all hashing stages, because otherwise it will - // delete - .add_stage(PruneStage::new( - self.prune_modes.clone(), - self.stages_config.prune.commit_threshold, - )) + ExecutionStages::new( + self.evm_config, + self.consensus, + self.stages_config.clone(), + self.prune_modes.sender_recovery, + ) + .builder() + // If sender recovery prune mode is set, add the prune sender recovery stage. + .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| { + PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold) + })) + .add_set(HashingStages { stages_config: self.stages_config.clone() }) + .add_set(HistoryIndexingStages { + stages_config: self.stages_config.clone(), + prune_modes: self.prune_modes.clone(), + }) + // Prune stage should be added after all hashing stages, because otherwise it will + // delete + .add_stage(PruneStage::new( + self.prune_modes.clone(), + self.stages_config.prune.commit_threshold, + )) } } @@ -362,6 +367,8 @@ pub struct ExecutionStages { consensus: Arc>, /// Configuration for each stage in the pipeline stages_config: StageConfig, + /// Prune mode for sender recovery + sender_recovery_prune_mode: Option, } impl ExecutionStages { @@ -370,8 +377,9 @@ impl ExecutionStages { executor_provider: E, consensus: Arc>, stages_config: StageConfig, + sender_recovery_prune_mode: Option, ) -> Self { - Self { evm_config: executor_provider, consensus, stages_config } + Self { evm_config: executor_provider, consensus, stages_config, sender_recovery_prune_mode } } } @@ -383,7 +391,10 @@ where { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery)) + .add_stage(SenderRecoveryStage::new( + self.stages_config.sender_recovery, + self.sender_recovery_prune_mode, + )) .add_stage(ExecutionStage::from_config( self.evm_config, self.consensus, diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index abd737baad..dfe57330e2 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -12,9 +12,10 @@ use reth_db_api::{ use reth_primitives_traits::{GotExpected, NodePrimitives, SignedTransaction}; use reth_provider::{ BlockReader, DBProvider, EitherWriter, HeaderProvider, ProviderError, PruneCheckpointReader, - StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionsProvider, + PruneCheckpointWriter, StaticFileProviderFactory, StatsReader, StorageSettingsCache, + TransactionsProvider, }; -use reth_prune_types::PruneSegment; +use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, @@ -43,18 +44,22 @@ pub struct SenderRecoveryStage { /// The size of inserted items after which the control /// flow will be returned to the pipeline for commit pub commit_threshold: u64, + /// Prune mode for sender recovery. When set to `PruneMode::Full`, the stage will + /// fast-forward its checkpoint to skip all work, since senders will be recovered + /// inline by the execution stage instead. + pub prune_mode: Option, } impl SenderRecoveryStage { /// Create new instance of [`SenderRecoveryStage`]. - pub const fn new(config: SenderRecoveryConfig) -> Self { - Self { commit_threshold: config.commit_threshold } + pub const fn new(config: SenderRecoveryConfig, prune_mode: Option) -> Self { + Self { commit_threshold: config.commit_threshold, prune_mode } } } impl Default for SenderRecoveryStage { fn default() -> Self { - Self { commit_threshold: 5_000_000 } + Self { commit_threshold: 5_000_000, prune_mode: None } } } @@ -65,6 +70,7 @@ where + StaticFileProviderFactory> + StatsReader + PruneCheckpointReader + + PruneCheckpointWriter + StorageSettingsCache, { /// Return the id of the stage @@ -77,7 +83,45 @@ where /// collect transactions within that range, recover signer for each transaction and store /// entries in the [`TransactionSenders`][reth_db_api::tables::TransactionSenders] table or /// static files depending on configuration. - fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + fn execute( + &mut self, + provider: &Provider, + mut input: ExecInput, + ) -> Result { + // TODO: when senders are fully pruned, batch recover in execution stage instead of per-tx + // fallback + if let Some((target_prunable_block, prune_mode)) = self + .prune_mode + .map(|mode| { + mode.prune_target_block( + input.target(), + PruneSegment::SenderRecovery, + PrunePurpose::User, + ) + }) + .transpose()? + .flatten() && + target_prunable_block > input.checkpoint().block_number + { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + + if provider.get_prune_checkpoint(PruneSegment::SenderRecovery)?.is_none() { + let target_prunable_tx_number = provider + .block_body_indices(target_prunable_block)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))? + .last_tx_num(); + + provider.save_prune_checkpoint( + PruneSegment::SenderRecovery, + PruneCheckpoint { + block_number: Some(target_prunable_block), + tx_number: Some(target_prunable_tx_number), + prune_mode, + }, + )?; + } + } + if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -720,7 +764,7 @@ mod tests { } fn stage(&self) -> Self::S { - SenderRecoveryStage { commit_threshold: self.threshold } + SenderRecoveryStage { commit_threshold: self.threshold, prune_mode: None } } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 6505015751..73d5b22bf9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1025,10 +1025,10 @@ impl DatabaseProvider { let tx_range = body.tx_num_range(); - let (transactions, senders) = if tx_range.is_empty() { - (vec![], vec![]) + let transactions = if tx_range.is_empty() { + vec![] } else { - (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?) + self.transactions_by_tx_range(tx_range.clone())? }; let body = self @@ -1038,6 +1038,25 @@ impl DatabaseProvider { .pop() .ok_or(ProviderError::InvalidStorageOutput)?; + let senders = if tx_range.is_empty() { + vec![] + } else { + let known_senders: HashMap = + EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?; + + let mut senders = Vec::with_capacity(body.transactions().len()); + for (tx_num, tx) in tx_range.zip(body.transactions()) { + match known_senders.get(&tx_num) { + None => { + let sender = tx.recover_signer_unchecked()?; + senders.push(sender); + } + Some(sender) => senders.push(*sender), + } + } + senders + }; + construct_block(header, body, senders) }