feat(exex): make backfill thresholds configurable (#22037)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Emma Jamieson-Hoare
2026-02-10 16:30:18 -05:00
committed by GitHub
parent c5d1f70dd3
commit 5c4163c177
3 changed files with 156 additions and 1 deletions

View File

@@ -0,0 +1,6 @@
---
reth-exex: patch
reth-exex-types: patch
---
Added configurable backfill thresholds to ExEx notifications stream and added regression tests for state provider parity between pipeline and backfill execution paths.

View File

@@ -248,6 +248,7 @@ mod tests {
},
BackfillJobFactory,
};
use alloy_consensus::BlockHeader;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
@@ -336,6 +337,119 @@ mod tests {
Ok(())
}
/// Verify that ExEx backfill (using `history_by_block_number`) produces identical execution
/// results to the pipeline path (using `LatestStateProvider`).
///
/// This is a regression test for an issue reported on mainnet where backfill fails around
/// blocks 1.7M-3.8M with "transaction gas limit X is more than blocks available gas Y",
/// suggesting the state provider used during backfill may return different state than what the
/// pipeline used during initial sync.
#[test]
fn test_backfill_state_provider_parity() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let key_pair = generators::generate_key(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());
let chain_spec = chain_spec(address);
let executor = EthEvmConfig::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(&provider_factory)?;
let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
// Execute blocks via LatestStateProvider (pipeline-style) and commit to DB.
// This mirrors what the pipeline's ExecutionStage does.
let pipeline_results =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
// Now re-execute via SingleBlockBackfillJob which uses history_by_block_number.
let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=2);
let single_job = job.into_single_blocks();
let backfill_results: Vec<_> = single_job.into_iter().collect::<Result<Vec<_>, _>>()?;
assert_eq!(
pipeline_results.len(),
backfill_results.len(),
"should produce same number of block results"
);
for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
pipeline_results.iter().zip(backfill_results.into_iter()).enumerate()
{
backfill_output.state.reverts.sort();
assert_eq!(
backfill_block, *pipeline_block,
"block {i} mismatch between pipeline and backfill"
);
assert_eq!(
backfill_output.receipts, pipeline_output.receipts,
"block {i}: receipts differ — gas accounting divergence between \
LatestStateProvider and history_by_block_number"
);
assert_eq!(
backfill_output.gas_used, pipeline_output.gas_used,
"block {i}: gas_used differs"
);
assert_eq!(
&backfill_output, pipeline_output,
"block {i}: full execution output differs between pipeline and backfill"
);
}
Ok(())
}
/// Same as above but for the batch `BackfillJob` path (`execute_range`), which also uses
/// `history_by_block_number`. Verifies the batch execution outcome matches what the pipeline
/// produced block-by-block.
#[test]
fn test_backfill_batch_state_provider_parity() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let key_pair = generators::generate_key(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());
let chain_spec = chain_spec(address);
let executor = EthEvmConfig::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(&provider_factory)?;
let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
let pipeline_results =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
// Re-execute via batch BackfillJob (execute_range) using history_by_block_number
let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=2);
let chains = job.collect::<Result<Vec<_>, _>>()?;
assert_eq!(chains.len(), 1, "two blocks without threshold should yield one chain");
let chain = chains.into_iter().next().unwrap();
// Compare each block's receipts from the chain against the pipeline outputs
for (i, (pipeline_block, pipeline_output)) in pipeline_results.iter().enumerate() {
let block_number = pipeline_block.number();
let chain_block = chain.blocks().get(&block_number).expect("block should be in chain");
assert_eq!(chain_block, pipeline_block, "block {i}: block mismatch in batch backfill");
let chain_receipts = &chain.execution_outcome().receipts[i];
assert_eq!(
chain_receipts, &pipeline_output.receipts,
"block {i}: receipts differ in batch backfill — potential gas accounting \
divergence between LatestStateProvider and history_by_block_number"
);
}
Ok(())
}
#[test]
fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

View File

@@ -7,6 +7,7 @@ use reth_evm::ConfigureEvm;
use reth_exex_types::ExExHead;
use reth_node_api::NodePrimitives;
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
fmt::Debug,
@@ -61,6 +62,15 @@ pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
fn with_head(self, exex_head: ExExHead) -> Self
where
Self: Sized;
/// Sets custom thresholds for the backfill job.
///
/// These thresholds control how many blocks are included in each backfill notification.
/// Only takes effect when the stream is configured with a head.
///
/// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
/// per batch, bounded by 30s execution time).
fn set_backfill_thresholds(&mut self, _thresholds: ExecutionStageThresholds) {}
}
#[derive(Debug)]
@@ -151,6 +161,12 @@ where
self.set_with_head(exex_head);
self
}
fn set_backfill_thresholds(&mut self, thresholds: ExecutionStageThresholds) {
if let ExExNotificationsInner::WithHead(notifications) = &mut self.inner {
notifications.backfill_thresholds = Some(thresholds);
}
}
}
impl<P, E> Stream for ExExNotifications<P, E>
@@ -268,6 +284,8 @@ where
pending_check_backfill: bool,
/// The backfill job to run before consuming any notifications.
backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
/// Custom thresholds for the backfill job, if set.
backfill_thresholds: Option<ExecutionStageThresholds>,
}
impl<P, E> ExExNotificationsWithHead<P, E>
@@ -293,8 +311,22 @@ where
pending_check_canonical: true,
pending_check_backfill: true,
backfill_job: None,
backfill_thresholds: None,
}
}
/// Sets custom thresholds for the backfill job.
///
/// These thresholds control how many blocks are included in each backfill notification.
/// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
/// per batch, bounded by 30s execution time).
///
/// If your ExEx is memory-constrained, consider setting a lower `max_blocks` value to
/// reduce the size of each backfill notification.
pub const fn with_backfill_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
self.backfill_thresholds = Some(thresholds);
self
}
}
impl<P, E> ExExNotificationsWithHead<P, E>
@@ -359,8 +391,11 @@ where
/// - ExEx is at the same block number as the node head (`exex_head.number ==
/// node_head.number`). Nothing to do.
fn check_backfill(&mut self) -> eyre::Result<()> {
let backfill_job_factory =
let mut backfill_job_factory =
BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
if let Some(thresholds) = self.backfill_thresholds.clone() {
backfill_job_factory = backfill_job_factory.with_thresholds(thresholds);
}
match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
std::cmp::Ordering::Less => {
// ExEx is behind the node head, start backfill