fix: skip sender recovery stage when senders fully pruned (#21918)

This commit is contained in:
Dan Cline
2026-02-06 21:22:40 +00:00
committed by GitHub
parent 1177bc94c9
commit 352430cd84
5 changed files with 109 additions and 34 deletions

View File

@@ -248,9 +248,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
(Box::new(stage), None)
}
StageEnum::Senders => (
Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
commit_threshold: batch_size,
})),
Box::new(SenderRecoveryStage::new(
SenderRecoveryConfig { commit_threshold: batch_size },
None,
)),
None,
),
StageEnum::Execution => (

View File

@@ -72,7 +72,7 @@ fn senders(c: &mut Criterion, runtime: &Runtime) {
let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS, prune_mode: None };
measure_stage(
runtime,

View File

@@ -52,7 +52,7 @@ use reth_evm::ConfigureEvm;
use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
use reth_primitives_traits::{Block, NodePrimitives};
use reth_provider::HeaderSyncGapProvider;
use reth_prune_types::PruneModes;
use reth_prune_types::{PruneMode, PruneModes};
use reth_stages_api::Stage;
use std::sync::Arc;
use tokio::sync::watch;
@@ -332,23 +332,28 @@ where
PruneStage: Stage<Provider>,
{
fn builder(self) -> StageSetBuilder<Provider> {
ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
.builder()
// If sender recovery prune mode is set, add the prune sender recovery stage.
.add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
}))
.add_set(HashingStages { stages_config: self.stages_config.clone() })
.add_set(HistoryIndexingStages {
stages_config: self.stages_config.clone(),
prune_modes: self.prune_modes.clone(),
})
// Prune stage should be added after all hashing stages, because otherwise it will
// delete
.add_stage(PruneStage::new(
self.prune_modes.clone(),
self.stages_config.prune.commit_threshold,
))
ExecutionStages::new(
self.evm_config,
self.consensus,
self.stages_config.clone(),
self.prune_modes.sender_recovery,
)
.builder()
// If sender recovery prune mode is set, add the prune sender recovery stage.
.add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
}))
.add_set(HashingStages { stages_config: self.stages_config.clone() })
.add_set(HistoryIndexingStages {
stages_config: self.stages_config.clone(),
prune_modes: self.prune_modes.clone(),
})
// Prune stage should be added after all hashing stages, because otherwise it will
// delete
.add_stage(PruneStage::new(
self.prune_modes.clone(),
self.stages_config.prune.commit_threshold,
))
}
}
@@ -362,6 +367,8 @@ pub struct ExecutionStages<E: ConfigureEvm> {
consensus: Arc<dyn FullConsensus<E::Primitives>>,
/// Configuration for each stage in the pipeline
stages_config: StageConfig,
/// Prune mode for sender recovery
sender_recovery_prune_mode: Option<PruneMode>,
}
impl<E: ConfigureEvm> ExecutionStages<E> {
@@ -370,8 +377,9 @@ impl<E: ConfigureEvm> ExecutionStages<E> {
executor_provider: E,
consensus: Arc<dyn FullConsensus<E::Primitives>>,
stages_config: StageConfig,
sender_recovery_prune_mode: Option<PruneMode>,
) -> Self {
Self { evm_config: executor_provider, consensus, stages_config }
Self { evm_config: executor_provider, consensus, stages_config, sender_recovery_prune_mode }
}
}
@@ -383,7 +391,10 @@ where
{
fn builder(self) -> StageSetBuilder<Provider> {
StageSetBuilder::default()
.add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
.add_stage(SenderRecoveryStage::new(
self.stages_config.sender_recovery,
self.sender_recovery_prune_mode,
))
.add_stage(ExecutionStage::from_config(
self.evm_config,
self.consensus,

View File

@@ -12,9 +12,10 @@ use reth_db_api::{
use reth_primitives_traits::{GotExpected, NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, EitherWriter, HeaderProvider, ProviderError, PruneCheckpointReader,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionsProvider,
PruneCheckpointWriter, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider,
};
use reth_prune_types::PruneSegment;
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError,
StageId, UnwindInput, UnwindOutput,
@@ -43,18 +44,22 @@ pub struct SenderRecoveryStage {
/// The size of inserted items after which the control
/// flow will be returned to the pipeline for commit
pub commit_threshold: u64,
/// Prune mode for sender recovery. When set to `PruneMode::Full`, the stage will
/// fast-forward its checkpoint to skip all work, since senders will be recovered
/// inline by the execution stage instead.
pub prune_mode: Option<PruneMode>,
}
impl SenderRecoveryStage {
/// Create new instance of [`SenderRecoveryStage`].
pub const fn new(config: SenderRecoveryConfig) -> Self {
Self { commit_threshold: config.commit_threshold }
pub const fn new(config: SenderRecoveryConfig, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold: config.commit_threshold, prune_mode }
}
}
impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { commit_threshold: 5_000_000 }
Self { commit_threshold: 5_000_000, prune_mode: None }
}
}
@@ -65,6 +70,7 @@ where
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ StatsReader
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ StorageSettingsCache,
{
/// Return the id of the stage
@@ -77,7 +83,45 @@ where
/// collect transactions within that range, recover signer for each transaction and store
/// entries in the [`TransactionSenders`][reth_db_api::tables::TransactionSenders] table or
/// static files depending on configuration.
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
fn execute(
&mut self,
provider: &Provider,
mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
// TODO: when senders are fully pruned, batch recover in execution stage instead of per-tx
// fallback
if let Some((target_prunable_block, prune_mode)) = self
.prune_mode
.map(|mode| {
mode.prune_target_block(
input.target(),
PruneSegment::SenderRecovery,
PrunePurpose::User,
)
})
.transpose()?
.flatten() &&
target_prunable_block > input.checkpoint().block_number
{
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
if provider.get_prune_checkpoint(PruneSegment::SenderRecovery)?.is_none() {
let target_prunable_tx_number = provider
.block_body_indices(target_prunable_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
.last_tx_num();
provider.save_prune_checkpoint(
PruneSegment::SenderRecovery,
PruneCheckpoint {
block_number: Some(target_prunable_block),
tx_number: Some(target_prunable_tx_number),
prune_mode,
},
)?;
}
}
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -720,7 +764,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
SenderRecoveryStage { commit_threshold: self.threshold }
SenderRecoveryStage { commit_threshold: self.threshold, prune_mode: None }
}
}

View File

@@ -1025,10 +1025,10 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
let tx_range = body.tx_num_range();
let (transactions, senders) = if tx_range.is_empty() {
(vec![], vec![])
let transactions = if tx_range.is_empty() {
vec![]
} else {
(self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
self.transactions_by_tx_range(tx_range.clone())?
};
let body = self
@@ -1038,6 +1038,25 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
.pop()
.ok_or(ProviderError::InvalidStorageOutput)?;
let senders = if tx_range.is_empty() {
vec![]
} else {
let known_senders: HashMap<TxNumber, Address> =
EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
let mut senders = Vec::with_capacity(body.transactions().len());
for (tx_num, tx) in tx_range.zip(body.transactions()) {
match known_senders.get(&tx_num) {
None => {
let sender = tx.recover_signer_unchecked()?;
senders.push(sender);
}
Some(sender) => senders.push(*sender),
}
}
senders
};
construct_block(header, body, senders)
}