From a8ec78fc87d2a719d6062063b05fa150e1b651f8 Mon Sep 17 00:00:00 2001 From: YK Date: Tue, 10 Feb 2026 15:50:16 -0500 Subject: [PATCH] perf(engine): implement BAL handler for SparseTrieCacheTask (#21990) Co-authored-by: Georgios Konstantopoulos Co-authored-by: Amp --- .../tree/src/tree/payload_processor/mod.rs | 8 +-- .../src/tree/payload_processor/multiproof.rs | 7 +++ .../src/tree/payload_processor/prewarm.rs | 46 ++++++++++++++- .../src/tree/payload_processor/sparse_trie.rs | 59 +++++++++++++++++++ 4 files changed, 113 insertions(+), 7 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index de99dc6d66..8ee4d4e670 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -253,15 +253,13 @@ where // When BAL is present, use BAL prewarming and send BAL to multiproof debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming"); - // Send BAL message immediately to MultiProofTask - let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal))); - - // Spawn with BAL prewarming + // The prewarm task converts the BAL to HashedPostState and sends it on + // to_multi_proof after slot prefetching completes. self.spawn_caching_with( env, prewarm_rx, provider_builder.clone(), - None, // Don't send proof targets when BAL is present + Some(to_multi_proof.clone()), Some(bal), v2_proofs_enabled, ) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index c01bd516d4..2811efdef9 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -115,6 +115,8 @@ pub enum MultiProofMessage { /// The state update that was used to calculate the proof state: HashedPostState, }, + /// Pre-hashed state update from BAL conversion that can be applied directly without proofs. + HashedStateUpdate(HashedPostState), /// Block Access List (EIP-7928; BAL) containing complete state changes for the block. /// /// When received, the task generates a single state update from the BAL and processes it. @@ -1189,6 +1191,11 @@ impl MultiProofTask { } false } + MultiProofMessage::HashedStateUpdate(hashed_state) => { + batch_metrics.state_update_proofs_requested += + self.on_hashed_state_update(Source::BlockAccessList, hashed_state); + false + } } } diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 5ac0d17507..7b48ba46ed 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -14,7 +14,7 @@ use crate::tree::{ cached_state::{CachedStateProvider, SavedCache}, payload_processor::{ - bal::{total_slots, BALSlotIter}, + bal::{self, total_slots, BALSlotIter}, executor::WorkloadExecutor, multiproof::{MultiProofMessage, VersionedMultiProofTargets}, PayloadExecutionCache, @@ -287,6 +287,7 @@ where target: "engine::tree::payload_processor::prewarm", "Skipping BAL prewarm - no cache available" ); + self.send_bal_hashed_state(&bal); let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 }); return; @@ -302,7 +303,7 @@ where ); if total_slots == 0 { - // No slots to prefetch, signal completion immediately + self.send_bal_hashed_state(&bal); let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 }); return; @@ -347,10 +348,51 @@ where "All BAL prewarm workers completed" ); + // Convert BAL to HashedPostState and send to multiproof task + self.send_bal_hashed_state(&bal); + // Signal that execution has finished let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 }); } + /// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the + /// multiproof task. + fn send_bal_hashed_state(&self, bal: &BlockAccessList) { + let Some(to_multi_proof) = &self.to_multi_proof else { return }; + + let provider = match self.ctx.provider.build() { + Ok(provider) => provider, + Err(err) => { + warn!( + target: "engine::tree::payload_processor::prewarm", + ?err, + "Failed to build provider for BAL hashed state conversion" + ); + return; + } + }; + + match bal::bal_to_hashed_post_state(bal, &provider) { + Ok(hashed_state) => { + debug!( + target: "engine::tree::payload_processor::prewarm", + accounts = hashed_state.accounts.len(), + storages = hashed_state.storages.len(), + "Converted BAL to hashed post state" + ); + let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state)); + let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates); + } + Err(err) => { + warn!( + target: "engine::tree::payload_processor::prewarm", + ?err, + "Failed to convert BAL to hashed state" + ); + } + } + } + /// Executes the task. /// /// This will execute the transactions until all transactions have been processed or the task diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index d7c4c960d5..93ca2b3573 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -343,6 +343,9 @@ where MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => { continue } + MultiProofMessage::HashedStateUpdate(state) => { + SparseTrieTaskMessage::HashedState(state) + } }; if hashed_state_tx.send(msg).is_err() { break; @@ -1029,3 +1032,59 @@ where Ok(elapsed) } + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{keccak256, Address, U256}; + use reth_trie_sparse::ParallelSparseTrie; + + #[test] + fn test_run_hashing_task_hashed_state_update_forwards() { + let (updates_tx, updates_rx) = crossbeam_channel::unbounded(); + let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded(); + + let address = keccak256(Address::random()); + let slot = keccak256(U256::from(42).to_be_bytes::<32>()); + let value = U256::from(999); + + let mut hashed_state = HashedPostState::default(); + hashed_state.accounts.insert( + address, + Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }), + ); + let mut storage = reth_trie::HashedStorage::new(false); + storage.storage.insert(slot, value); + hashed_state.storages.insert(address, storage); + + let expected_state = hashed_state.clone(); + + let handle = std::thread::spawn(move || { + SparseTrieCacheTask::::run_hashing_task( + updates_rx, + hashed_state_tx, + ); + }); + + updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap(); + updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap(); + drop(updates_tx); + + let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else { + panic!("expected HashedState message"); + }; + + let account = received.accounts.get(&address).unwrap().unwrap(); + assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance); + assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce); + + let storage = received.storages.get(&address).unwrap(); + assert_eq!(*storage.storage.get(&slot).unwrap(), value); + + let second = hashed_state_rx.recv().unwrap(); + assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates)); + + assert!(hashed_state_rx.recv().is_err()); + handle.join().unwrap(); + } +}