mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat: sparse trie as cache (#21583)
Co-authored-by: yongkangc <chiayongkang@hotmail.com> Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: Georgios Konstantopoulos <me@gakonst.com> Co-authored-by: Brian Picciano <me@mediocregopher.com>
This commit is contained in:
@@ -152,6 +152,8 @@ pub struct TreeConfig {
|
||||
disable_proof_v2: bool,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
/// Whether to enable sparse trie as cache.
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -181,6 +183,7 @@ impl Default for TreeConfig {
|
||||
account_worker_count: default_account_worker_count(),
|
||||
disable_proof_v2: false,
|
||||
disable_cache_metrics: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -239,6 +242,7 @@ impl TreeConfig {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
disable_cache_metrics,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -540,4 +544,15 @@ impl TreeConfig {
|
||||
self.disable_cache_metrics = disable_cache_metrics;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether sparse trie as cache is enabled.
|
||||
pub const fn enable_sparse_trie_as_cache(&self) -> bool {
|
||||
self.enable_sparse_trie_as_cache
|
||||
}
|
||||
|
||||
/// Setter for whether to enable sparse trie as cache.
|
||||
pub const fn with_enable_sparse_trie_as_cache(mut self, value: bool) -> Self {
|
||||
self.enable_sparse_trie_as_cache = value;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,14 +7,14 @@ use crate::tree::{
|
||||
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
|
||||
sparse_trie::StateRootComputeOutcome,
|
||||
},
|
||||
sparse_trie::SparseTrieTask,
|
||||
sparse_trie::{SparseTrieCacheTask, SparseTrieTask},
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::eip1898::BlockWithParent;
|
||||
use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use executor::WorkloadExecutor;
|
||||
use metrics::Counter;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
@@ -39,10 +39,7 @@ use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
|
||||
};
|
||||
use reth_trie_sparse::{ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie};
|
||||
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
@@ -283,37 +280,45 @@ where
|
||||
v2_proofs_enabled,
|
||||
);
|
||||
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof,
|
||||
)
|
||||
.with_v2_proofs_enabled(v2_proofs_enabled);
|
||||
if !config.enable_sparse_trie_as_cache() {
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
to_multi_proof.clone(),
|
||||
from_multi_proof.clone(),
|
||||
)
|
||||
.with_v2_proofs_enabled(v2_proofs_enabled);
|
||||
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
let provider = if let Some(saved_cache) = saved_cache {
|
||||
let (cache, metrics, _disable_metrics) = saved_cache.split();
|
||||
Box::new(CachedStateProvider::new(provider, cache, metrics))
|
||||
as Box<dyn StateProvider>
|
||||
} else {
|
||||
Box::new(provider)
|
||||
};
|
||||
multi_proof_task.run(provider);
|
||||
});
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
let provider = if let Some(saved_cache) = saved_cache {
|
||||
let (cache, metrics, _disable_metrics) = saved_cache.split();
|
||||
Box::new(CachedStateProvider::new(provider, cache, metrics))
|
||||
as Box<dyn StateProvider>
|
||||
} else {
|
||||
Box::new(provider)
|
||||
};
|
||||
multi_proof_task.run(provider);
|
||||
});
|
||||
}
|
||||
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
|
||||
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
|
||||
self.spawn_sparse_trie_task(
|
||||
sparse_trie_rx,
|
||||
proof_handle,
|
||||
state_root_tx,
|
||||
from_multi_proof,
|
||||
config,
|
||||
);
|
||||
|
||||
PayloadHandle {
|
||||
to_multi_proof: Some(to_multi_proof),
|
||||
@@ -493,19 +498,18 @@ where
|
||||
|
||||
/// Spawns the [`SparseTrieTask`] for this payload processor.
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
|
||||
fn spawn_sparse_trie_task<BPF>(
|
||||
fn spawn_sparse_trie_task(
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_worker_handle: BPF,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
) where
|
||||
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
{
|
||||
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
|
||||
config: &TreeConfig,
|
||||
) {
|
||||
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let span = Span::current();
|
||||
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
@@ -525,15 +529,24 @@ where
|
||||
)
|
||||
});
|
||||
|
||||
let task =
|
||||
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
|
||||
let (result, trie) = if disable_sparse_trie_as_cache {
|
||||
SparseTrieTask::new_with_cleared_trie(
|
||||
sparse_trie_rx,
|
||||
proof_worker_handle,
|
||||
trie_metrics,
|
||||
sparse_state_trie,
|
||||
);
|
||||
)
|
||||
.run()
|
||||
} else {
|
||||
SparseTrieCacheTask::new_with_cleared_trie(
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics,
|
||||
sparse_state_trie,
|
||||
)
|
||||
.run()
|
||||
};
|
||||
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
|
||||
|
||||
@@ -1,15 +1,34 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
|
||||
use crate::tree::{
|
||||
multiproof::{evm_state_to_hashed_post_state, MultiProofMessage, VersionedMultiProofTargets},
|
||||
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::Decodable;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||
use reth_trie::{updates::TrieUpdates, Nibbles};
|
||||
use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
|
||||
use reth_errors::ProviderError;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH,
|
||||
};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{
|
||||
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
|
||||
ProofWorkerHandle,
|
||||
},
|
||||
root::ParallelStateRootError,
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
|
||||
ClearedSparseStateTrie, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie,
|
||||
SparseTrieExt,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
sync::mpsc,
|
||||
@@ -129,6 +148,359 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
proof_result_tx: CrossbeamSender<ProofResultMessage>,
|
||||
/// Receiver for proof results directly from workers.
|
||||
proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
|
||||
/// Receives updates from execution and prewarming.
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
/// Account trie updates.
|
||||
account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage trie updates. hashed address -> slot -> update.
|
||||
storage_updates: B256Map<B256Map<LeafUpdate>>,
|
||||
/// Account updates that are blocked by storage root calculation or account reveal.
|
||||
///
|
||||
/// Those are being moved into `account_updates` once storage roots
|
||||
/// are revealed and/or calculated.
|
||||
///
|
||||
/// Invariant: for each entry in `pending_account_updates` account must either be already
|
||||
/// revealed in the trie or have an entry in `account_updates`.
|
||||
///
|
||||
/// Values can be either of:
|
||||
/// - None: account had a storage update and is awaiting storage root calculation and/or
|
||||
/// account node reveal to complete.
|
||||
/// - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
|
||||
/// to complete.
|
||||
pending_account_updates: B256Map<Option<Option<Account>>>,
|
||||
/// Metrics for the sparse trie.
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrieExt + Default,
|
||||
S: SparseTrieExt + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
|
||||
pub(super) fn new_with_cleared_trie(
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_state_trie: ClearedSparseStateTrie<A, S>,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
Self {
|
||||
proof_result_tx,
|
||||
proof_result_rx,
|
||||
updates,
|
||||
proof_worker_handle,
|
||||
trie: sparse_state_trie.into_inner(),
|
||||
account_updates: Default::default(),
|
||||
storage_updates: Default::default(),
|
||||
pending_account_updates: Default::default(),
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
///
|
||||
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
|
||||
/// schedules proof fetching when needed.
|
||||
///
|
||||
/// This concludes once the last state update has been received and processed.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// - State root computation outcome.
|
||||
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
pub(super) fn run(
|
||||
mut self,
|
||||
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
|
||||
// run the main loop to completion
|
||||
let result = self.run_inner();
|
||||
(result, self.trie)
|
||||
}
|
||||
|
||||
/// Inner function to run the sparse trie task to completion.
|
||||
///
|
||||
/// See [`Self::run`] for more information.
|
||||
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut finished_state_updates = false;
|
||||
loop {
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let Ok(result) = message else {
|
||||
unreachable!("we own the sender half")
|
||||
};
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
recv(self.updates) -> message => {
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
break
|
||||
}
|
||||
};
|
||||
|
||||
match update {
|
||||
MultiProofMessage::PrefetchProofs(targets) => {
|
||||
self.on_prewarm_targets(targets);
|
||||
}
|
||||
MultiProofMessage::StateUpdate(_, state) => {
|
||||
self.on_state_update(state);
|
||||
}
|
||||
MultiProofMessage::EmptyProof { sequence_number: _, state } => {
|
||||
self.on_hashed_state_update(state);
|
||||
}
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => {
|
||||
finished_state_updates = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.process_updates()?;
|
||||
|
||||
if finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining pending account updates.
|
||||
if !self.pending_account_updates.is_empty() {
|
||||
self.process_updates()?;
|
||||
}
|
||||
|
||||
debug!(target: "engine::root", "All proofs processed, ending calculation");
|
||||
|
||||
let start = Instant::now();
|
||||
let (state_root, trie_updates) =
|
||||
self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
|
||||
})?;
|
||||
|
||||
let end = Instant::now();
|
||||
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
|
||||
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
|
||||
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
|
||||
let VersionedMultiProofTargets::V2(targets) = targets else {
|
||||
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
|
||||
};
|
||||
|
||||
for target in targets.account_targets {
|
||||
// Only touch accounts that are not yet present in the updates set.
|
||||
self.account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
|
||||
for (address, slots) in targets.storage_targets {
|
||||
for slot in slots {
|
||||
// Only touch storages that are not yet present in the updates set.
|
||||
self.storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.entry(slot.key())
|
||||
.or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
|
||||
// Touch corresponding account leaf to make sure its revealed in accounts trie for
|
||||
// storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a state update and encodes all state changes as trie updates.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all,
|
||||
fields(accounts = update.len())
|
||||
)]
|
||||
fn on_state_update(&mut self, update: EvmState) {
|
||||
let hashed_state_update = evm_state_to_hashed_post_state(update);
|
||||
self.on_hashed_state_update(hashed_state_update)
|
||||
}
|
||||
|
||||
/// Processes a hashed state update and encodes all state changes as trie updates.
|
||||
fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
|
||||
for (address, storage) in hashed_state_update.storages {
|
||||
for (slot, value) in storage.storage {
|
||||
let encoded = if value.is_zero() {
|
||||
Vec::new()
|
||||
} else {
|
||||
alloy_rlp::encode_fixed_size(&value).to_vec()
|
||||
};
|
||||
self.storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.insert(slot, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
|
||||
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
|
||||
// trie for storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
|
||||
// Make sure account is tracked in `pending_account_updates` so that once storage root
|
||||
// is computed, it will be updated in the accounts trie.
|
||||
self.pending_account_updates.entry(address).or_insert(None);
|
||||
}
|
||||
|
||||
for (address, account) in hashed_state_update.accounts {
|
||||
// Track account as touched.
|
||||
//
|
||||
// This might overwrite an existing update, which is fine, because storage root from it
|
||||
// is already tracked in the trie and can be easily fetched again.
|
||||
self.account_updates.insert(address, LeafUpdate::Touched);
|
||||
|
||||
// Track account in `pending_account_updates` so that once storage root is computed,
|
||||
// it will be updated in the accounts trie.
|
||||
self.pending_account_updates.insert(address, Some(account));
|
||||
}
|
||||
}
|
||||
|
||||
fn on_proof_result(
|
||||
&mut self,
|
||||
result: ProofResultMessage,
|
||||
) -> Result<(), ParallelStateRootError> {
|
||||
let ProofResult::V2(result) = result.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
|
||||
fn process_updates(&mut self) -> Result<(), ProviderError> {
|
||||
let mut targets = MultiProofTargetsV2::default();
|
||||
|
||||
for (addr, updates) in &mut self.storage_updates {
|
||||
let trie = self.trie.get_or_create_storage_trie_mut(*addr);
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| {
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
})
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
// If all storage updates were processed, we can now compute the new storage root.
|
||||
if updates.is_empty() {
|
||||
let storage_root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
// If there is a pending account update for this address with known info, we can
|
||||
// encode it into proper update right away.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(
|
||||
account.unwrap_or_default().into_trie_account(storage_root),
|
||||
)
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now handle pending account updates that can be upgraded to a proper update.
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
|
||||
Some(encoded).filter(|encoded| !encoded.is_empty())
|
||||
} else if !self.account_updates.contains_key(addr) {
|
||||
self.trie.get_account_value(addr)
|
||||
} else {
|
||||
// Needs to be revealed first
|
||||
return true;
|
||||
};
|
||||
|
||||
let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
|
||||
|
||||
let (account, storage_root) = if let Some(account) = account.take() {
|
||||
// If account is Some(_) here it means it didn't have any storage updates
|
||||
// and we can fetch the storage root directly from the account trie.
|
||||
//
|
||||
// If it did have storage updates, we would've had processed it above when iterating over storage tries.
|
||||
let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
|
||||
|
||||
(account, storage_root)
|
||||
} else {
|
||||
(trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
|
||||
};
|
||||
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
|
||||
Vec::new()
|
||||
} else {
|
||||
let account = account.unwrap_or_default().into_trie_account(storage_root);
|
||||
|
||||
// TODO: optimize allocation
|
||||
alloy_rlp::encode(account)
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.trie
|
||||
.trie_mut()
|
||||
.update_leaves(&mut self.account_updates, |target, min_len| {
|
||||
targets.account_targets.push(Target::new(target).with_min_len(min_len));
|
||||
})
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput::V2 {
|
||||
targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
self.proof_result_tx.clone(),
|
||||
0,
|
||||
HashedPostState::default(),
|
||||
Instant::now(),
|
||||
),
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
/// the trie updates.
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -171,7 +171,7 @@ pub enum SparseTrieErrorKind {
|
||||
/// Path to the node.
|
||||
path: Nibbles,
|
||||
/// Node that was at the path when revealing.
|
||||
node: Box<dyn core::fmt::Debug + Send>,
|
||||
node: Box<dyn core::fmt::Debug + Send + Sync>,
|
||||
},
|
||||
/// RLP error.
|
||||
#[error(transparent)]
|
||||
@@ -184,7 +184,7 @@ pub enum SparseTrieErrorKind {
|
||||
},
|
||||
/// Other.
|
||||
#[error(transparent)]
|
||||
Other(#[from] Box<dyn core::error::Error + Send>),
|
||||
Other(#[from] Box<dyn core::error::Error + Send + Sync>),
|
||||
}
|
||||
|
||||
/// Trie witness errors.
|
||||
|
||||
@@ -37,6 +37,7 @@ pub struct DefaultEngineValues {
|
||||
account_worker_count: Option<usize>,
|
||||
disable_proof_v2: bool,
|
||||
cache_metrics_disabled: bool,
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
}
|
||||
|
||||
impl DefaultEngineValues {
|
||||
@@ -172,6 +173,12 @@ impl DefaultEngineValues {
|
||||
self.cache_metrics_disabled = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to enable sparse trie as cache by default
|
||||
pub const fn with_enable_sparse_trie_as_cache(mut self, v: bool) -> Self {
|
||||
self.enable_sparse_trie_as_cache = v;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DefaultEngineValues {
|
||||
@@ -197,6 +204,7 @@ impl Default for DefaultEngineValues {
|
||||
account_worker_count: None,
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -324,6 +332,10 @@ pub struct EngineArgs {
|
||||
/// Disable cache metrics recording, which can take up to 50ms with large cached state.
|
||||
#[arg(long = "engine.disable-cache-metrics", default_value_t = DefaultEngineValues::get_global().cache_metrics_disabled)]
|
||||
pub cache_metrics_disabled: bool,
|
||||
|
||||
/// Enable sparse trie as cache.
|
||||
#[arg(long = "engine.enable-sparse-trie-as-cache", default_value_t = DefaultEngineValues::get_global().enable_sparse_trie_as_cache, conflicts_with = "disable_proof_v2")]
|
||||
pub enable_sparse_trie_as_cache: bool,
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
@@ -350,6 +362,7 @@ impl Default for EngineArgs {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
} = DefaultEngineValues::get_global().clone();
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -376,6 +389,7 @@ impl Default for EngineArgs {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
cache_metrics_disabled,
|
||||
enable_sparse_trie_as_cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -412,6 +426,7 @@ impl EngineArgs {
|
||||
|
||||
config = config.with_disable_proof_v2(self.disable_proof_v2);
|
||||
config = config.without_cache_metrics(self.cache_metrics_disabled);
|
||||
config = config.with_enable_sparse_trie_as_cache(self.enable_sparse_trie_as_cache);
|
||||
|
||||
config
|
||||
}
|
||||
@@ -464,6 +479,7 @@ mod tests {
|
||||
account_worker_count: Some(8),
|
||||
disable_proof_v2: false,
|
||||
cache_metrics_disabled: true,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
};
|
||||
|
||||
let parsed_args = CommandParser::<EngineArgs>::parse_from([
|
||||
|
||||
@@ -89,6 +89,12 @@ impl From<revm_state::Account> for Account {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TrieAccount> for Account {
|
||||
fn from(value: TrieAccount) -> Self {
|
||||
Self { balance: value.balance, nonce: value.nonce, bytecode_hash: Some(value.code_hash) }
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemorySize for Account {
|
||||
fn size(&self) -> usize {
|
||||
size_of::<Self>()
|
||||
|
||||
@@ -154,6 +154,11 @@ where
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Returns mutable reference to account trie.
|
||||
pub const fn trie_mut(&mut self) -> &mut RevealableSparseTrie<A> {
|
||||
&mut self.state
|
||||
}
|
||||
|
||||
/// Returns `true` if account was already revealed.
|
||||
pub fn is_account_revealed(&self, account: B256) -> bool {
|
||||
self.revealed_account_paths.contains(&Nibbles::unpack(account))
|
||||
@@ -224,6 +229,14 @@ where
|
||||
self.storage.tries.insert(address, storage_trie);
|
||||
}
|
||||
|
||||
/// Returns mutable reference to storage sparse trie, creating a blind one if it doesn't exist.
|
||||
pub fn get_or_create_storage_trie_mut(
|
||||
&mut self,
|
||||
address: B256,
|
||||
) -> &mut RevealableSparseTrie<S> {
|
||||
self.storage.get_or_create_trie_mut(address)
|
||||
}
|
||||
|
||||
/// Reveal unknown trie paths from multiproof.
|
||||
/// NOTE: This method does not extensively validate the proof.
|
||||
pub fn reveal_multiproof(&mut self, multiproof: MultiProof) -> SparseStateTrieResult<()> {
|
||||
@@ -460,16 +473,17 @@ where
|
||||
self.metrics.increment_skipped_account_nodes(_metric_values.skipped_nodes as u64);
|
||||
}
|
||||
|
||||
if let Some(root_node) = root_node {
|
||||
let trie = if let Some(root_node) = root_node {
|
||||
trace!(target: "trie::sparse", ?root_node, "Revealing root account node from V2 proof");
|
||||
let trie =
|
||||
self.state.reveal_root(root_node.node, root_node.masks, self.retain_updates)?;
|
||||
self.state.reveal_root(root_node.node, root_node.masks, self.retain_updates)?
|
||||
} else {
|
||||
self.state.as_revealed_mut().ok_or(SparseTrieErrorKind::Blind)?
|
||||
};
|
||||
|
||||
trie.reserve_nodes(new_nodes);
|
||||
trie.reserve_nodes(new_nodes);
|
||||
|
||||
trace!(target: "trie::sparse", total_nodes = ?nodes.len(), "Revealing account nodes from V2 proof");
|
||||
trie.reveal_nodes(nodes)?;
|
||||
}
|
||||
trace!(target: "trie::sparse", total_nodes = ?nodes.len(), "Revealing account nodes from V2 proof");
|
||||
trie.reveal_nodes(nodes)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -513,15 +527,17 @@ where
|
||||
let FilteredV2ProofNodes { root_node, nodes, new_nodes, metric_values } =
|
||||
filter_revealed_v2_proof_nodes(nodes, revealed_nodes)?;
|
||||
|
||||
if let Some(root_node) = root_node {
|
||||
let trie = if let Some(root_node) = root_node {
|
||||
trace!(target: "trie::sparse", ?account, ?root_node, "Revealing root storage node from V2 proof");
|
||||
let trie = trie.reveal_root(root_node.node, root_node.masks, retain_updates)?;
|
||||
trie.reveal_root(root_node.node, root_node.masks, retain_updates)?
|
||||
} else {
|
||||
trie.as_revealed_mut().ok_or(SparseTrieErrorKind::Blind)?
|
||||
};
|
||||
|
||||
trie.reserve_nodes(new_nodes);
|
||||
trie.reserve_nodes(new_nodes);
|
||||
|
||||
trace!(target: "trie::sparse", ?account, total_nodes = ?nodes.len(), "Revealing storage nodes from V2 proof");
|
||||
trie.reveal_nodes(nodes)?;
|
||||
}
|
||||
trace!(target: "trie::sparse", ?account, total_nodes = ?nodes.len(), "Revealing storage nodes from V2 proof");
|
||||
trie.reveal_nodes(nodes)?;
|
||||
|
||||
Ok(metric_values)
|
||||
}
|
||||
@@ -714,8 +730,8 @@ where
|
||||
}
|
||||
|
||||
/// Returns storage sparse trie root if the trie has been revealed.
|
||||
pub fn storage_root(&mut self, account: B256) -> Option<B256> {
|
||||
self.storage.tries.get_mut(&account).and_then(|trie| trie.root())
|
||||
pub fn storage_root(&mut self, account: &B256) -> Option<B256> {
|
||||
self.storage.tries.get_mut(account).and_then(|trie| trie.root())
|
||||
}
|
||||
|
||||
/// Returns mutable reference to the revealed account sparse trie.
|
||||
@@ -1170,6 +1186,13 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
|
||||
(trie, revealed_paths)
|
||||
}
|
||||
|
||||
// Returns mutable reference to storage sparse trie, creating a blind one if it doesn't exist.
|
||||
fn get_or_create_trie_mut(&mut self, address: B256) -> &mut RevealableSparseTrie<S> {
|
||||
self.tries.entry(address).or_insert_with(|| {
|
||||
self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone())
|
||||
})
|
||||
}
|
||||
|
||||
/// Takes the storage trie for the account from the internal `HashMap`, creating it if it
|
||||
/// doesn't already exist.
|
||||
#[cfg(feature = "std")]
|
||||
@@ -1772,7 +1795,7 @@ mod tests {
|
||||
&provider_factory,
|
||||
)
|
||||
.unwrap();
|
||||
trie_account_1.storage_root = sparse.storage_root(address_1).unwrap();
|
||||
trie_account_1.storage_root = sparse.storage_root(&address_1).unwrap();
|
||||
sparse
|
||||
.update_account_leaf(
|
||||
address_path_1,
|
||||
@@ -1782,7 +1805,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
sparse.wipe_storage(address_2).unwrap();
|
||||
trie_account_2.storage_root = sparse.storage_root(address_2).unwrap();
|
||||
trie_account_2.storage_root = sparse.storage_root(&address_2).unwrap();
|
||||
sparse
|
||||
.update_account_leaf(
|
||||
address_path_2,
|
||||
|
||||
@@ -1007,6 +1007,9 @@ Engine:
|
||||
--engine.disable-cache-metrics
|
||||
Disable cache metrics recording, which can take up to 50ms with large cached state
|
||||
|
||||
--engine.enable-sparse-trie-as-cache
|
||||
Enable sparse trie as cache
|
||||
|
||||
ERA:
|
||||
--era.enable
|
||||
Enable import from ERA1 files
|
||||
|
||||
@@ -1007,6 +1007,9 @@ Engine:
|
||||
--engine.disable-cache-metrics
|
||||
Disable cache metrics recording, which can take up to 50ms with large cached state
|
||||
|
||||
--engine.enable-sparse-trie-as-cache
|
||||
Enable sparse trie as cache
|
||||
|
||||
ERA:
|
||||
--era.enable
|
||||
Enable import from ERA1 files
|
||||
|
||||
Reference in New Issue
Block a user