From 1e734936d8956e2e36d356f9d6fe8da23ee07c80 Mon Sep 17 00:00:00 2001
From: YK
Date: Tue, 27 Jan 2026 18:34:44 +0800
Subject: [PATCH] fix(provider): skip storage changeset writes when routed to
 static files (#21468)

---
 .github/workflows/e2e.yml                   | 21 +++++
 crates/e2e-test-utils/tests/rocksdb/main.rs | 14 ++--
 .../src/providers/database/provider.rs      | 82 ++++++++++---------
 .../src/providers/static_file/manager.rs    | 42 +++++-----
 .../storage/storage-api/src/state_writer.rs |  8 +-
 5 files changed, 98 insertions(+), 69 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index f31fefed35..0a60f59367 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -44,3 +44,24 @@
             --exclude 'op-reth' \
             --exclude 'reth' \
             -E 'binary(e2e_testsuite)'
+
+  rocksdb:
+    name: e2e-rocksdb
+    runs-on: depot-ubuntu-latest-4
+    env:
+      RUST_BACKTRACE: 1
+    timeout-minutes: 60
+    steps:
+      - uses: actions/checkout@v6
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: mozilla-actions/sccache-action@v0.0.9
+      - uses: taiki-e/install-action@nextest
+      - uses: Swatinem/rust-cache@v2
+        with:
+          cache-on-failure: true
+      - name: Run RocksDB e2e tests
+        run: |
+          cargo nextest run \
+            --locked --features "edge" \
+            -p reth-e2e-test-utils \
+            -E 'binary(rocksdb)'
diff --git a/crates/e2e-test-utils/tests/rocksdb/main.rs b/crates/e2e-test-utils/tests/rocksdb/main.rs
index 2a3e0f6214..178ed5a25c 100644
--- a/crates/e2e-test-utils/tests/rocksdb/main.rs
+++ b/crates/e2e-test-utils/tests/rocksdb/main.rs
@@ -98,14 +98,12 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
 }
 
 /// Enables `RocksDB` for `TransactionHashNumbers` table.
-///
-/// Note: Static file changesets are disabled because `persistence_threshold(0)` causes
-/// a race where the static file writer expects sequential block numbers but receives
-/// them out of order, resulting in `UnexpectedStaticFileBlockNumber` errors.
-fn with_rocksdb_enabled(mut config: NodeConfig) -> NodeConfig {
-    config.rocksdb = RocksDbArgs { tx_hash: true, ..Default::default() };
-    config.static_files.storage_changesets = false;
-    config.static_files.account_changesets = false;
+/// Explicitly enables static file changesets to test the fix for the double-write bug.
+const fn with_rocksdb_enabled(mut config: NodeConfig) -> NodeConfig {
+    config.rocksdb =
+        RocksDbArgs { all: true, tx_hash: true, storages_history: true, account_history: true };
+    config.static_files.storage_changesets = true;
+    config.static_files.account_changesets = true;
     config
 }
 
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index ba3292c51b..e39336c6a8 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -627,6 +627,7 @@ impl DatabaseProvider StateWriter
         config: StateWriteConfig,
     ) -> ProviderResult<()> {
         // Write storage changes
-        tracing::trace!("Writing storage changes");
-        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
-        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
-            let block_number = first_block + block_index as BlockNumber;
+        if config.write_storage_changesets {
+            tracing::trace!("Writing storage changes");
+            let mut storages_cursor =
+                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
+            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
+                let block_number = first_block + block_index as BlockNumber;
 
-            tracing::trace!(block_number, "Writing block change");
-            // sort changes by address.
-            storage_changes.par_sort_unstable_by_key(|a| a.address);
-            let total_changes =
-                storage_changes.iter().map(|change| change.storage_revert.len()).sum();
-            let mut changeset = Vec::with_capacity(total_changes);
-            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
-                let mut storage = storage_revert
-                    .into_iter()
-                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
-                    .collect::<Vec<_>>();
-                // sort storage slots by key.
-                storage.par_sort_unstable_by_key(|a| a.0);
+                tracing::trace!(block_number, "Writing block change");
+                // sort changes by address.
+                storage_changes.par_sort_unstable_by_key(|a| a.address);
+                let total_changes =
+                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
+                let mut changeset = Vec::with_capacity(total_changes);
+                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
+                    let mut storage = storage_revert
+                        .into_iter()
+                        .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
+                        .collect::<Vec<_>>();
+                    // sort storage slots by key.
+                    storage.par_sort_unstable_by_key(|a| a.0);
 
-                // If we are writing the primary storage wipe transition, the pre-existing plain
-                // storage state has to be taken from the database and written to storage history.
-                // See [StorageWipe::Primary] for more details.
-                //
-                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
-                // collecting wiped entries into a Vec like this, see
-                // `write_storage_trie_changesets`.
-                let mut wiped_storage = Vec::new();
-                if wiped {
-                    tracing::trace!(?address, "Wiping storage");
-                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
-                        wiped_storage.push((entry.key, entry.value));
-                        while let Some(entry) = storages_cursor.next_dup_val()? {
-                            wiped_storage.push((entry.key, entry.value))
+                    // If we are writing the primary storage wipe transition, the pre-existing plain
+                    // storage state has to be taken from the database and written to storage
+                    // history. See [StorageWipe::Primary] for more details.
+                    //
+                    // TODO(mediocregopher): This could be rewritten in a way which doesn't require
+                    // collecting wiped entries into a Vec like this, see
+                    // `write_storage_trie_changesets`.
+                    let mut wiped_storage = Vec::new();
+                    if wiped {
+                        tracing::trace!(?address, "Wiping storage");
+                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
+                            wiped_storage.push((entry.key, entry.value));
+                            while let Some(entry) = storages_cursor.next_dup_val()? {
+                                wiped_storage.push((entry.key, entry.value))
+                            }
                         }
                     }
+
+                    tracing::trace!(?address, ?storage, "Writing storage reverts");
+                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
+                        changeset.push(StorageBeforeTx { address, key, value });
+                    }
                 }
 
-                tracing::trace!(?address, ?storage, "Writing storage reverts");
-                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
-                    changeset.push(StorageBeforeTx { address, key, value });
-                }
-            }
-
-            let mut storage_changesets_writer =
-                EitherWriter::new_storage_changesets(self, block_number)?;
-            storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
+                let mut storage_changesets_writer =
+                    EitherWriter::new_storage_changesets(self, block_number)?;
+                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
+            }
         }
 
         if !config.write_account_changesets {
diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs
index dff1b6d303..79b7b2a3d9 100644
--- a/crates/storage/provider/src/providers/static_file/manager.rs
+++ b/crates/storage/provider/src/providers/static_file/manager.rs
@@ -615,13 +615,13 @@ impl StaticFileProvider {
             let block_number = block.recovered_block().number();
             let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
 
-            for account_block_reverts in reverts.accounts {
-                let changeset = account_block_reverts
-                    .into_iter()
-                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
-                    .collect::<Vec<_>>();
-                w.append_account_changeset(changeset, block_number)?;
-            }
+            let changeset: Vec<_> = reverts
+                .accounts
+                .into_iter()
+                .flatten()
+                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
+                .collect();
+            w.append_account_changeset(changeset, block_number)?;
         }
         Ok(())
     }
@@ -636,21 +636,21 @@ impl StaticFileProvider {
             let block_number = block.recovered_block().number();
             let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
 
-            for storage_block_reverts in reverts.storage {
-                let changeset = storage_block_reverts
-                    .into_iter()
-                    .flat_map(|revert| {
-                        revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
-                            StorageBeforeTx {
-                                address: revert.address,
-                                key: B256::new(key.to_be_bytes()),
-                                value: revert_to_slot.to_previous_value(),
-                            }
-                        })
-                    })
-                    .collect::<Vec<_>>();
-                w.append_storage_changeset(changeset, block_number)?;
-            }
+            let changeset: Vec<_> = reverts
+                .storage
+                .into_iter()
+                .flatten()
+                .flat_map(|revert| {
+                    revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
+                        StorageBeforeTx {
+                            address: revert.address,
+                            key: B256::new(key.to_be_bytes()),
+                            value: revert_to_slot.to_previous_value(),
+                        }
+                    })
+                })
+                .collect();
+            w.append_storage_changeset(changeset, block_number)?;
         }
         Ok(())
     }
diff --git a/crates/storage/storage-api/src/state_writer.rs b/crates/storage/storage-api/src/state_writer.rs
index f2c193559b..36fbf5f94c 100644
--- a/crates/storage/storage-api/src/state_writer.rs
+++ b/crates/storage/storage-api/src/state_writer.rs
@@ -136,10 +136,16 @@ pub struct StateWriteConfig {
     pub write_receipts: bool,
     /// Whether to write account changesets.
     pub write_account_changesets: bool,
+    /// Whether to write storage changesets.
+    pub write_storage_changesets: bool,
 }
 
 impl Default for StateWriteConfig {
     fn default() -> Self {
-        Self { write_receipts: true, write_account_changesets: true }
+        Self {
+            write_receipts: true,
+            write_account_changesets: true,
+            write_storage_changesets: true,
+        }
     }
 }
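
The patch threads a new `write_storage_changesets` flag through `StateWriteConfig` so the database
changeset writes can be skipped when the static-file pipeline already persists the same reverts.
Below is a minimal reviewer sketch of one plausible wiring for a call site, assuming a hypothetical
`StaticFileSettings` type and `state_write_config_for` helper; only `StateWriteConfig` and its
fields come from the patch, and the struct is re-declared here so the snippet is self-contained.

// Sketch only: mirrors StateWriteConfig from state_writer.rs; the other names
// are assumptions for illustration, not reth APIs.
#[derive(Debug, Clone, Copy)]
struct StateWriteConfig {
    write_receipts: bool,
    write_account_changesets: bool,
    write_storage_changesets: bool,
}

// Hypothetical stand-in for the node's static-file changeset settings.
#[derive(Debug, Clone, Copy)]
struct StaticFileSettings {
    account_changesets: bool,
    storage_changesets: bool,
}

// Derive the provider-side write flags: skip a changeset table whenever the
// static-file writer owns that data, so each revert is persisted exactly once.
fn state_write_config_for(static_files: StaticFileSettings) -> StateWriteConfig {
    StateWriteConfig {
        write_receipts: true,
        write_account_changesets: !static_files.account_changesets,
        write_storage_changesets: !static_files.storage_changesets,
    }
}

fn main() {
    let cfg = state_write_config_for(StaticFileSettings {
        account_changesets: true,
        storage_changesets: true,
    });
    assert!(cfg.write_receipts);
    assert!(!cfg.write_account_changesets);
    assert!(!cfg.write_storage_changesets);
    println!("{cfg:?}");
}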
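
The manager.rs hunks also reshape the per-block reverts: instead of calling
`append_account_changeset`/`append_storage_changeset` once per inner revert batch, the batches are
flattened into a single changeset and appended once per block. The following standalone sketch
mirrors that reshaping with placeholder types; `Revert`, `Entry`, and `collect_changeset` are
invented for illustration and are not the real revert structures.

// Sketch only: placeholder types standing in for the plain-state reverts.
#[derive(Debug)]
struct Revert {
    address: u64,
    slots: Vec<(u64, u64)>, // (storage key, previous value)
}

#[derive(Debug)]
struct Entry {
    address: u64,
    key: u64,
    value: u64,
}

// Flatten the per-batch Vecs for one block into a single changeset, so the
// writer is invoked once per block rather than once per batch.
fn collect_changeset(reverts: Vec<Vec<Revert>>) -> Vec<Entry> {
    reverts
        .into_iter()
        .flatten()
        .flat_map(|revert| {
            let address = revert.address;
            revert.slots.into_iter().map(move |(key, value)| Entry { address, key, value })
        })
        .collect()
}

fn main() {
    let reverts = vec![
        vec![Revert { address: 1, slots: vec![(10, 0)] }],
        vec![Revert { address: 2, slots: vec![(20, 7), (21, 8)] }],
    ];
    let changeset = collect_changeset(reverts);
    // A single append call would now receive all three entries for this block.
    assert_eq!(changeset.len(), 3);
    println!("{changeset:?}");
}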