feat: remove StateChanges and StateReverts (#9781)

This commit is contained in:
Dan Cline
2024-07-24 17:53:54 -04:00
committed by GitHub
parent 7531366449
commit 527cc4460b
8 changed files with 179 additions and 214 deletions

View File

@@ -1,8 +1,5 @@
//! Bundle state module.
//! This module contains all the logic related to bundle state.
mod state_changes;
mod state_reverts;
pub use state_changes::StateChanges;
pub use state_reverts::{StateReverts, StorageRevertsIter};
pub use state_reverts::StorageRevertsIter;

View File

@@ -1,88 +0,0 @@
use crate::DatabaseProviderRW;
use rayon::slice::ParallelSliceMut;
use reth_db::{tables, Database};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::DbTxMut,
};
use reth_primitives::{Bytecode, StorageEntry};
use reth_storage_errors::db::DatabaseError;
use revm::db::states::{PlainStorageChangeset, StateChangeset};
/// A change to the state of the world.
#[derive(Debug, Default)]
pub struct StateChanges(pub StateChangeset);
impl From<StateChangeset> for StateChanges {
fn from(revm: StateChangeset) -> Self {
Self(revm)
}
}
impl StateChanges {
/// Write the bundle state to the database.
pub fn write_to_db<DB>(mut self, provider: &DatabaseProviderRW<DB>) -> Result<(), DatabaseError>
where
DB: Database,
{
// sort all entries so they can be written to database in more performant way.
// and take smaller memory footprint.
self.0.accounts.par_sort_by_key(|a| a.0);
self.0.storage.par_sort_by_key(|a| a.address);
self.0.contracts.par_sort_by_key(|a| a.0);
// Write new account state
tracing::trace!(target: "provider::bundle_state", len = self.0.accounts.len(), "Writing new account state");
let mut accounts_cursor = provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
// write account to database.
for (address, account) in self.0.accounts {
if let Some(account) = account {
tracing::trace!(target: "provider::bundle_state", ?address, "Updating plain state account");
accounts_cursor.upsert(address, account.into())?;
} else if accounts_cursor.seek_exact(address)?.is_some() {
tracing::trace!(target: "provider::bundle_state", ?address, "Deleting plain state account");
accounts_cursor.delete_current()?;
}
}
// Write bytecode
tracing::trace!(target: "provider::bundle_state", len = self.0.contracts.len(), "Writing bytecodes");
let mut bytecodes_cursor = provider.tx_ref().cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in self.0.contracts {
bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
}
// Write new storage state and wipe storage if needed.
tracing::trace!(target: "provider::bundle_state", len = self.0.storage.len(), "Writing new storage state");
let mut storages_cursor =
provider.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for PlainStorageChangeset { address, wipe_storage, storage } in self.0.storage {
// Wiping of storage.
if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
storages_cursor.delete_current_duplicates()?;
}
// cast storages to B256.
let mut storage = storage
.into_iter()
.map(|(k, value)| StorageEntry { key: k.into(), value })
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.key);
for entry in storage {
tracing::trace!(target: "provider::bundle_state", ?address, ?entry.key, "Updating plain state storage");
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
if db_entry.key == entry.key {
storages_cursor.delete_current()?;
}
}
if !entry.value.is_zero() {
storages_cursor.upsert(address, entry)?;
}
}
}
Ok(())
}
}

View File

@@ -1,103 +1,7 @@
use crate::DatabaseProviderRW;
use rayon::slice::ParallelSliceMut;
use reth_db::{tables, Database};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, BlockNumberAddress},
transaction::DbTxMut,
};
use reth_primitives::{BlockNumber, StorageEntry, B256, U256};
use reth_storage_errors::db::DatabaseError;
use revm::db::states::{PlainStateReverts, PlainStorageRevert, RevertToSlot};
use reth_primitives::{B256, U256};
use revm::db::states::RevertToSlot;
use std::iter::Peekable;
/// Revert of the state.
#[derive(Debug, Default)]
pub struct StateReverts(pub PlainStateReverts);
impl From<PlainStateReverts> for StateReverts {
fn from(revm: PlainStateReverts) -> Self {
Self(revm)
}
}
impl StateReverts {
/// Write reverts to database.
///
/// `Note::` Reverts will delete all wiped storage from plain state.
pub fn write_to_db<DB>(
self,
provider: &DatabaseProviderRW<DB>,
first_block: BlockNumber,
) -> Result<(), DatabaseError>
where
DB: Database,
{
// Write storage changes
tracing::trace!(target: "provider::reverts", "Writing storage changes");
let mut storages_cursor =
provider.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_changeset_cursor =
provider.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
for (block_index, mut storage_changes) in self.0.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!(target: "provider::reverts", block_number, "Writing block change");
// sort changes by address.
storage_changes.par_sort_unstable_by_key(|a| a.address);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
let storage_id = BlockNumberAddress((block_number, address));
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.0);
// If we are writing the primary storage wipe transition, the pre-existing plain
// storage state has to be taken from the database and written to storage history.
// See [StorageWipe::Primary] for more details.
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(target: "provider::reverts", ?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
}
}
tracing::trace!(target: "provider::reverts", ?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
}
}
}
// Write account changes
tracing::trace!(target: "provider::reverts", "Writing account changes");
let mut account_changeset_cursor =
provider.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
for (block_index, mut account_block_reverts) in self.0.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
// Sort accounts by address.
account_block_reverts.par_sort_by_key(|a| a.0);
for (address, info) in account_block_reverts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(Into::into) },
)?;
}
}
Ok(())
}
}
/// Iterator over storage reverts.
/// See [`StorageRevertsIter::next`] for more details.
#[allow(missing_debug_implementations)]

View File

@@ -34,7 +34,6 @@ pub use reth_storage_errors::provider::{ProviderError, ProviderResult};
pub use reth_execution_types::*;
pub mod bundle_state;
pub use bundle_state::{StateChanges, StateReverts};
/// Re-export `OriginalValuesKnown`
pub use revm::db::states::OriginalValuesKnown;

View File

@@ -1,4 +1,5 @@
use crate::{
bundle_state::StorageRevertsIter,
providers::{database::metrics, static_file::StaticFileWriter, StaticFileProvider},
to_range,
traits::{
@@ -10,13 +11,16 @@ use crate::{
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
StageCheckpointReader, StateProviderBox, StateWriter, StatsReader, StorageReader,
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
TrieWriter, WithdrawalsProvider,
StageCheckpointReader, StateChangeWriter, StateProviderBox, StateWriter, StatsReader,
StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use rayon::slice::ParallelSliceMut;
use reth_chainspec::{ChainInfo, ChainSpec, EthereumHardforks};
use reth_db::{tables, BlockNumberList, PlainAccountState, PlainStorageState};
use reth_db::{
cursor::DbDupCursorRW, tables, BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_db_api::{
common::KeyValue,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, RangeWalker},
@@ -34,10 +38,10 @@ use reth_execution_types::{Chain, ExecutionOutcome};
use reth_network_p2p::headers::downloader::SyncTarget;
use reth_primitives::{
keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber,
BlockWithSenders, GotExpected, Header, Receipt, Requests, SealedBlock, SealedBlockWithSenders,
SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta, TransactionSigned,
TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal,
Withdrawals, B256, U256,
BlockWithSenders, Bytecode, GotExpected, Header, Receipt, Requests, SealedBlock,
SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta,
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber,
Withdrawal, Withdrawals, B256, U256,
};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
@@ -49,7 +53,10 @@ use reth_trie::{
HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::DatabaseStateRoot;
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use revm::{
db::states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
primitives::{BlockEnv, CfgEnvWithHandlerCfg},
};
use std::{
cmp::Ordering,
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
@@ -2628,6 +2635,137 @@ impl<TX: DbTx> StorageReader for DatabaseProvider<TX> {
}
}
impl<TX: DbTxMut + DbTx> StateChangeWriter for DatabaseProvider<TX> {
fn write_state_reverts(
&self,
reverts: PlainStateReverts,
first_block: BlockNumber,
) -> ProviderResult<()> {
// Write storage changes
tracing::trace!("Writing storage changes");
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_changeset_cursor =
self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!(block_number, "Writing block change");
// sort changes by address.
storage_changes.par_sort_unstable_by_key(|a| a.address);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
let storage_id = BlockNumberAddress((block_number, address));
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.0);
// If we are writing the primary storage wipe transition, the pre-existing plain
// storage state has to be taken from the database and written to storage history.
// See [StorageWipe::Primary] for more details.
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
}
}
tracing::trace!(?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
}
}
}
// Write account changes
tracing::trace!("Writing account changes");
let mut account_changeset_cursor =
self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
// Sort accounts by address.
account_block_reverts.par_sort_by_key(|a| a.0);
for (address, info) in account_block_reverts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(Into::into) },
)?;
}
}
Ok(())
}
fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
// sort all entries so they can be written to database in more performant way.
// and take smaller memory footprint.
changes.accounts.par_sort_by_key(|a| a.0);
changes.storage.par_sort_by_key(|a| a.address);
changes.contracts.par_sort_by_key(|a| a.0);
// Write new account state
tracing::trace!(len = changes.accounts.len(), "Writing new account state");
let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
// write account to database.
for (address, account) in changes.accounts {
if let Some(account) = account {
tracing::trace!(?address, "Updating plain state account");
accounts_cursor.upsert(address, account.into())?;
} else if accounts_cursor.seek_exact(address)?.is_some() {
tracing::trace!(?address, "Deleting plain state account");
accounts_cursor.delete_current()?;
}
}
// Write bytecode
tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in changes.contracts {
bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
}
// Write new storage state and wipe storage if needed.
tracing::trace!(len = changes.storage.len(), "Writing new storage state");
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
// Wiping of storage.
if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
storages_cursor.delete_current_duplicates()?;
}
// cast storages to B256.
let mut storage = storage
.into_iter()
.map(|(k, value)| StorageEntry { key: k.into(), value })
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.key);
for entry in storage {
tracing::trace!(?address, ?entry.key, "Updating plain state storage");
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
if db_entry.key == entry.key {
storages_cursor.delete_current()?;
}
}
if !entry.value.is_zero() {
storages_cursor.upsert(address, entry)?;
}
}
}
Ok(())
}
}
impl<TX: DbTxMut + DbTx> TrieWriter for DatabaseProvider<TX> {
/// Writes trie updates. Returns the number of entries modified.
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {

View File

@@ -16,7 +16,7 @@ mod header_sync_gap;
pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider};
mod state;
pub use state::StateWriter;
pub use state::{StateChangeWriter, StateWriter};
mod spec;
pub use spec::ChainSpecProvider;

View File

@@ -1,6 +1,10 @@
use reth_execution_types::ExecutionOutcome;
use reth_primitives::BlockNumber;
use reth_storage_errors::provider::ProviderResult;
use revm::db::OriginalValuesKnown;
use revm::db::{
states::{PlainStateReverts, StateChangeset},
OriginalValuesKnown,
};
/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage.
pub trait StateWriter {
@@ -12,3 +16,18 @@ pub trait StateWriter {
is_value_known: OriginalValuesKnown,
) -> ProviderResult<()>;
}
/// A trait specifically for writing state changes or reverts
pub trait StateChangeWriter {
/// Write state reverts to the database.
///
/// NOTE: Reverts will delete all wiped storage from plain state.
fn write_state_reverts(
&self,
reverts: PlainStateReverts,
first_block: BlockNumber,
) -> ProviderResult<()>;
/// Write state changes to the database.
fn write_state_changes(&self, changes: StateChangeset) -> ProviderResult<()>;
}

View File

@@ -1,6 +1,6 @@
use crate::{
providers::StaticFileProviderRWRefMut, DatabaseProviderRW, StateChanges, StateReverts,
StateWriter, TrieWriter,
providers::StaticFileProviderRWRefMut, DatabaseProviderRW, StateChangeWriter, StateWriter,
TrieWriter,
};
use itertools::Itertools;
use reth_db::{
@@ -334,14 +334,14 @@ impl<'a, 'b, DB: Database> StateWriter for StorageWriter<'a, 'b, DB> {
let (plain_state, reverts) =
execution_outcome.bundle.into_plain_state_and_reverts(is_value_known);
StateReverts(reverts).write_to_db(self.database_writer(), execution_outcome.first_block)?;
self.database_writer().write_state_reverts(reverts, execution_outcome.first_block)?;
self.append_receipts_from_blocks(
execution_outcome.first_block,
execution_outcome.receipts.into_iter(),
)?;
StateChanges(plain_state).write_to_db(self.database_writer())?;
self.database_writer().write_state_changes(plain_state)?;
Ok(())
}
@@ -473,12 +473,10 @@ mod tests {
let plain_state = revm_bundle_state.into_plain_state(OriginalValuesKnown::Yes);
assert!(plain_state.storage.is_empty());
assert!(plain_state.contracts.is_empty());
StateChanges(plain_state)
.write_to_db(&provider)
.expect("Could not write plain state to DB");
provider.write_state_changes(plain_state).expect("Could not write plain state to DB");
assert_eq!(reverts.storage, [[]]);
StateReverts(reverts).write_to_db(&provider, 1).expect("Could not write reverts to DB");
provider.write_state_reverts(reverts, 1).expect("Could not write reverts to DB");
let reth_account_a = account_a.into();
let reth_account_b = account_b.into();
@@ -537,15 +535,13 @@ mod tests {
[PlainStorageChangeset { address: address_b, wipe_storage: true, storage: vec![] }]
);
assert!(plain_state.contracts.is_empty());
StateChanges(plain_state)
.write_to_db(&provider)
.expect("Could not write plain state to DB");
provider.write_state_changes(plain_state).expect("Could not write plain state to DB");
assert_eq!(
reverts.storage,
[[PlainStorageRevert { address: address_b, wiped: true, storage_revert: vec![] }]]
);
StateReverts(reverts).write_to_db(&provider, 2).expect("Could not write reverts to DB");
provider.write_state_reverts(reverts, 2).expect("Could not write reverts to DB");
// Check new plain state for account B
assert_eq!(