> = Vec::new();
let mut parent_hash = genesis.hash();
@@ -384,11 +401,15 @@ async fn test_pipeline() -> eyre::Result<()> {
// This is needed because we wrote state during block generation for computing state roots
let pipeline_provider_factory =
create_test_provider_factory_with_chain_spec(chain_spec.clone());
+ if let Some(settings) = storage_settings {
+ pipeline_provider_factory.set_storage_settings_cache(settings);
+ }
init_genesis(&pipeline_provider_factory).expect("init genesis");
let pipeline_genesis =
pipeline_provider_factory.sealed_header(0)?.expect("genesis should exist");
let pipeline_consensus = NoopConsensus::arc();
+ let blocks_clone = blocks.clone();
let file_client = create_file_client_from_blocks(blocks);
let max_block = file_client.max_block().unwrap();
let tip = file_client.tip().expect("tip");
@@ -417,7 +438,7 @@ async fn test_pipeline() -> eyre::Result<()> {
{
let provider = pipeline_provider_factory.provider()?;
let last_block = provider.last_block_number()?;
- assert_eq!(last_block, 5, "should have synced 5 blocks");
+ assert_eq!(last_block, num_blocks, "should have synced {num_blocks} blocks");
for stage_id in [
StageId::Headers,
@@ -435,29 +456,28 @@ async fn test_pipeline() -> eyre::Result<()> {
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
assert_eq!(
checkpoint.map(|c| c.block_number),
- Some(5),
- "{stage_id} checkpoint should be at block 5"
+ Some(num_blocks),
+ "{stage_id} checkpoint should be at block {num_blocks}"
);
}
// Verify the counter contract's storage was updated
- // After 5 blocks with 1 increment each, slot 0 should be 5
+ // After num_blocks blocks with 1 increment each, slot 0 should be num_blocks
let state = provider.latest();
let counter_storage = state.storage(CONTRACT_ADDRESS, B256::ZERO)?;
assert_eq!(
counter_storage,
- Some(U256::from(5)),
- "Counter storage slot 0 should be 5 after 5 increments"
+ Some(U256::from(num_blocks)),
+ "Counter storage slot 0 should be {num_blocks} after {num_blocks} increments"
);
}
// Verify changesets are queryable before unwind
// This validates that the #21561 fix works - unwind needs to read changesets from the correct
// source
- assert_changesets_queryable(&pipeline_provider_factory, 1..=5)?;
+ assert_changesets_queryable(&pipeline_provider_factory, 1..=num_blocks)?;
- // Unwind to block 2
- let unwind_target = 2u64;
+ // Unwind to unwind_target
pipeline.unwind(unwind_target, None)?;
// Verify unwind
@@ -484,7 +504,114 @@ async fn test_pipeline() -> eyre::Result<()> {
);
}
}
+
+ let state = provider.latest();
+ let counter_storage = state.storage(CONTRACT_ADDRESS, B256::ZERO)?;
+ assert_eq!(
+ counter_storage,
+ Some(U256::from(unwind_target)),
+ "Counter storage slot 0 should be {unwind_target} after unwinding to block {unwind_target}"
+ );
+ }
+
+ // Re-sync: build a new pipeline starting from unwind_target and sync back to num_blocks
+ let resync_file_client = create_file_client_from_blocks(blocks_clone);
+ let resync_consensus = NoopConsensus::arc();
+ let resync_stages_config = StageConfig::default();
+
+ let unwind_head = pipeline_provider_factory
+ .sealed_header(unwind_target)?
+ .expect("unwind target header should exist");
+
+ let mut resync_header_downloader =
+ ReverseHeadersDownloaderBuilder::new(resync_stages_config.headers)
+ .build(resync_file_client.clone(), resync_consensus.clone())
+ .into_task();
+ resync_header_downloader.update_local_head(unwind_head);
+ resync_header_downloader.update_sync_target(SyncTarget::Tip(tip));
+
+ let mut resync_body_downloader = BodiesDownloaderBuilder::new(resync_stages_config.bodies)
+ .build(resync_file_client, resync_consensus, pipeline_provider_factory.clone())
+ .into_task();
+ resync_body_downloader
+ .set_download_range(unwind_target + 1..=max_block)
+ .expect("set download range");
+
+ let resync_pipeline = build_pipeline(
+ pipeline_provider_factory.clone(),
+ resync_header_downloader,
+ resync_body_downloader,
+ max_block,
+ tip,
+ );
+
+ let (_resync_pipeline, resync_result) = resync_pipeline.run_as_fut(None).await;
+ resync_result?;
+
+ // Verify re-sync
+ {
+ let provider = pipeline_provider_factory.provider()?;
+ let last_block = provider.last_block_number()?;
+ assert_eq!(last_block, num_blocks, "should have re-synced to {num_blocks} blocks");
+
+ for stage_id in [
+ StageId::Headers,
+ StageId::Bodies,
+ StageId::SenderRecovery,
+ StageId::Execution,
+ StageId::AccountHashing,
+ StageId::StorageHashing,
+ StageId::MerkleExecute,
+ StageId::TransactionLookup,
+ StageId::IndexAccountHistory,
+ StageId::IndexStorageHistory,
+ StageId::Finish,
+ ] {
+ let checkpoint = provider.get_stage_checkpoint(stage_id)?;
+ assert_eq!(
+ checkpoint.map(|c| c.block_number),
+ Some(num_blocks),
+ "{stage_id} checkpoint should be at block {num_blocks} after re-sync"
+ );
+ }
+
+ let state = provider.latest();
+ let counter_storage = state.storage(CONTRACT_ADDRESS, B256::ZERO)?;
+ assert_eq!(
+ counter_storage,
+ Some(U256::from(num_blocks)),
+ "Counter storage slot 0 should be {num_blocks} after re-sync"
+ );
}
Ok(())
}
+
+/// Tests pipeline with ALL stages enabled using both ETH transfers and contract storage changes.
+///
+/// This test:
+/// 1. Pre-funds a signer account and deploys a Counter contract in genesis
+/// 2. Each block contains two transactions:
+/// - ETH transfer to a recipient (account state changes)
+/// - Counter `increment()` call (storage state changes)
+/// 3. Runs the full pipeline with ALL stages enabled
+/// 4. Forward syncs to block 5, unwinds to block 2, then re-syncs to block 5
+///
+/// This exercises both account and storage hashing/history stages.
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pipeline() -> eyre::Result<()> {
+ run_pipeline_forward_and_unwind(None, 5, 2).await
+}
+
+/// Same as [`test_pipeline`] but runs with v2 storage settings (`use_hashed_state=true`,
+/// `storage_changesets_in_static_files=true`, etc.).
+///
+/// In v2 mode:
+/// - The execution stage writes directly to `HashedAccounts`/`HashedStorages`
+/// - `AccountHashingStage` and `StorageHashingStage` are no-ops during forward execution
+/// - Changesets are stored in static files with pre-hashed storage keys
+/// - Unwind must still revert hashed state via the hashing stages before `MerkleUnwind` validates
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pipeline_v2() -> eyre::Result<()> {
+ run_pipeline_forward_and_unwind(Some(StorageSettings::v2()), 5, 2).await
+}
diff --git a/crates/storage/db-api/src/models/metadata.rs b/crates/storage/db-api/src/models/metadata.rs
index 14b9cb44ae..ef67dba722 100644
--- a/crates/storage/db-api/src/models/metadata.rs
+++ b/crates/storage/db-api/src/models/metadata.rs
@@ -43,11 +43,19 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings`.
///
- /// Always returns [`Self::v1()`]. Use the `--storage.v2` CLI flag to opt into
- /// [`Self::v2()`] at runtime. The `rocksdb` feature only makes the v2 backend
- /// *available*; it does not activate it by default.
+ /// When the `edge` feature is enabled, returns [`Self::v2()`] so that CI and
+ /// edge builds automatically use v2 storage defaults. Otherwise returns
+ /// [`Self::v1()`]. The `--storage.v2` CLI flag can also opt into v2 at runtime
+ /// regardless of feature flags.
pub const fn base() -> Self {
- Self::v1()
+ #[cfg(feature = "edge")]
+ {
+ Self::v2()
+ }
+ #[cfg(not(feature = "edge"))]
+ {
+ Self::v1()
+ }
}
/// Creates `StorageSettings` for v2 nodes with all storage features enabled:
@@ -65,7 +73,7 @@ impl StorageSettings {
storages_history_in_rocksdb: true,
transaction_hash_numbers_in_rocksdb: true,
account_history_in_rocksdb: true,
- use_hashed_state: false,
+ use_hashed_state: true,
}
}
diff --git a/crates/storage/provider/src/changeset_walker.rs b/crates/storage/provider/src/changeset_walker.rs
index 5eb521e3a7..ba4fe4811c 100644
--- a/crates/storage/provider/src/changeset_walker.rs
+++ b/crates/storage/provider/src/changeset_walker.rs
@@ -5,8 +5,7 @@ use crate::ProviderResult;
use alloy_primitives::BlockNumber;
use reth_db::models::AccountBeforeTx;
use reth_db_api::models::BlockNumberAddress;
-use reth_primitives_traits::StorageEntry;
-use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
+use reth_storage_api::{ChangeSetReader, ChangesetEntry, StorageChangeSetReader};
use std::ops::{Bound, RangeBounds};
/// Iterator that walks account changesets from static files in a block range.
@@ -110,7 +109,7 @@ pub struct StaticFileStorageChangesetWalker<P> {
/// Current block being processed
current_block: BlockNumber,
/// Changesets for current block
- current_changesets: Vec<(BlockNumberAddress, StorageEntry)>,
+ current_changesets: Vec<(BlockNumberAddress, ChangesetEntry)>,
/// Index within current block's changesets
changeset_index: usize,
}
@@ -144,7 +143,7 @@ impl<P> Iterator for StaticFileStorageChangesetWalker<P>
where
P: StorageChangeSetReader,
{
- type Item = ProviderResult<(BlockNumberAddress, StorageEntry)>;
+ type Item = ProviderResult<(BlockNumberAddress, ChangesetEntry)>;
    fn next(&mut self) -> Option<Self::Item> {
if let Some(changeset) = self.current_changesets.get(self.changeset_index).copied() {
diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs
index a9cf4c38f4..63de5fd845 100644
--- a/crates/storage/provider/src/providers/blockchain_provider.rs
+++ b/crates/storage/provider/src/providers/blockchain_provider.rs
@@ -23,11 +23,13 @@ use reth_chainspec::ChainInfo;
use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
use reth_execution_types::ExecutionOutcome;
use reth_node_types::{BlockTy, HeaderTy, NodeTypesWithDB, ReceiptTy, TxTy};
-use reth_primitives_traits::{Account, RecoveredBlock, SealedHeader, StorageEntry};
+use reth_primitives_traits::{Account, RecoveredBlock, SealedHeader};
use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
-use reth_storage_api::{BlockBodyIndicesProvider, NodePrimitivesProvider, StorageChangeSetReader};
+use reth_storage_api::{
+ BlockBodyIndicesProvider, ChangesetEntry, NodePrimitivesProvider, StorageChangeSetReader,
+};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{HashedPostState, KeccakKeyHasher};
use revm_database::BundleState;
@@ -713,7 +715,7 @@ impl StorageChangeSetReader for BlockchainProvider {
fn storage_changeset(
&self,
block_number: BlockNumber,
-    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
+    ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
self.consistent_provider()?.storage_changeset(block_number)
}
@@ -722,14 +724,14 @@ impl StorageChangeSetReader for BlockchainProvider {
block_number: BlockNumber,
address: Address,
storage_key: B256,
- ) -> ProviderResult