feat(engine): Prefetch storage and accounts when BAL is provided (#20468)

This commit is contained in:
Brian Picciano
2025-12-23 17:04:05 +01:00
committed by GitHub
parent cf457689a6
commit 0b6361afa5
5 changed files with 513 additions and 45 deletions

View File

@@ -1,17 +1,124 @@
//! BAL (Block Access List, EIP-7928) related functionality.
use crate::tree::cached_state::CachedStateProvider;
use alloy_consensus::constants::KECCAK_EMPTY;
use alloy_eip7928::BlockAccessList;
use alloy_primitives::{keccak256, U256};
use alloy_primitives::{keccak256, Address, StorageKey, U256};
use reth_primitives_traits::Account;
use reth_provider::{AccountReader, ProviderError};
use reth_trie::{HashedPostState, HashedStorage};
use std::ops::Range;
/// Returns the total number of storage slots (both changed and read-only) across all accounts in
/// the BAL.
pub fn total_slots(bal: &BlockAccessList) -> usize {
bal.iter().map(|account| account.storage_changes.len() + account.storage_reads.len()).sum()
}
/// Iterator over storage slots in a [`BlockAccessList`], with range-based filtering.
///
/// Iterates over all `(Address, StorageKey)` pairs representing both changed and read-only
/// storage slots across all accounts in the BAL. For each account, changed slots are iterated
/// first, followed by read-only slots. The iterator intelligently skips accounts and slots
/// outside the specified range for efficient traversal.
#[derive(Debug)]
pub(crate) struct BALSlotIter<'a> {
bal: &'a BlockAccessList,
range: Range<usize>,
current_index: usize,
account_idx: usize,
/// Index within the current account's combined slots (changed + read-only).
/// If `slot_idx < storage_changes.len()`, we're in changed slots.
/// Otherwise, we're in read-only slots at index `slot_idx - storage_changes.len()`.
slot_idx: usize,
}
impl<'a> BALSlotIter<'a> {
/// Creates a new iterator over storage slots within the specified range.
pub(crate) fn new(bal: &'a BlockAccessList, range: Range<usize>) -> Self {
let mut iter = Self { bal, range, current_index: 0, account_idx: 0, slot_idx: 0 };
iter.skip_to_range_start();
iter
}
/// Skips to the first item within the range.
fn skip_to_range_start(&mut self) {
while self.account_idx < self.bal.len() {
let account = &self.bal[self.account_idx];
let slots_in_account = account.storage_changes.len() + account.storage_reads.len();
// Check if this account contains items in our range
let account_end = self.current_index + slots_in_account;
if account_end <= self.range.start {
// Entire account is before range, skip it
self.current_index = account_end;
self.account_idx += 1;
self.slot_idx = 0;
} else if self.current_index < self.range.start {
// Range starts somewhere in this account
let skip_slots = self.range.start - self.current_index;
self.slot_idx = skip_slots;
self.current_index = self.range.start;
break;
} else {
// We're at or past range start
break;
}
}
}
}
impl<'a> Iterator for BALSlotIter<'a> {
type Item = (Address, StorageKey);
fn next(&mut self) -> Option<Self::Item> {
// Check if we've exceeded the range
if self.current_index >= self.range.end {
return None;
}
// Find the next valid slot
while self.account_idx < self.bal.len() {
let account = &self.bal[self.account_idx];
let changed_len = account.storage_changes.len();
let total_len = changed_len + account.storage_reads.len();
if self.slot_idx < total_len {
let address = account.address;
let slot = if self.slot_idx < changed_len {
// We're in changed slots
account.storage_changes[self.slot_idx].slot
} else {
// We're in read-only slots
account.storage_reads[self.slot_idx - changed_len]
};
self.slot_idx += 1;
self.current_index += 1;
// Check if we've reached the end of range
if self.current_index > self.range.end {
return None;
}
return Some((address, slot));
}
// Move to next account
self.account_idx += 1;
self.slot_idx = 0;
}
None
}
}
/// Converts a Block Access List into a [`HashedPostState`] by extracting the final state
/// of modified accounts and storage slots.
pub fn bal_to_hashed_post_state<P>(
pub(crate) fn bal_to_hashed_post_state<P>(
bal: &BlockAccessList,
provider: &P,
provider: &CachedStateProvider<P>,
) -> Result<HashedPostState, ProviderError>
where
P: AccountReader,
@@ -20,7 +127,10 @@ where
for account_changes in bal {
let address = account_changes.address;
let hashed_address = keccak256(address);
// Always fetch the account; even if we don't need the db account to construct the final
// `Account`, doing this fills the cache.
let existing_account = provider.basic_account(&address)?;
// Get the latest balance (last balance change if any)
let balance = account_changes.balance_changes.last().map(|change| change.post_balance);
@@ -39,12 +149,14 @@ where
None
};
// Only fetch account from provider if we're missing any field
let existing_account = if balance.is_none() || nonce.is_none() || code_hash.is_none() {
provider.basic_account(&address)?
} else {
None
};
// If the account was only read then don't add it to the HashedPostState
if balance.is_none() &&
nonce.is_none() &&
code_hash.is_none() &&
account_changes.storage_changes.is_empty()
{
continue
}
// Build the final account state
let account = Account {
@@ -58,6 +170,7 @@ where
}),
};
let hashed_address = keccak256(address);
hashed_state.accounts.insert(hashed_address, Some(account));
// Process storage changes
@@ -75,9 +188,7 @@ where
}
}
if !storage_map.storage.is_empty() {
hashed_state.storages.insert(hashed_address, storage_map);
}
hashed_state.storages.insert(hashed_address, storage_map);
}
}
@@ -87,12 +198,17 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::tree::cached_state::{ExecutionCache, ExecutionCacheBuilder};
use alloy_eip7928::{
AccountChanges, BalanceChange, CodeChange, NonceChange, SlotChanges, StorageChange,
};
use alloy_primitives::{Address, Bytes, StorageKey, B256};
use reth_revm::test_utils::StateProviderTest;
fn new_cache() -> ExecutionCache {
ExecutionCacheBuilder::default().build_caches(1000)
}
#[test]
fn test_bal_to_hashed_post_state_basic() {
let provider = StateProviderTest::default();
@@ -108,6 +224,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
assert_eq!(result.accounts.len(), 1);
@@ -142,6 +259,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -171,6 +289,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -198,6 +317,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -232,6 +352,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -264,6 +385,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -304,6 +426,7 @@ mod tests {
};
let bal = vec![account_changes];
let provider = CachedStateProvider::new(provider, new_cache(), Default::default());
let result = bal_to_hashed_post_state(&bal, &provider).unwrap();
let hashed_address = keccak256(address);
@@ -315,4 +438,117 @@ mod tests {
// Should have the last value
assert_eq!(*stored_value, U256::from(300));
}
#[test]
fn test_bal_slot_iter() {
// Create test data with multiple accounts and slots (both changed and read-only)
let addr1 = Address::repeat_byte(0x01);
let addr2 = Address::repeat_byte(0x02);
let addr3 = Address::repeat_byte(0x03);
// Account 1: 2 changed slots + 1 read-only = 3 total slots (indices 0, 1, 2)
let account1 = AccountChanges {
address: addr1,
storage_changes: vec![
SlotChanges {
slot: StorageKey::from(U256::from(100)),
changes: vec![StorageChange::new(0, B256::ZERO)],
},
SlotChanges {
slot: StorageKey::from(U256::from(101)),
changes: vec![StorageChange::new(0, B256::ZERO)],
},
],
storage_reads: vec![StorageKey::from(U256::from(102))],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
};
// Account 2: 1 changed slot + 1 read-only = 2 total slots (indices 3, 4)
let account2 = AccountChanges {
address: addr2,
storage_changes: vec![SlotChanges {
slot: StorageKey::from(U256::from(200)),
changes: vec![StorageChange::new(0, B256::ZERO)],
}],
storage_reads: vec![StorageKey::from(U256::from(201))],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
};
// Account 3: 2 changed slots + 1 read-only = 3 total slots (indices 5, 6, 7)
let account3 = AccountChanges {
address: addr3,
storage_changes: vec![
SlotChanges {
slot: StorageKey::from(U256::from(300)),
changes: vec![StorageChange::new(0, B256::ZERO)],
},
SlotChanges {
slot: StorageKey::from(U256::from(301)),
changes: vec![StorageChange::new(0, B256::ZERO)],
},
],
storage_reads: vec![StorageKey::from(U256::from(302))],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
};
let bal = vec![account1, account2, account3];
// Test 1: Iterate over all slots (range 0..8)
let items: Vec<_> = BALSlotIter::new(&bal, 0..8).collect();
assert_eq!(items.len(), 8);
// Account 1: changed slots first (100, 101), then read-only (102)
assert_eq!(items[0], (addr1, StorageKey::from(U256::from(100))));
assert_eq!(items[1], (addr1, StorageKey::from(U256::from(101))));
assert_eq!(items[2], (addr1, StorageKey::from(U256::from(102))));
// Account 2: changed slot (200), then read-only (201)
assert_eq!(items[3], (addr2, StorageKey::from(U256::from(200))));
assert_eq!(items[4], (addr2, StorageKey::from(U256::from(201))));
// Account 3: changed slots (300, 301), then read-only (302)
assert_eq!(items[5], (addr3, StorageKey::from(U256::from(300))));
assert_eq!(items[6], (addr3, StorageKey::from(U256::from(301))));
assert_eq!(items[7], (addr3, StorageKey::from(U256::from(302))));
// Test 2: Range that skips first account (range 3..6)
let items: Vec<_> = BALSlotIter::new(&bal, 3..6).collect();
assert_eq!(items.len(), 3);
assert_eq!(items[0], (addr2, StorageKey::from(U256::from(200))));
assert_eq!(items[1], (addr2, StorageKey::from(U256::from(201))));
assert_eq!(items[2], (addr3, StorageKey::from(U256::from(300))));
// Test 3: Range within first account (range 1..2)
let items: Vec<_> = BALSlotIter::new(&bal, 1..2).collect();
assert_eq!(items.len(), 1);
assert_eq!(items[0], (addr1, StorageKey::from(U256::from(101))));
// Test 4: Range spanning multiple accounts (range 2..5)
let items: Vec<_> = BALSlotIter::new(&bal, 2..5).collect();
assert_eq!(items.len(), 3);
// Last slot from account 1 (read-only)
assert_eq!(items[0], (addr1, StorageKey::from(U256::from(102))));
// Account 2 (changed + read-only)
assert_eq!(items[1], (addr2, StorageKey::from(U256::from(200))));
assert_eq!(items[2], (addr2, StorageKey::from(U256::from(201))));
// Test 5: Empty range
let items: Vec<_> = BALSlotIter::new(&bal, 5..5).collect();
assert_eq!(items.len(), 0);
// Test 6: Range beyond end (starts at index 6)
let items: Vec<_> = BALSlotIter::new(&bal, 6..100).collect();
assert_eq!(items.len(), 2);
assert_eq!(items[0], (addr3, StorageKey::from(U256::from(301))));
assert_eq!(items[1], (addr3, StorageKey::from(U256::from(302))));
// Test 7: Range that starts in read-only slots (index 2 is the read-only slot of account 1)
let items: Vec<_> = BALSlotIter::new(&bal, 2..4).collect();
assert_eq!(items.len(), 2);
assert_eq!(items[0], (addr1, StorageKey::from(U256::from(102))));
assert_eq!(items[1], (addr2, StorageKey::from(U256::from(200))));
}
}

View File

@@ -3,11 +3,11 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{
CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
SavedCache,
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
ExecutionCacheBuilder, SavedCache,
},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::SparseTrieTask,
@@ -230,6 +230,8 @@ where
+ Send
+ 'static,
{
let parent_hash = env.parent_hash;
// start preparing transactions immediately
let (prewarm_rx, execution_rx, transaction_count_hint) =
self.spawn_tx_iterator(transactions);
@@ -240,28 +242,30 @@ where
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, skip spawning prewarm tasks entirely and send BAL to multiproof
debug!(target: "engine::tree::payload_processor", "BAL present, skipping prewarm tasks");
// When BAL is present, use BAL prewarming and send BAL to multiproof
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
// Send BAL message immediately to MultiProofTask
let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(bal));
let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
// Spawn minimal cache-only task without prewarming
// Spawn with BAL prewarming
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
)
} else {
// Normal path: spawn with full prewarming
// Normal path: spawn with transaction prewarming
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
)
};
@@ -289,11 +293,17 @@ where
// spawn multi-proof task
let parent_span = span.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
multi_proof_task.run(provider);
self.executor.spawn_blocking({
let saved_cache = self.cache_for(parent_hash);
let cache = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = CachedStateProvider::new(provider, cache, cache_metrics);
multi_proof_task.run(provider);
}
});
// wire the sparse trie to the state root response receiver
@@ -320,13 +330,14 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -400,6 +411,7 @@ where
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -444,7 +456,12 @@ where
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
prewarm_task.run(transactions, to_prewarm_task);
let mode = if let Some(bal) = bal {
PrewarmMode::BlockAccessList(bal)
} else {
PrewarmMode::Transactions(transactions)
};
prewarm_task.run(mode, to_prewarm_task);
});
}

View File

@@ -1,6 +1,8 @@
//! Multiproof task related functionality.
use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
use crate::tree::{
cached_state::CachedStateProvider, payload_processor::bal::bal_to_hashed_post_state,
};
use alloy_eip7928::BlockAccessList;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{keccak256, map::HashSet, B256};
@@ -873,7 +875,7 @@ impl MultiProofTask {
msg: MultiProofMessage,
ctx: &mut MultiproofBatchCtx,
batch_metrics: &mut MultiproofBatchMetrics,
provider: &P,
provider: &CachedStateProvider<P>,
) -> bool
where
P: AccountReader,
@@ -1049,7 +1051,7 @@ impl MultiProofTask {
}
// Convert BAL to HashedPostState and process it
match bal_to_hashed_post_state(&bal, &provider) {
match bal_to_hashed_post_state(&bal, provider) {
Ok(hashed_state) => {
debug!(
target: "engine::tree::payload_processor::multiproof",
@@ -1178,7 +1180,7 @@ impl MultiProofTask {
target = "engine::tree::payload_processor::multiproof",
skip_all
)]
pub(crate) fn run<P>(mut self, provider: P)
pub(crate) fn run<P>(mut self, provider: CachedStateProvider<P>)
where
P: AccountReader,
{
@@ -1495,12 +1497,13 @@ fn estimate_evm_state_targets(state: &EvmState) -> usize {
#[cfg(test)]
mod tests {
use super::*;
use crate::tree::cached_state::ExecutionCacheBuilder;
use alloy_eip7928::{AccountChanges, BalanceChange};
use alloy_primitives::{map::B256Set, Address};
use reth_provider::{
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader,
TrieReader,
BlockReader, DatabaseProviderFactory, LatestStateProvider, PruneCheckpointReader,
StageCheckpointReader, StateProviderBox, TrieReader,
};
use reth_trie::MultiProof;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
@@ -1537,6 +1540,20 @@ mod tests {
MultiProofTask::new(proof_handle, to_sparse_trie, Some(1), tx, rx)
}
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
where
F: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + Clone
+ Send
+ 'static,
{
let db_provider = factory.database_provider_ro().unwrap();
let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
let cache = ExecutionCacheBuilder::default().build_caches(1000);
CachedStateProvider::new(state_provider, cache, Default::default())
}
#[test]
fn test_add_proof_in_sequence() {
let mut sequencer = ProofSequencer::default();
@@ -2466,7 +2483,7 @@ mod tests {
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let test_provider = test_provider_factory.latest().unwrap();
let test_provider = create_cached_provider(test_provider_factory.clone());
let mut task = create_test_state_root_task(test_provider_factory);
// Queue: Prefetch1, StateUpdate, Prefetch2
@@ -2686,7 +2703,7 @@ mod tests {
#[test]
fn test_bal_message_processing() {
let test_provider_factory = create_test_provider_factory();
let test_provider = test_provider_factory.latest().unwrap();
let test_provider = create_cached_provider(test_provider_factory.clone());
let mut task = create_test_state_root_task(test_provider_factory);
// Create a simple BAL with one account change

View File

@@ -14,13 +14,16 @@
use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
executor::WorkloadExecutor, multiproof::MultiProofMessage,
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::MultiProofMessage,
ExecutionCache as PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
@@ -30,10 +33,11 @@ use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_execution_types::ExecutionOutcome;
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
use reth_provider::{AccountReader, BlockReader, StateProvider, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender},
@@ -43,6 +47,14 @@ use std::{
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
pub(super) enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream.
Transactions(Receiver<Tx>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
}
/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
@@ -286,6 +298,86 @@ where
}
}
/// Runs BAL-based prewarming by spawning workers to prefetch storage slots.
///
/// Divides the total slots across `max_concurrency` workers, each responsible for
/// prefetching a range of slots from the BAL.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
bal: Arc<BlockAccessList>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
// Only prefetch if we have a cache to populate
if self.ctx.saved_cache.is_none() {
trace!(
target: "engine::tree::payload_processor::prewarm",
"Skipping BAL prewarm - no cache available"
);
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
}
let total_slots = total_slots(&bal);
trace!(
target: "engine::tree::payload_processor::prewarm",
total_slots,
max_concurrency = self.max_concurrency,
"Starting BAL prewarm"
);
if total_slots == 0 {
// No slots to prefetch, signal completion immediately
let _ =
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
return;
}
let (done_tx, done_rx) = mpsc::channel();
// Calculate number of workers needed (at most max_concurrency)
let workers_needed = total_slots.min(self.max_concurrency);
// Calculate slots per worker
let slots_per_worker = total_slots / workers_needed;
let remainder = total_slots % workers_needed;
// Spawn workers with their assigned ranges
for i in 0..workers_needed {
let start = i * slots_per_worker + i.min(remainder);
let extra = if i < remainder { 1 } else { 0 };
let end = start + slots_per_worker + extra;
self.ctx.spawn_bal_worker(
i,
&self.executor,
Arc::clone(&bal),
start..end,
done_tx.clone(),
);
}
// Drop our handle to done_tx so we can detect completion
drop(done_tx);
// Wait for all workers to complete
let mut completed_workers = 0;
while done_rx.recv().is_ok() {
completed_workers += 1;
}
trace!(
target: "engine::tree::payload_processor::prewarm",
completed_workers,
"All BAL prewarm workers completed"
);
// Signal that execution has finished
let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
}
/// Executes the task.
///
/// This will execute the transactions until all transactions have been processed or the task
@@ -297,13 +389,22 @@ where
name = "prewarm and caching",
skip_all
)]
pub(super) fn run(
pub(super) fn run<Tx>(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
mode: PrewarmMode<Tx>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
// spawn execution tasks.
self.spawn_all(pending, actions_tx);
) where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
// Spawn execution tasks based on mode
match mode {
PrewarmMode::Transactions(pending) => {
self.spawn_all(pending, actions_tx);
}
PrewarmMode::BlockAccessList(bal) => {
self.run_bal_prewarm(bal, actions_tx);
}
}
let mut final_execution_outcome = None;
let mut finished_execution = false;
@@ -569,6 +670,97 @@ where
handles
}
/// Spawns a worker task for BAL slot prefetching.
///
/// The worker iterates over the specified range of slots in the BAL and ensures
/// each slot is loaded into the cache by accessing it through the state provider.
fn spawn_bal_worker(
&self,
idx: usize,
executor: &WorkloadExecutor,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
) {
let ctx = self.clone();
let span = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"bal prewarm worker",
idx,
range_start = range.start,
range_end = range.end
);
executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.prefetch_bal_slots(bal, range, done_tx);
});
}
/// Prefetches storage slots from a BAL range into the cache.
///
/// This iterates through the specified range of slots and accesses them via the state
/// provider to populate the cache.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn prefetch_bal_slots(
self,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
) {
let Self { saved_cache, provider, metrics, .. } = self;
// Build state provider
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
"Failed to build state provider in BAL prewarm thread"
);
let _ = done_tx.send(());
return;
}
};
// Wrap with cache (guaranteed to be Some since run_bal_prewarm checks)
let saved_cache = saved_cache.expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics);
let start = Instant::now();
// Track last seen address to avoid fetching the same account multiple times.
let mut last_address = None;
// Iterate through the assigned range of slots
for (address, slot) in BALSlotIter::new(&bal, range.clone()) {
// Fetch the account if this is a different address than the last one
if last_address != Some(address) {
let _ = state_provider.basic_account(&address);
last_address = Some(address);
}
// Access the slot to populate the cache
let _ = state_provider.storage(address, slot);
}
let elapsed = start.elapsed();
trace!(
target: "engine::tree::payload_processor::prewarm",
?range,
elapsed_ms = elapsed.as_millis(),
"BAL prewarm worker completed"
);
// Signal completion
let _ = done_tx.send(());
metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64());
}
}
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
@@ -650,4 +842,6 @@ pub(crate) struct PrewarmMetrics {
pub(crate) cache_saving_duration: Gauge,
/// Counter for transaction execution errors during prewarming
pub(crate) transaction_errors: Counter,
/// A histogram of BAL slot iteration duration during prefetching
pub(crate) bal_slot_iteration_duration: Histogram,
}

View File

@@ -861,8 +861,12 @@ where
}
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
let start = Instant::now();
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
let handle = self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
block_access_list,
);
// Record prewarming initialization duration
self.metrics