mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: optimize SparseTrieCacheTask (#21704)
This commit is contained in:
@@ -517,6 +517,8 @@ where
|
||||
let disable_sparse_trie_as_cache = !config.enable_sparse_trie_as_cache();
|
||||
let prune_depth = self.sparse_trie_prune_depth;
|
||||
let max_storage_tries = self.sparse_trie_max_storage_tries;
|
||||
let chunk_size =
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
@@ -557,6 +559,7 @@ where
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
chunk_size,
|
||||
))
|
||||
};
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use reth_trie_parallel::{
|
||||
AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
|
||||
ProofWorkerHandle,
|
||||
},
|
||||
targets_v2::{ChunkedMultiProofTargetsV2, MultiProofTargetsV2},
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
};
|
||||
use revm_primitives::map::{hash_map, B256Map};
|
||||
use std::{collections::BTreeMap, sync::Arc, time::Instant};
|
||||
@@ -63,7 +63,7 @@ const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
|
||||
|
||||
/// The default max targets, for limiting the number of account and storage proof targets to be
|
||||
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
|
||||
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
|
||||
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
|
||||
|
||||
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
|
||||
/// state.
|
||||
@@ -311,11 +311,7 @@ impl VersionedMultiProofTargets {
|
||||
fn chunking_length(&self) -> usize {
|
||||
match self {
|
||||
Self::Legacy(targets) => targets.chunking_length(),
|
||||
Self::V2(targets) => {
|
||||
// For V2, count accounts + storage slots
|
||||
targets.account_targets.len() +
|
||||
targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
|
||||
}
|
||||
Self::V2(targets) => targets.chunking_length(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -367,9 +363,7 @@ impl VersionedMultiProofTargets {
|
||||
Self::Legacy(targets) => {
|
||||
Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
|
||||
}
|
||||
Self::V2(targets) => {
|
||||
Box::new(ChunkedMultiProofTargetsV2::new(targets, chunk_size).map(Self::V2))
|
||||
}
|
||||
Self::V2(targets) => Box::new(targets.chunks(chunk_size).map(Self::V2)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1494,7 +1488,7 @@ fn get_proof_targets(
|
||||
/// Dispatches work items as a single unit or in chunks based on target size and worker
|
||||
/// availability.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn dispatch_with_chunking<T, I>(
|
||||
pub(crate) fn dispatch_with_chunking<T, I>(
|
||||
items: T,
|
||||
chunking_len: usize,
|
||||
chunk_size: Option<usize>,
|
||||
|
||||
@@ -1,19 +1,21 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use crate::tree::{
|
||||
multiproof::{evm_state_to_hashed_post_state, MultiProofMessage, VersionedMultiProofTargets},
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
},
|
||||
payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use reth_errors::ProviderError;
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator};
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH,
|
||||
TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{
|
||||
@@ -24,7 +26,7 @@ use reth_trie_parallel::{
|
||||
targets_v2::MultiProofTargetsV2,
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind},
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
};
|
||||
@@ -34,7 +36,7 @@ use std::{
|
||||
sync::mpsc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, trace};
|
||||
use tracing::{debug, debug_span, error, instrument, trace};
|
||||
|
||||
#[expect(clippy::large_enum_variant)]
|
||||
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
|
||||
@@ -203,6 +205,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
@@ -215,6 +220,15 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
|
||||
/// The size of proof targets chunk to spawn in one calculation.
|
||||
/// If None, chunking is disabled and all targets are processed in a single proof.
|
||||
chunk_size: Option<usize>,
|
||||
/// If this number is exceeded and chunking is enabled, then this will override whether or not
|
||||
/// there are any active workers and force chunking across workers. This is to prevent tasks
|
||||
/// which are very long from hitting a single worker.
|
||||
max_targets_for_chunking: usize,
|
||||
|
||||
/// Account trie updates.
|
||||
account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage trie updates. hashed address -> slot -> update.
|
||||
@@ -241,6 +255,14 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
fetched_storage_targets: B256Map<B256Map<u8>>,
|
||||
/// Reusable buffer for RLP encoding of accounts.
|
||||
account_rlp_buf: Vec<u8>,
|
||||
/// Whether the last state update has been received.
|
||||
finished_state_updates: bool,
|
||||
/// Pending targets to be dispatched to the proof workers.
|
||||
pending_targets: MultiProofTargetsV2,
|
||||
/// Number of pending execution/prewarming updates received but not yet passed to
|
||||
/// `update_leaves`.
|
||||
pending_updates: usize,
|
||||
|
||||
/// Metrics for the sparse trie.
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
@@ -255,7 +277,8 @@ where
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_state_trie: SparseStateTrie<A, S>,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
chunk_size: Option<usize>,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
Self {
|
||||
@@ -263,13 +286,18 @@ where
|
||||
proof_result_rx,
|
||||
updates,
|
||||
proof_worker_handle,
|
||||
trie: sparse_state_trie,
|
||||
trie,
|
||||
chunk_size,
|
||||
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
account_updates: Default::default(),
|
||||
storage_updates: Default::default(),
|
||||
pending_account_updates: Default::default(),
|
||||
fetched_account_targets: Default::default(),
|
||||
fetched_storage_targets: Default::default(),
|
||||
account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
|
||||
finished_state_updates: Default::default(),
|
||||
pending_targets: Default::default(),
|
||||
pending_updates: Default::default(),
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
@@ -317,15 +345,8 @@ where
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut finished_state_updates = false;
|
||||
loop {
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let Ok(result) = message else {
|
||||
unreachable!("we own the sender half")
|
||||
};
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
recv(self.updates) -> message => {
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
@@ -334,27 +355,48 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
match update {
|
||||
MultiProofMessage::PrefetchProofs(targets) => {
|
||||
self.on_prewarm_targets(targets);
|
||||
}
|
||||
MultiProofMessage::StateUpdate(_, state) => {
|
||||
self.on_state_update(state);
|
||||
}
|
||||
MultiProofMessage::EmptyProof { sequence_number: _, state } => {
|
||||
self.on_hashed_state_update(state);
|
||||
}
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => {
|
||||
finished_state_updates = true;
|
||||
}
|
||||
}
|
||||
self.on_multiproof_message(update);
|
||||
self.pending_updates += 1;
|
||||
}
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let Ok(result) = message else {
|
||||
unreachable!("we own the sender half")
|
||||
};
|
||||
let ProofResult::V2(mut result) = result.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
while let Ok(next) = self.proof_result_rx.try_recv() {
|
||||
let ProofResult::V2(res) = next.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
result.extend(res);
|
||||
}
|
||||
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
}
|
||||
|
||||
self.process_updates()?;
|
||||
if self.updates.is_empty() && self.proof_result_rx.is_empty() {
|
||||
// If we don't have any pending messages, we can spend some time on computing
|
||||
// storage roots and promoting account updates.
|
||||
self.dispatch_pending_targets();
|
||||
self.promote_pending_account_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
|
||||
// If we don't have any pending updates OR we've accumulated a lot already, apply
|
||||
// them to the trie,
|
||||
self.process_leaf_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() ||
|
||||
self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default()
|
||||
{
|
||||
// Make sure to dispatch targets if we don't have any updates or if we've
|
||||
// accumulated a lot of them.
|
||||
self.dispatch_pending_targets();
|
||||
}
|
||||
|
||||
if finished_state_updates &&
|
||||
if self.finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
@@ -377,6 +419,22 @@ where
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
/// Processes a [`MultiProofMessage`].
|
||||
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
|
||||
match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
|
||||
MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state),
|
||||
MultiProofMessage::EmptyProof { .. } => unreachable!(),
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
|
||||
let VersionedMultiProofTargets::V2(targets) = targets else {
|
||||
unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
|
||||
@@ -412,11 +470,7 @@ where
|
||||
)]
|
||||
fn on_state_update(&mut self, update: EvmState) {
|
||||
let hashed_state_update = evm_state_to_hashed_post_state(update);
|
||||
self.on_hashed_state_update(hashed_state_update)
|
||||
}
|
||||
|
||||
/// Processes a hashed state update and encodes all state changes as trie updates.
|
||||
fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
|
||||
for (address, storage) in hashed_state_update.storages {
|
||||
for (slot, value) in storage.storage {
|
||||
let encoded = if value.is_zero() {
|
||||
@@ -454,72 +508,148 @@ where
|
||||
|
||||
fn on_proof_result(
|
||||
&mut self,
|
||||
result: ProofResultMessage,
|
||||
result: DecodedMultiProofV2,
|
||||
) -> Result<(), ParallelStateRootError> {
|
||||
let ProofResult::V2(result) = result.result? else {
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
|
||||
fn process_updates(&mut self) -> Result<(), ProviderError> {
|
||||
let mut targets = MultiProofTargetsV2::default();
|
||||
/// Applies all account and storage leaf updates to corresponding tries and collects any new
|
||||
/// multiproof targets.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.pending_updates = 0;
|
||||
|
||||
for (addr, updates) in &mut self.storage_updates {
|
||||
let trie = self.trie.get_or_create_storage_trie_mut(*addr);
|
||||
let fetched_storage = self.fetched_storage_targets.entry(*addr).or_default();
|
||||
// Start with processing all storage updates in parallel.
|
||||
let storage_results = self
|
||||
.storage_updates
|
||||
.iter_mut()
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched_storage.entry(path) {
|
||||
(address, updates, fetched, trie)
|
||||
})
|
||||
.par_bridge()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.trie.insert_storage_trie(*address, trie);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
|
||||
}
|
||||
}
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.process_account_leaf_updates()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
///
|
||||
/// Returns whether any updates were drained (applied to the trie).
|
||||
fn process_account_leaf_updates(&mut self) -> SparseTrieResult<bool> {
|
||||
let updates_len_before = self.account_updates.len();
|
||||
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
})
|
||||
.map_err(ProviderError::other)?;
|
||||
},
|
||||
)?;
|
||||
|
||||
// If all storage updates were processed, we can now compute the new storage root.
|
||||
if updates.is_empty() {
|
||||
let storage_root =
|
||||
Ok(self.account_updates.len() < updates_len_before)
|
||||
}
|
||||
|
||||
/// Iterates through all storage tries for which all updates were processed, computes their
|
||||
/// storage roots, and promotes corresponding pending account updates into proper leaf updates
|
||||
/// for accounts trie.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.process_leaf_updates()?;
|
||||
|
||||
if self.pending_account_updates.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let roots = self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
.par_iter_mut()
|
||||
.filter(|(address, _)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
})
|
||||
.map(|(address, trie)| {
|
||||
let root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
// If there is a pending account update for this address with known info, we can
|
||||
// encode it into proper update right away.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
entry.get().is_some()
|
||||
(address, root)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (addr, storage_root) in roots {
|
||||
// If the storage root is known and we have a pending update for this account, encode it
|
||||
// into a proper update.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
self.account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(&mut self.account_rlp_buf);
|
||||
self.account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
Vec::new()
|
||||
} else {
|
||||
self.account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(&mut self.account_rlp_buf);
|
||||
self.account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,52 +698,52 @@ where
|
||||
false
|
||||
});
|
||||
|
||||
let updates_len_before = self.account_updates.len();
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.trie
|
||||
.trie_mut()
|
||||
.update_leaves(&mut self.account_updates, |target, min_len| {
|
||||
match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.account_targets.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
})
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
if updates_len_before == self.account_updates.len() {
|
||||
// Only exit when no new updates are processed.
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
break;
|
||||
// Only exit when no new updates are processed.
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if !self.process_account_leaf_updates()? {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput::V2 {
|
||||
targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
self.proof_result_tx.clone(),
|
||||
0,
|
||||
HashedPostState::default(),
|
||||
Instant::now(),
|
||||
),
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn dispatch_pending_targets(&mut self) {
|
||||
if !self.pending_targets.is_empty() {
|
||||
let chunking_length = self.pending_targets.chunking_length();
|
||||
dispatch_with_chunking(
|
||||
std::mem::take(&mut self.pending_targets),
|
||||
chunking_length,
|
||||
self.chunk_size,
|
||||
self.max_targets_for_chunking,
|
||||
self.proof_worker_handle.available_account_workers(),
|
||||
self.proof_worker_handle.available_storage_workers(),
|
||||
MultiProofTargetsV2::chunks,
|
||||
|proof_targets| {
|
||||
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
|
||||
AccountMultiproofInput::V2 {
|
||||
targets: proof_targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
self.proof_result_tx.clone(),
|
||||
0,
|
||||
HashedPostState::default(),
|
||||
Instant::now(),
|
||||
),
|
||||
},
|
||||
) {
|
||||
error!("failed to dispatch account multiproof: {e:?}");
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{BufMut, Encodable};
|
||||
use itertools::Itertools;
|
||||
use reth_execution_errors::{StateProofError, StorageRootError};
|
||||
use reth_execution_errors::{SparseTrieError, StateProofError, StorageRootError};
|
||||
use reth_provider::{DatabaseProviderROFactory, ProviderError};
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
use reth_trie::{
|
||||
@@ -232,6 +232,9 @@ pub enum ParallelStateRootError {
|
||||
/// Provider error.
|
||||
#[error(transparent)]
|
||||
Provider(#[from] ProviderError),
|
||||
/// Sparse trie error.
|
||||
#[error(transparent)]
|
||||
SparseTrie(#[from] SparseTrieError),
|
||||
/// Other unspecified error.
|
||||
#[error("{_0}")]
|
||||
Other(String),
|
||||
@@ -244,6 +247,7 @@ impl From<ParallelStateRootError> for ProviderError {
|
||||
ParallelStateRootError::StorageRoot(StorageRootError::Database(error)) => {
|
||||
Self::Database(error)
|
||||
}
|
||||
ParallelStateRootError::SparseTrie(error) => Self::other(error),
|
||||
ParallelStateRootError::Other(other) => Self::Database(DatabaseError::Other(other)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,17 @@ impl MultiProofTargetsV2 {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.account_targets.is_empty() && self.storage_targets.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the number of items that will be considered during chunking.
|
||||
pub fn chunking_length(&self) -> usize {
|
||||
self.account_targets.len() +
|
||||
self.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
|
||||
}
|
||||
|
||||
/// Returns an iterator that yields chunks of the specified size.
|
||||
pub fn chunks(self, chunk_size: usize) -> impl Iterator<Item = Self> {
|
||||
ChunkedMultiProofTargetsV2::new(self, chunk_size)
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator that yields chunks of V2 proof targets of at most `size` account and storage
|
||||
|
||||
@@ -182,11 +182,23 @@ where
|
||||
self.storage.tries.get_mut(address).and_then(|e| e.as_revealed_mut())
|
||||
}
|
||||
|
||||
/// Returns mutable reference to storage tries.
|
||||
pub const fn storage_tries_mut(&mut self) -> &mut B256Map<RevealableSparseTrie<S>> {
|
||||
&mut self.storage.tries
|
||||
}
|
||||
|
||||
/// Takes the storage trie for the provided address.
|
||||
pub fn take_storage_trie(&mut self, address: &B256) -> Option<RevealableSparseTrie<S>> {
|
||||
self.storage.tries.remove(address)
|
||||
}
|
||||
|
||||
/// Takes the storage trie for the provided address, creating a blind one if it doesn't exist.
|
||||
pub fn take_or_create_storage_trie(&mut self, address: &B256) -> RevealableSparseTrie<S> {
|
||||
self.storage.tries.remove(address).unwrap_or_else(|| {
|
||||
self.storage.cleared_tries.pop().unwrap_or_else(|| self.storage.default_trie.clone())
|
||||
})
|
||||
}
|
||||
|
||||
/// Inserts storage trie for the provided address.
|
||||
pub fn insert_storage_trie(&mut self, address: B256, storage_trie: RevealableSparseTrie<S>) {
|
||||
self.storage.tries.insert(address, storage_trie);
|
||||
|
||||
Reference in New Issue
Block a user