mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf(engine): implement BAL handler for SparseTrieCacheTask (#21990)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com> Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -253,15 +253,13 @@ where
|
||||
// When BAL is present, use BAL prewarming and send BAL to multiproof
|
||||
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
|
||||
|
||||
// Send BAL message immediately to MultiProofTask
|
||||
let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
|
||||
|
||||
// Spawn with BAL prewarming
|
||||
// The prewarm task converts the BAL to HashedPostState and sends it on
|
||||
// to_multi_proof after slot prefetching completes.
|
||||
self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
provider_builder.clone(),
|
||||
None, // Don't send proof targets when BAL is present
|
||||
Some(to_multi_proof.clone()),
|
||||
Some(bal),
|
||||
v2_proofs_enabled,
|
||||
)
|
||||
|
||||
@@ -115,6 +115,8 @@ pub enum MultiProofMessage {
|
||||
/// The state update that was used to calculate the proof
|
||||
state: HashedPostState,
|
||||
},
|
||||
/// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
|
||||
HashedStateUpdate(HashedPostState),
|
||||
/// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
|
||||
///
|
||||
/// When received, the task generates a single state update from the BAL and processes it.
|
||||
@@ -1189,6 +1191,11 @@ impl MultiProofTask {
|
||||
}
|
||||
false
|
||||
}
|
||||
MultiProofMessage::HashedStateUpdate(hashed_state) => {
|
||||
batch_metrics.state_update_proofs_requested +=
|
||||
self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateProvider, SavedCache},
|
||||
payload_processor::{
|
||||
bal::{total_slots, BALSlotIter},
|
||||
bal::{self, total_slots, BALSlotIter},
|
||||
executor::WorkloadExecutor,
|
||||
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
|
||||
PayloadExecutionCache,
|
||||
@@ -287,6 +287,7 @@ where
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
"Skipping BAL prewarm - no cache available"
|
||||
);
|
||||
self.send_bal_hashed_state(&bal);
|
||||
let _ =
|
||||
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
return;
|
||||
@@ -302,7 +303,7 @@ where
|
||||
);
|
||||
|
||||
if total_slots == 0 {
|
||||
// No slots to prefetch, signal completion immediately
|
||||
self.send_bal_hashed_state(&bal);
|
||||
let _ =
|
||||
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
return;
|
||||
@@ -347,10 +348,51 @@ where
|
||||
"All BAL prewarm workers completed"
|
||||
);
|
||||
|
||||
// Convert BAL to HashedPostState and send to multiproof task
|
||||
self.send_bal_hashed_state(&bal);
|
||||
|
||||
// Signal that execution has finished
|
||||
let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
}
|
||||
|
||||
/// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
|
||||
/// multiproof task.
|
||||
fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
|
||||
let Some(to_multi_proof) = &self.to_multi_proof else { return };
|
||||
|
||||
let provider = match self.ctx.provider.build() {
|
||||
Ok(provider) => provider,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
?err,
|
||||
"Failed to build provider for BAL hashed state conversion"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match bal::bal_to_hashed_post_state(bal, &provider) {
|
||||
Ok(hashed_state) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
accounts = hashed_state.accounts.len(),
|
||||
storages = hashed_state.storages.len(),
|
||||
"Converted BAL to hashed post state"
|
||||
);
|
||||
let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
|
||||
let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
?err,
|
||||
"Failed to convert BAL to hashed state"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes the task.
|
||||
///
|
||||
/// This will execute the transactions until all transactions have been processed or the task
|
||||
|
||||
@@ -343,6 +343,9 @@ where
|
||||
MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
|
||||
continue
|
||||
}
|
||||
MultiProofMessage::HashedStateUpdate(state) => {
|
||||
SparseTrieTaskMessage::HashedState(state)
|
||||
}
|
||||
};
|
||||
if hashed_state_tx.send(msg).is_err() {
|
||||
break;
|
||||
@@ -1029,3 +1032,59 @@ where
|
||||
|
||||
Ok(elapsed)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::{keccak256, Address, U256};
|
||||
use reth_trie_sparse::ParallelSparseTrie;
|
||||
|
||||
#[test]
|
||||
fn test_run_hashing_task_hashed_state_update_forwards() {
|
||||
let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
|
||||
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let address = keccak256(Address::random());
|
||||
let slot = keccak256(U256::from(42).to_be_bytes::<32>());
|
||||
let value = U256::from(999);
|
||||
|
||||
let mut hashed_state = HashedPostState::default();
|
||||
hashed_state.accounts.insert(
|
||||
address,
|
||||
Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
|
||||
);
|
||||
let mut storage = reth_trie::HashedStorage::new(false);
|
||||
storage.storage.insert(slot, value);
|
||||
hashed_state.storages.insert(address, storage);
|
||||
|
||||
let expected_state = hashed_state.clone();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
|
||||
updates_rx,
|
||||
hashed_state_tx,
|
||||
);
|
||||
});
|
||||
|
||||
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
|
||||
updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
|
||||
drop(updates_tx);
|
||||
|
||||
let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
|
||||
panic!("expected HashedState message");
|
||||
};
|
||||
|
||||
let account = received.accounts.get(&address).unwrap().unwrap();
|
||||
assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
|
||||
assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
|
||||
|
||||
let storage = received.storages.get(&address).unwrap();
|
||||
assert_eq!(*storage.storage.get(&slot).unwrap(), value);
|
||||
|
||||
let second = hashed_state_rx.recv().unwrap();
|
||||
assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
|
||||
|
||||
assert!(hashed_state_rx.recv().is_err());
|
||||
handle.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user