diff --git a/Cargo.lock b/Cargo.lock index 61d9437205..8d144c0183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5160,6 +5160,7 @@ name = "reth-provider" version = "0.1.0" dependencies = [ "auto_impl", + "derive_more", "itertools", "parking_lot 0.12.1", "pin-project", diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 3ab0f1a6c9..8d6995559f 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -23,7 +23,10 @@ use reth_staged_sync::{ }; use reth_stages::{ prelude::*, - stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage}, + stages::{ + ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, + TotalDifficultyStage, + }, }; use std::{path::PathBuf, sync::Arc}; use tokio::sync::watch; @@ -171,7 +174,14 @@ impl ImportCommand { .set(SenderRecoveryStage { commit_threshold: config.stages.sender_recovery.commit_threshold, }) - .set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)), + .set(ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: config.stages.execution.max_blocks, + max_changes: config.stages.execution.max_changes, + max_changesets: config.stages.execution.max_changesets, + }, + )), ) .build(db); diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index bc85223adc..f50b7b01b6 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -4,7 +4,10 @@ use reth_db::{database::Database, table::TableImporter, tables}; use reth_primitives::{BlockNumber, MAINNET}; use reth_provider::Transaction; use reth_stages::{ - stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage}, + stages::{ + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, + StorageHashingStage, + }, Stage, StageId, UnwindInput, }; use std::{ops::DerefMut, path::PathBuf, sync::Arc}; @@ -58,8 +61,14 @@ async fn unwind_and_copy( 
MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; // Bring Plainstate to TO (hashing stage execution requires it) - let mut exec_stage = - ExecutionStage::new(reth_revm::Factory::new(Arc::new(MAINNET.clone())), u64::MAX); + let mut exec_stage = ExecutionStage::new( + reth_revm::Factory::new(Arc::new(MAINNET.clone())), + ExecutionStageThresholds { + max_blocks: Some(u64::MAX), + max_changes: None, + max_changesets: None, + }, + ); exec_stage .unwind( diff --git a/bin/reth/src/merkle_debug.rs b/bin/reth/src/merkle_debug.rs index 853921eb27..2772f380a2 100644 --- a/bin/reth/src/merkle_debug.rs +++ b/bin/reth/src/merkle_debug.rs @@ -7,8 +7,9 @@ use reth_provider::Transaction; use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db}; use reth_stages::{ stages::{ - AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage, ACCOUNT_HASHING, - EXECUTION, MERKLE_EXECUTION, SENDER_RECOVERY, STORAGE_HASHING, + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, + StorageHashingStage, ACCOUNT_HASHING, EXECUTION, MERKLE_EXECUTION, SENDER_RECOVERY, + STORAGE_HASHING, }, ExecInput, Stage, }; @@ -82,7 +83,14 @@ impl Command { MERKLE_EXECUTION.get_progress(tx.deref())?.unwrap_or_default()); let factory = reth_revm::Factory::new(self.chain.clone()); - let mut execution_stage = ExecutionStage::new(factory, 1); + let mut execution_stage = ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: Some(1), + max_changes: None, + max_changesets: None, + }, + ); let mut account_hashing_stage = AccountHashingStage::default(); let mut storage_hashing_stage = StorageHashingStage::default(); diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index e1309edc69..0ae6d01a1a 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -54,7 +54,10 @@ use reth_staged_sync::{ }; use reth_stages::{ prelude::*, - stages::{ExecutionStage, HeaderSyncMode, SenderRecoveryStage, 
TotalDifficultyStage, FINISH}, + stages::{ + ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, + TotalDifficultyStage, FINISH, + }, }; use reth_tasks::TaskExecutor; use reth_transaction_pool::{EthTransactionValidator, TransactionPool}; @@ -689,7 +692,14 @@ impl Command { .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, }) - .set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)) + .set(ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: stage_conf.execution.max_blocks, + max_changes: stage_conf.execution.max_changes, + max_changesets: stage_conf.execution.max_changesets, + }, + )) .disable_if(MERKLE_UNWIND, || self.auto_mine) .disable_if(MERKLE_EXECUTION, || self.auto_mine), ) diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index a090bb9d3e..fe365d2654 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -16,7 +16,10 @@ use reth_staged_sync::{ Config, }; use reth_stages::{ - stages::{BodyStage, ExecutionStage, MerkleStage, SenderRecoveryStage, TransactionLookupStage}, + stages::{ + BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage, + TransactionLookupStage, + }, ExecInput, Stage, StageId, UnwindInput, }; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; @@ -181,7 +184,14 @@ impl Command { } StageEnum::Execution => { let factory = reth_revm::Factory::new(self.chain.clone()); - let mut stage = ExecutionStage::new(factory, num_blocks); + let mut stage = ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: Some(num_blocks), + max_changes: None, + max_changesets: None, + }, + ); if !self.skip_unwind { stage.unwind(&mut tx, unwind).await?; } diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index 9d6118ab18..3351ecf433 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -193,7 
+193,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result { // Initialize the execution stage // Hardcode the chain_id to Ethereum 1. let factory = reth_revm::Factory::new(Arc::new(chain_spec)); - let mut stage = ExecutionStage::new(factory, 1_000); + let mut stage = ExecutionStage::new_with_factory(factory); // Call execution stage let input = ExecInput { diff --git a/crates/primitives/src/constants.rs b/crates/primitives/src/constants.rs index b69aff60b9..225b59c0d2 100644 --- a/crates/primitives/src/constants.rs +++ b/crates/primitives/src/constants.rs @@ -50,6 +50,9 @@ pub const FINNEY_TO_WEI: u128 = (GWEI_TO_WEI as u128) * 1_000_000; /// Multiplier for converting ether to wei. pub const ETH_TO_WEI: u128 = FINNEY_TO_WEI * 1000; +/// Multiplier for converting mgas to gas. +pub const MGAS_TO_GAS: u64 = 1_000_000u64; + /// The Ethereum mainnet genesis hash. pub const MAINNET_GENESIS: H256 = H256(hex!("d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3")); diff --git a/crates/revm/src/executor.rs b/crates/revm/src/executor.rs index 3486c63129..cfc58bae4c 100644 --- a/crates/revm/src/executor.rs +++ b/crates/revm/src/executor.rs @@ -800,8 +800,8 @@ mod tests { // Clone and sort to make the test deterministic assert_eq!( - post_state.account_changes(), - &BTreeMap::from([( + post_state.account_changes().inner, + BTreeMap::from([( block.number, BTreeMap::from([ // New account @@ -815,8 +815,8 @@ mod tests { "Account changeset did not match" ); assert_eq!( - post_state.storage_changes(), - &BTreeMap::from([( + post_state.storage_changes().inner, + BTreeMap::from([( block.number, BTreeMap::from([( account1, diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index e47d31551c..0f6b2a16b8 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -154,13 +154,25 @@ impl Default for SenderRecoveryConfig { /// Execution stage configuration. 
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] pub struct ExecutionConfig { - /// The maximum number of blocks to execution before committing progress to the database. - pub commit_threshold: u64, + /// The maximum number of blocks to process before the execution stage commits. + pub max_blocks: Option, + /// The maximum amount of state changes to keep in memory before the execution stage commits. + pub max_changes: Option, + /// The maximum amount of changesets to keep in memory before they are written to the pending + /// database transaction. + /// + /// If this is lower than `max_changes`, then history is periodically flushed to the database + /// transaction, which frees up memory. + pub max_changesets: Option, } impl Default for ExecutionConfig { fn default() -> Self { - Self { commit_threshold: 5_000 } + Self { + max_blocks: Some(500_000), + max_changes: Some(5_000_000), + max_changesets: Some(1_000_000), + } } } diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 6062cf80ee..a6d882d0b3 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -247,7 +247,7 @@ impl StageSet for ExecutionStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(SenderRecoveryStage::default()) - .add_stage(ExecutionStage::new(self.executor_factory, 10_000)) + .add_stage(ExecutionStage::new_with_factory(self.executor_factory)) } } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index e489a4c909..700b678eab 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -8,7 +8,9 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_metrics_derive::Metrics; -use reth_primitives::{Block, BlockNumber, BlockWithSenders, U256}; +use reth_primitives::{ + constants::MGAS_TO_GAS, Block, BlockNumber, BlockWithSenders, TransactionSigned, U256, +}; use reth_provider::{ post_state::PostState, BlockExecutor, ExecutorFactory, 
LatestStateProviderRef, Transaction, }; @@ -60,25 +62,21 @@ pub struct ExecutionStage { metrics: ExecutionStageMetrics, /// The stage's internal executor executor_factory: EF, - /// Commit threshold - commit_threshold: u64, + /// The commit thresholds of the execution stage. + thresholds: ExecutionStageThresholds, } impl ExecutionStage { /// Create new execution stage with specified config. - pub fn new(executor_factory: EF, commit_threshold: u64) -> Self { - Self { metrics: ExecutionStageMetrics::default(), executor_factory, commit_threshold } + pub fn new(executor_factory: EF, thresholds: ExecutionStageThresholds) -> Self { + Self { metrics: ExecutionStageMetrics::default(), executor_factory, thresholds } } /// Create an execution stage with the provided executor factory. /// /// The commit threshold will be set to 10_000. pub fn new_with_factory(executor_factory: EF) -> Self { - Self { - metrics: ExecutionStageMetrics::default(), - executor_factory, - commit_threshold: 10_000, - } + Self::new(executor_factory, ExecutionStageThresholds::default()) } // TODO: This should be in the block provider trait once we consolidate @@ -92,14 +90,42 @@ impl ExecutionStage { let ommers = tx.get::(block_number)?.unwrap_or_default().ommers; let withdrawals = tx.get::(block_number)?.map(|v| v.withdrawals); - let (transactions, senders): (Vec<_>, Vec<_>) = tx - .get_block_transaction_range(block_number..=block_number)? - .into_iter() - .flat_map(|(_, txs)| txs.into_iter()) - .map(|tx| tx.to_components()) - .unzip(); + // Get the block body + let body = tx.get::(block_number)?.unwrap(); + let tx_range = body.tx_num_range(); - Ok((Block { header, body: transactions, ommers, withdrawals }.with_senders(senders), td)) + // Get the transactions in the body + let (transactions, senders) = if tx_range.is_empty() { + (Vec::new(), Vec::new()) + } else { + let transactions = tx + .cursor_read::()? + .walk_range(tx_range.clone())? 
+ .map(|entry| entry.map(|tx| tx.1)) + .collect::, _>>()?; + + let senders = tx + .cursor_read::()? + .walk_range(tx_range)? + .map(|entry| entry.map(|sender| sender.1)) + .collect::, _>>()?; + + (transactions, senders) + }; + + let body = transactions + .into_iter() + .map(|tx| { + TransactionSigned { + // TODO: This is the fastest way right now to make everything just work with + // a dummy transaction hash. + hash: Default::default(), + signature: tx.signature, + transaction: tx.transaction, + } + }) + .collect(); + Ok((Block { header, body, ommers, withdrawals }.with_senders(senders), td)) } /// Execute the stage. @@ -108,37 +134,59 @@ impl ExecutionStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let start_block = input.stage_progress() + 1; + let max_block = input.previous_stage_progress(); - // Create state provider with cached state + // Build executor let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); - // Fetch transactions, execute them and generate results + // Progress tracking + let mut progress = start_block; + + // Execute block range let mut state = PostState::default(); - for block_number in range.clone() { + for block_number in start_block..=max_block { let (block, td) = Self::read_block_with_senders(tx, block_number)?; // Configure the executor to use the current state. 
trace!(target: "sync::stages::execution", number = block_number, txs = block.body.len(), "Executing block"); + + // Execute the block let (block, senders) = block.into_components(); let block_state = executor .execute_and_verify_receipt(&block, td, Some(senders)) .map_err(|error| StageError::ExecutionError { block: block_number, error })?; - if let Some(last_receipt) = block_state.receipts().last() { - self.metrics - .mgas_processed_total - .increment(last_receipt.cumulative_gas_used as f64 / 1_000_000.); + + // Gas metrics + self.metrics + .mgas_processed_total + .increment(block.header.gas_used as f64 / MGAS_TO_GAS as f64); + + // Write history periodically to free up memory + if self.thresholds.should_write_history(state.changeset_size() as u64) { + info!(target: "sync::stages::execution", ?block_number, "Writing history."); + state.write_history_to_db(&**tx)?; + info!(target: "sync::stages::execution", ?block_number, "Wrote history."); + // gas_since_history_write = 0; } + + // Check if we should commit now + if self.thresholds.is_end_of_batch(block_number - start_block, state.size() as u64) { + info!(target: "sync::stages::execution", ?block_number, "Threshold hit, committing."); + break + } + + // Merge state changes state.extend(block_state); + progress = block_number; } + // Write remaining changes let start = Instant::now(); trace!(target: "sync::stages::execution", accounts = state.accounts().len(), "Writing updated state to database"); state.write_to_db(&**tx)?; trace!(target: "sync::stages::execution", took = ?Instant::now().duration_since(start), "Wrote state"); - - info!(target: "sync::stages::execution", stage_progress = *range.end(), is_final_range, "Sync iteration finished"); - Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range }) + Ok(ExecOutput { stage_progress: progress, done: progress == max_block }) } } @@ -258,6 +306,52 @@ impl Stage for ExecutionStage { } } +/// The thresholds at which the execution stage writes state changes 
to the database. +/// +/// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage +/// commits all pending changes to the database. +/// +/// A third threshold, `max_changesets`, can be set to periodically write changesets to the +/// current database transaction, which frees up memory. +#[derive(Debug)] +pub struct ExecutionStageThresholds { + /// The maximum number of blocks to process before the execution stage commits. + pub max_blocks: Option, + /// The maximum amount of state changes to keep in memory before the execution stage commits. + pub max_changes: Option, + /// The maximum amount of changesets to keep in memory before they are written to the pending + /// database transaction. + /// + /// If this is lower than `max_changes`, then history is periodically flushed to the database + /// transaction, which frees up memory. + pub max_changesets: Option, +} + +impl Default for ExecutionStageThresholds { + fn default() -> Self { + Self { + max_blocks: Some(500_000), + max_changes: Some(5_000_000), + max_changesets: Some(1_000_000), + } + } +} + +impl ExecutionStageThresholds { + /// Check if the batch thresholds have been hit. + #[inline] + pub fn is_end_of_batch(&self, blocks_processed: u64, changes_processed: u64) -> bool { + blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) || + changes_processed >= self.max_changes.unwrap_or(u64::MAX) + } + + /// Check if the history write threshold has been hit. 
+ #[inline] + pub fn should_write_history(&self, history_changes: u64) -> bool { + history_changes >= self.max_changesets.unwrap_or(u64::MAX) + } +} + #[cfg(test)] mod tests { use super::*; @@ -281,7 +375,14 @@ mod tests { fn stage() -> ExecutionStage { let factory = Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())); - ExecutionStage::new(factory, 100) + ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: Some(100), + max_changes: None, + max_changesets: None, + }, + ) } #[tokio::test] diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index c93219c5ec..4608d13abe 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -27,6 +27,7 @@ thiserror = "1.0.37" auto_impl = "1.0" itertools = "0.10" pin-project = "1.0" +derive_more = "0.99" # test-utils reth-rlp = { path = "../../rlp", optional = true } diff --git a/crates/storage/provider/src/post_state/account.rs b/crates/storage/provider/src/post_state/account.rs new file mode 100644 index 0000000000..e50f42789f --- /dev/null +++ b/crates/storage/provider/src/post_state/account.rs @@ -0,0 +1,73 @@ +use derive_more::Deref; +use reth_primitives::{Account, Address, BlockNumber}; +use std::collections::{btree_map::Entry, BTreeMap}; + +/// A mapping of `block -> address -> account` that represents what accounts were changed, and what +/// their state were prior to that change. +/// +/// If the prior state was `None`, then the account is new. +#[derive(Default, Clone, Eq, PartialEq, Debug, Deref)] +pub struct AccountChanges { + /// The inner mapping of block changes. + #[deref] + pub inner: BTreeMap>>, + /// Hand tracked change size. + pub size: usize, +} + +impl AccountChanges { + /// Insert account change at specified block number. The value is **not** updated if it already + /// exists. 
+ pub fn insert(&mut self, block: BlockNumber, address: Address, account: Option) { + if let Entry::Vacant(entry) = self.inner.entry(block).or_default().entry(address) { + self.size += 1; + entry.insert(account); + } + } + + /// Insert account changes at specified block number. The values are **not** updated if they + /// already exist. + pub fn insert_for_block( + &mut self, + block: BlockNumber, + changes: BTreeMap>, + ) { + let block_entry = self.inner.entry(block).or_default(); + for (address, account) in changes { + if let Entry::Vacant(entry) = block_entry.entry(address) { + entry.insert(account); + self.size += 1; + } + } + } + + /// Drain and return any entries above the target block number. + pub fn drain_above( + &mut self, + target_block: BlockNumber, + ) -> BTreeMap>> { + let mut evicted = BTreeMap::new(); + self.inner.retain(|block_number, accounts| { + if *block_number > target_block { + self.size -= accounts.len(); + evicted.insert(*block_number, accounts.clone()); + false + } else { + true + } + }); + evicted + } + + /// Retain entries only above specified block number. + pub fn retain_above(&mut self, target_block: BlockNumber) { + self.inner.retain(|block_number, accounts| { + if *block_number > target_block { + true + } else { + self.size -= accounts.len(); + false + } + }); + } +} diff --git a/crates/storage/provider/src/post_state.rs b/crates/storage/provider/src/post_state/mod.rs similarity index 89% rename from crates/storage/provider/src/post_state.rs rename to crates/storage/provider/src/post_state/mod.rs index 53c879727f..7ee9d57779 100644 --- a/crates/storage/provider/src/post_state.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -16,55 +16,11 @@ use reth_trie::{ }; use std::collections::BTreeMap; -/// Storage for an account with the old and new values for each slot: (slot -> (old, new)). 
-pub type StorageChangeset = BTreeMap; +mod account; +pub use account::AccountChanges; -/// A mapping of `block -> address -> account` that represents what accounts were changed, and what -/// their state were prior to that change. -/// -/// If the prior state was `None`, then the account is new. -pub type AccountChanges = BTreeMap>>; - -/// A mapping of `block -> account -> slot -> old value` that represents what slots were changed, -/// and what their values were prior to that change. -pub type StorageChanges = BTreeMap>; - -/// Changed storage state for the account. -/// -/// # Wiped Storage -/// -/// The field `wiped` denotes whether the pre-existing storage in the database should be cleared or -/// not. -#[derive(Debug, Default, Clone, Eq, PartialEq)] -pub struct ChangedStorage { - /// Whether the storage was wiped or not. - pub wiped: bool, - /// The storage slots. - pub storage: BTreeMap, -} - -/// Latest storage state for the account. -/// -/// # Wiped Storage -/// -/// The `times_wiped` field indicates the number of times the storage was wiped in this poststate. -/// -/// If `times_wiped` is greater than 0, then the account was selfdestructed at some point, and the -/// values contained in `storage` should be the only values written to the database. -#[derive(Debug, Default, Clone, Eq, PartialEq)] -pub struct Storage { - /// The number of times the storage was wiped. - pub times_wiped: u64, - /// The storage slots. - pub storage: BTreeMap, -} - -impl Storage { - /// Returns `true` if the storage was wiped at any point. - pub fn wiped(&self) -> bool { - self.times_wiped > 0 - } -} +mod storage; +pub use storage::{ChangedStorage, Storage, StorageChanges, StorageChangeset}; // todo: rewrite all the docs for this /// The state of accounts after execution of one or more transactions, including receipts and new @@ -135,6 +91,18 @@ impl PostState { Self { receipts: Vec::with_capacity(txs), ..Default::default() } } + /// Return the current size of the poststate. 
+ /// + /// Size is the sum of individual changes to accounts, storage, bytecode and receipts. + pub fn size(&self) -> usize { + self.accounts.len() + self.bytecode.len() + self.receipts.len() + self.changeset_size() + } + + /// Return the current size of history changes in the poststate. + pub fn changeset_size(&self) -> usize { + self.account_changes.size + self.storage_changes.size + } + /// Get the latest state of all changed accounts. pub fn accounts(&self) -> &BTreeMap> { &self.accounts @@ -289,32 +257,21 @@ impl PostState { } // Insert account change sets - for (block_number, account_changes) in std::mem::take(&mut other.account_changes) { - let block = self.account_changes.entry(block_number).or_default(); - for (address, account) in account_changes { - if block.contains_key(&address) { - continue - } - block.insert(address, account); - } + for (block_number, account_changes) in std::mem::take(&mut other.account_changes).inner { + self.account_changes.insert_for_block(block_number, account_changes); } // Insert storage change sets - for (block_number, storage_changes) in std::mem::take(&mut other.storage_changes) { + for (block_number, storage_changes) in std::mem::take(&mut other.storage_changes).inner { for (address, their_storage) in storage_changes { - let our_storage = self - .storage_changes - .entry(block_number) - .or_default() - .entry(address) - .or_default(); - if their_storage.wiped { - our_storage.wiped = true; - } - for (slot, value) in their_storage.storage { - our_storage.storage.entry(slot).or_insert(value); + self.storage_changes.set_wiped(block_number, address); } + self.storage_changes.insert_for_block_and_address( + block_number, + address, + their_storage.storage.into_iter(), + ); } } self.receipts.extend(other.receipts); @@ -325,28 +282,12 @@ impl PostState { /// /// The reverted changes are removed from this post-state, and their effects are reverted. 
pub fn revert_to(&mut self, target_block_number: BlockNumber) { - let mut account_changes_to_revert = BTreeMap::new(); - self.account_changes.retain(|block_number, accounts| { - if *block_number > target_block_number { - account_changes_to_revert.insert(*block_number, accounts.clone()); - false - } else { - true - } - }); + let account_changes_to_revert = self.account_changes.drain_above(target_block_number); for (_, accounts) in account_changes_to_revert.into_iter().rev() { self.accounts.extend(accounts); } - let mut storage_changes_to_revert = BTreeMap::new(); - self.storage_changes.retain(|block_number, storages| { - if *block_number > target_block_number { - storage_changes_to_revert.insert(*block_number, storages.clone()); - false - } else { - true - } - }); + let storage_changes_to_revert = self.storage_changes.drain_above(target_block_number); for (_, storages) in storage_changes_to_revert.into_iter().rev() { for (address, storage) in storages { self.storage.entry(address).and_modify(|head_storage| { @@ -379,12 +320,8 @@ impl PostState { self.revert_to(revert_to_block); // Remove all changes in the returned post-state that were not reverted - non_reverted_state - .storage_changes - .retain(|block_number, _| *block_number > revert_to_block); - non_reverted_state - .account_changes - .retain(|block_number, _| *block_number > revert_to_block); + non_reverted_state.storage_changes.retain_above(revert_to_block); + non_reverted_state.account_changes.retain_above(revert_to_block); non_reverted_state } @@ -397,7 +334,7 @@ impl PostState { account: Account, ) { self.accounts.insert(address, Some(account)); - self.account_changes.entry(block_number).or_default().entry(address).or_insert(None); + self.account_changes.insert(block_number, address, None); } /// Add a changed account to the post-state. 
@@ -412,7 +349,7 @@ impl PostState { new: Account, ) { self.accounts.insert(address, Some(new)); - self.account_changes.entry(block_number).or_default().entry(address).or_insert(Some(old)); + self.account_changes.insert(block_number, address, Some(old)); } /// Mark an account as destroyed. @@ -423,17 +360,12 @@ impl PostState { account: Account, ) { self.accounts.insert(address, None); - self.account_changes - .entry(block_number) - .or_default() - .entry(address) - .or_insert(Some(account)); + self.account_changes.insert(block_number, address, Some(account)); + let storage = self.storage.entry(address).or_default(); storage.times_wiped += 1; storage.storage.clear(); - let storage_changes = - self.storage_changes.entry(block_number).or_default().entry(address).or_default(); - storage_changes.wiped = true; + self.storage_changes.set_wiped(block_number, address); } /// Add changed storage values to the post-state. @@ -448,11 +380,11 @@ impl PostState { .or_default() .storage .extend(changeset.iter().map(|(slot, (_, new))| (*slot, *new))); - let storage_changes = - self.storage_changes.entry(block_number).or_default().entry(address).or_default(); - for (slot, (old, _)) in changeset.into_iter() { - storage_changes.storage.entry(slot).or_insert(old); - } + self.storage_changes.insert_for_block_and_address( + block_number, + address, + changeset.into_iter().map(|(slot, (old, _))| (slot, old)), + ); } /// Add new bytecode to the post-state. @@ -472,12 +404,17 @@ impl PostState { self.receipts.push(receipt); } - /// Write the post state to the database. - pub fn write_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(self, tx: &TX) -> Result<(), DbError> { + /// Write changeset history to the database. 
+ pub fn write_history_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>( + &mut self, + tx: &TX, + ) -> Result<(), DbError> { // Write account changes tracing::trace!(target: "provider::post_state", "Writing account changes"); let mut account_changeset_cursor = tx.cursor_dup_write::()?; - for (block_number, account_changes) in self.account_changes.into_iter() { + for (block_number, account_changes) in + std::mem::take(&mut self.account_changes).inner.into_iter() + { for (address, info) in account_changes.into_iter() { tracing::trace!(target: "provider::post_state", block_number, ?address, old = ?info, "Account changed"); account_changeset_cursor @@ -489,7 +426,9 @@ impl PostState { tracing::trace!(target: "provider::post_state", "Writing storage changes"); let mut storages_cursor = tx.cursor_dup_write::()?; let mut storage_changeset_cursor = tx.cursor_dup_write::()?; - for (block_number, storage_changes) in self.storage_changes.into_iter() { + for (block_number, storage_changes) in + std::mem::take(&mut self.storage_changes).inner.into_iter() + { for (address, mut storage) in storage_changes.into_iter() { let storage_id = BlockNumberAddress((block_number, address)); @@ -514,7 +453,15 @@ impl PostState { } } + Ok(()) + } + + /// Write the post state to the database. + pub fn write_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>(mut self, tx: &TX) -> Result<(), DbError> { + self.write_history_to_db(tx)?; + // Write new storage state + let mut storages_cursor = tx.cursor_dup_write::()?; for (address, storage) in self.storage.into_iter() { // If the storage was wiped, remove all previous entries from the database. if storage.wiped() { @@ -526,7 +473,7 @@ impl PostState { for (key, value) in storage.storage { tracing::trace!(target: "provider::post_state", ?address, ?key, "Updating plain state storage"); - let key = H256(key.to_be_bytes()); + let key: H256 = key.into(); if let Some(entry) = storages_cursor.seek_by_key_subkey(address, key)? 
{ if entry.key == key { storages_cursor.delete_current()?; @@ -968,8 +915,8 @@ mod tests { // The value in the changeset for the account should be `None` since this was an account // creation assert_eq!( - state.account_changes(), - &BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), + state.account_changes().inner, + BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), "The changeset for the account is incorrect" ); @@ -990,8 +937,8 @@ mod tests { // The value in the changeset for the account should still be `None` assert_eq!( - state.account_changes(), - &BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), + state.account_changes().inner, + BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), "The changeset for the account is incorrect" ); @@ -1046,8 +993,8 @@ mod tests { // Slot 0: 0 (the value before the first tx in the block) // Slot 1: 3 assert_eq!( - state.storage_changes(), - &BTreeMap::from([( + state.storage_changes().inner, + BTreeMap::from([( block, BTreeMap::from([( address, @@ -1113,13 +1060,13 @@ mod tests { BTreeMap::from([(U256::from(0), (U256::from(0), U256::from(1)))]), ); assert_eq!( - a.account_changes(), - &BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), + a.account_changes().inner, + BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), "The changeset for the account is incorrect in state A" ); assert_eq!( - a.storage_changes(), - &BTreeMap::from([( + a.storage_changes().inner, + BTreeMap::from([( block, BTreeMap::from([( address, @@ -1150,8 +1097,8 @@ mod tests { BTreeMap::from([(U256::from(0), (U256::from(1), U256::from(2)))]), ); assert_eq!( - b.account_changes(), - &BTreeMap::from([( + b.account_changes().inner, + BTreeMap::from([( block, BTreeMap::from([( address, @@ -1161,8 +1108,8 @@ mod tests { "The changeset for the account is incorrect in state B" ); assert_eq!( - b.storage_changes(), - &BTreeMap::from([( + b.storage_changes().inner, + BTreeMap::from([( block, 
BTreeMap::from([( address, @@ -1190,13 +1137,13 @@ mod tests { // Storage: // - Slot 0: 2 assert_eq!( - a.account_changes(), - &BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), + a.account_changes().inner, + BTreeMap::from([(block, BTreeMap::from([(address, None)]))]), "The changeset for the account is incorrect in the merged state" ); assert_eq!( - a.storage_changes(), - &BTreeMap::from([( + a.storage_changes().inner, + BTreeMap::from([( block, BTreeMap::from([( address, diff --git a/crates/storage/provider/src/post_state/storage.rs b/crates/storage/provider/src/post_state/storage.rs new file mode 100644 index 0000000000..efb54b1972 --- /dev/null +++ b/crates/storage/provider/src/post_state/storage.rs @@ -0,0 +1,114 @@ +use derive_more::Deref; +use reth_primitives::{Address, BlockNumber, U256}; +use std::collections::{btree_map::Entry, BTreeMap}; + +/// Storage for an account with the old and new values for each slot: (slot -> (old, new)). +pub type StorageChangeset = BTreeMap; + +/// Changed storage state for the account. +/// +/// # Wiped Storage +/// +/// The field `wiped` denotes whether the pre-existing storage in the database should be cleared or +/// not. +#[derive(Debug, Default, Clone, Eq, PartialEq)] +pub struct ChangedStorage { + /// Whether the storage was wiped or not. + pub wiped: bool, + /// The storage slots. + pub storage: BTreeMap, +} + +/// Latest storage state for the account. +/// +/// # Wiped Storage +/// +/// The `times_wiped` field indicates the number of times the storage was wiped in this poststate. +/// +/// If `times_wiped` is greater than 0, then the account was selfdestructed at some point, and the +/// values contained in `storage` should be the only values written to the database. +#[derive(Debug, Default, Clone, Eq, PartialEq)] +pub struct Storage { + /// The number of times the storage was wiped. + pub times_wiped: u64, + /// The storage slots. 
+ pub storage: BTreeMap, +} + +impl Storage { + /// Returns `true` if the storage was wiped at any point. + pub fn wiped(&self) -> bool { + self.times_wiped > 0 + } +} + +/// A mapping of `block -> account -> slot -> old value` that represents what slots were changed, +/// and what their values were prior to that change. +#[derive(Default, Clone, Eq, PartialEq, Debug, Deref)] +pub struct StorageChanges { + /// The inner mapping of block changes. + #[deref] + pub inner: BTreeMap>, + /// Hand tracked change size. + pub size: usize, +} + +impl StorageChanges { + /// Set storage `wiped` flag for specified block number and address. + pub fn set_wiped(&mut self, block: BlockNumber, address: Address) { + self.inner.entry(block).or_default().entry(address).or_default().wiped = true; + } + + /// Insert storage entries for specified block number and address. + pub fn insert_for_block_and_address( + &mut self, + block: BlockNumber, + address: Address, + storage: I, + ) where + I: Iterator, + { + let block_entry = self.inner.entry(block).or_default(); + let storage_entry = block_entry.entry(address).or_default(); + for (slot, value) in storage { + if let Entry::Vacant(entry) = storage_entry.storage.entry(slot) { + entry.insert(value); + self.size += 1; + } + } + } + + /// Drain and return any entries above the target block number. + pub fn drain_above( + &mut self, + target_block: BlockNumber, + ) -> BTreeMap> { + let mut evicted = BTreeMap::new(); + self.inner.retain(|block_number, storages| { + if *block_number > target_block { + // This is fine, because it's called only on post state splits + self.size -= + storages.iter().fold(0, |acc, (_, storage)| acc + storage.storage.len()); + evicted.insert(*block_number, storages.clone()); + false + } else { + true + } + }); + evicted + } + + /// Retain entries only above specified block number. 
+    pub fn retain_above(&mut self, target_block: BlockNumber) {
+        self.inner.retain(|number, accounts| {
+            let keep = *number > target_block;
+            if !keep {
+                // Dropped blocks give back their slot count to the hand-tracked `size`.
+                // This is fine, because it's called only on post state splits
+                let slots: usize = accounts.values().map(|changed| changed.storage.len()).sum();
+                self.size -= slots;
+            }
+            keep
+        });
+    }
+}