mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(provider): skip storage changeset writes when routed to static files (#21468)
This commit is contained in:
21
.github/workflows/e2e.yml
vendored
21
.github/workflows/e2e.yml
vendored
@@ -44,3 +44,24 @@ jobs:
|
||||
--exclude 'op-reth' \
|
||||
--exclude 'reth' \
|
||||
-E 'binary(e2e_testsuite)'
|
||||
|
||||
rocksdb:
|
||||
name: e2e-rocksdb
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
- uses: mozilla-actions/sccache-action@v0.0.9
|
||||
- uses: taiki-e/install-action@nextest
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: Run RocksDB e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
@@ -98,14 +98,12 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
|
||||
}
|
||||
|
||||
/// Enables `RocksDB` for `TransactionHashNumbers` table.
|
||||
///
|
||||
/// Note: Static file changesets are disabled because `persistence_threshold(0)` causes
|
||||
/// a race where the static file writer expects sequential block numbers but receives
|
||||
/// them out of order, resulting in `UnexpectedStaticFileBlockNumber` errors.
|
||||
fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
|
||||
config.rocksdb = RocksDbArgs { tx_hash: true, ..Default::default() };
|
||||
config.static_files.storage_changesets = false;
|
||||
config.static_files.account_changesets = false;
|
||||
/// Explicitly enables static file changesets to test the fix for double-write bug.
|
||||
const fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
|
||||
config.rocksdb =
|
||||
RocksDbArgs { all: true, tx_hash: true, storages_history: true, account_history: true };
|
||||
config.static_files.storage_changesets = true;
|
||||
config.static_files.account_changesets = true;
|
||||
config
|
||||
}
|
||||
|
||||
|
||||
@@ -627,6 +627,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
StateWriteConfig {
|
||||
write_receipts: !sf_ctx.write_receipts,
|
||||
write_account_changesets: !sf_ctx.write_account_changesets,
|
||||
write_storage_changesets: !sf_ctx.write_storage_changesets,
|
||||
},
|
||||
)?;
|
||||
timings.write_state += start.elapsed();
|
||||
@@ -2304,52 +2305,55 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
|
||||
config: StateWriteConfig,
|
||||
) -> ProviderResult<()> {
|
||||
// Write storage changes
|
||||
tracing::trace!("Writing storage changes");
|
||||
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
|
||||
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
|
||||
let block_number = first_block + block_index as BlockNumber;
|
||||
if config.write_storage_changesets {
|
||||
tracing::trace!("Writing storage changes");
|
||||
let mut storages_cursor =
|
||||
self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
|
||||
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
|
||||
let block_number = first_block + block_index as BlockNumber;
|
||||
|
||||
tracing::trace!(block_number, "Writing block change");
|
||||
// sort changes by address.
|
||||
storage_changes.par_sort_unstable_by_key(|a| a.address);
|
||||
let total_changes =
|
||||
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
|
||||
let mut changeset = Vec::with_capacity(total_changes);
|
||||
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
|
||||
let mut storage = storage_revert
|
||||
.into_iter()
|
||||
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
|
||||
.collect::<Vec<_>>();
|
||||
// sort storage slots by key.
|
||||
storage.par_sort_unstable_by_key(|a| a.0);
|
||||
tracing::trace!(block_number, "Writing block change");
|
||||
// sort changes by address.
|
||||
storage_changes.par_sort_unstable_by_key(|a| a.address);
|
||||
let total_changes =
|
||||
storage_changes.iter().map(|change| change.storage_revert.len()).sum();
|
||||
let mut changeset = Vec::with_capacity(total_changes);
|
||||
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
|
||||
let mut storage = storage_revert
|
||||
.into_iter()
|
||||
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
|
||||
.collect::<Vec<_>>();
|
||||
// sort storage slots by key.
|
||||
storage.par_sort_unstable_by_key(|a| a.0);
|
||||
|
||||
// If we are writing the primary storage wipe transition, the pre-existing plain
|
||||
// storage state has to be taken from the database and written to storage history.
|
||||
// See [StorageWipe::Primary] for more details.
|
||||
//
|
||||
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
|
||||
// collecting wiped entries into a Vec like this, see
|
||||
// `write_storage_trie_changesets`.
|
||||
let mut wiped_storage = Vec::new();
|
||||
if wiped {
|
||||
tracing::trace!(?address, "Wiping storage");
|
||||
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
|
||||
wiped_storage.push((entry.key, entry.value));
|
||||
while let Some(entry) = storages_cursor.next_dup_val()? {
|
||||
wiped_storage.push((entry.key, entry.value))
|
||||
// If we are writing the primary storage wipe transition, the pre-existing plain
|
||||
// storage state has to be taken from the database and written to storage
|
||||
// history. See [StorageWipe::Primary] for more details.
|
||||
//
|
||||
// TODO(mediocregopher): This could be rewritten in a way which doesn't require
|
||||
// collecting wiped entries into a Vec like this, see
|
||||
// `write_storage_trie_changesets`.
|
||||
let mut wiped_storage = Vec::new();
|
||||
if wiped {
|
||||
tracing::trace!(?address, "Wiping storage");
|
||||
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
|
||||
wiped_storage.push((entry.key, entry.value));
|
||||
while let Some(entry) = storages_cursor.next_dup_val()? {
|
||||
wiped_storage.push((entry.key, entry.value))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(?address, ?storage, "Writing storage reverts");
|
||||
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
|
||||
changeset.push(StorageBeforeTx { address, key, value });
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(?address, ?storage, "Writing storage reverts");
|
||||
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
|
||||
changeset.push(StorageBeforeTx { address, key, value });
|
||||
}
|
||||
let mut storage_changesets_writer =
|
||||
EitherWriter::new_storage_changesets(self, block_number)?;
|
||||
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
|
||||
}
|
||||
|
||||
let mut storage_changesets_writer =
|
||||
EitherWriter::new_storage_changesets(self, block_number)?;
|
||||
storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
|
||||
}
|
||||
|
||||
if !config.write_account_changesets {
|
||||
|
||||
@@ -615,13 +615,13 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let block_number = block.recovered_block().number();
|
||||
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
|
||||
|
||||
for account_block_reverts in reverts.accounts {
|
||||
let changeset = account_block_reverts
|
||||
.into_iter()
|
||||
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
|
||||
.collect::<Vec<_>>();
|
||||
w.append_account_changeset(changeset, block_number)?;
|
||||
}
|
||||
let changeset: Vec<_> = reverts
|
||||
.accounts
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
|
||||
.collect();
|
||||
w.append_account_changeset(changeset, block_number)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -636,21 +636,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let block_number = block.recovered_block().number();
|
||||
let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
|
||||
|
||||
for storage_block_reverts in reverts.storage {
|
||||
let changeset = storage_block_reverts
|
||||
.into_iter()
|
||||
.flat_map(|revert| {
|
||||
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
|
||||
StorageBeforeTx {
|
||||
address: revert.address,
|
||||
key: B256::new(key.to_be_bytes()),
|
||||
value: revert_to_slot.to_previous_value(),
|
||||
}
|
||||
})
|
||||
let changeset: Vec<_> = reverts
|
||||
.storage
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.flat_map(|revert| {
|
||||
revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
|
||||
StorageBeforeTx {
|
||||
address: revert.address,
|
||||
key: B256::new(key.to_be_bytes()),
|
||||
value: revert_to_slot.to_previous_value(),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
w.append_storage_changeset(changeset, block_number)?;
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
w.append_storage_changeset(changeset, block_number)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -136,10 +136,16 @@ pub struct StateWriteConfig {
|
||||
pub write_receipts: bool,
|
||||
/// Whether to write account changesets.
|
||||
pub write_account_changesets: bool,
|
||||
/// Whether to write storage changesets.
|
||||
pub write_storage_changesets: bool,
|
||||
}
|
||||
|
||||
impl Default for StateWriteConfig {
|
||||
fn default() -> Self {
|
||||
Self { write_receipts: true, write_account_changesets: true }
|
||||
Self {
|
||||
write_receipts: true,
|
||||
write_account_changesets: true,
|
||||
write_storage_changesets: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user