preliminary integration of account proofs

This commit is contained in:
Yong Kang
2025-10-02 04:47:18 +00:00
parent 7333845a55
commit 2b38f20878
4 changed files with 272 additions and 61 deletions

View File

@@ -20,6 +20,9 @@ pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default number of storage proof workers.
pub const DEFAULT_STORAGE_PROOF_WORKERS: usize = 6;
/// Default number of account proof workers.
pub const DEFAULT_ACCOUNT_PROOF_WORKERS: usize = 2;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
@@ -86,6 +89,8 @@ pub struct TreeConfig {
max_proof_task_concurrency: u64,
/// Number of workers dedicated to storage proof execution
storage_proof_workers: usize,
/// Number of workers dedicated to account proof execution
account_proof_workers: usize,
/// Whether multiproof task should chunk proof targets.
multiproof_chunking_enabled: bool,
/// Multiproof task chunk size for proof targets.
@@ -133,6 +138,7 @@ impl Default for TreeConfig {
has_enough_parallelism: has_enough_parallelism(),
max_proof_task_concurrency: DEFAULT_MAX_PROOF_TASK_CONCURRENCY,
storage_proof_workers: DEFAULT_STORAGE_PROOF_WORKERS,
account_proof_workers: DEFAULT_ACCOUNT_PROOF_WORKERS,
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
@@ -163,6 +169,7 @@ impl TreeConfig {
has_enough_parallelism: bool,
max_proof_task_concurrency: u64,
storage_proof_workers: usize,
account_proof_workers: usize,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
reserved_cpu_cores: usize,
@@ -187,6 +194,7 @@ impl TreeConfig {
has_enough_parallelism,
max_proof_task_concurrency,
storage_proof_workers,
account_proof_workers,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
@@ -233,6 +241,11 @@ impl TreeConfig {
self.storage_proof_workers
}
/// Return the number of account proof workers.
pub const fn account_proof_workers(&self) -> usize {
self.account_proof_workers
}
/// Return whether the multiproof task chunking is enabled.
pub const fn multiproof_chunking_enabled(&self) -> bool {
self.multiproof_chunking_enabled
@@ -417,6 +430,12 @@ impl TreeConfig {
self
}
/// Setter for number of account proof workers.
pub const fn with_account_proof_workers(mut self, account_proof_workers: usize) -> Self {
self.account_proof_workers = account_proof_workers;
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,

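A minimal usage sketch (not part of this commit) of the new account proof worker setting, assuming `TreeConfig` is in scope as defined above; it follows the same builder pattern as the existing setters:

// Override the default of DEFAULT_ACCOUNT_PROOF_WORKERS (2) via the new setter.
let config = TreeConfig::default().with_account_proof_workers(4);
assert_eq!(config.account_proof_workers(), 4);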
View File

@@ -26,13 +26,13 @@ use reth_evm::{
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
StateReader,
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx, ProviderResult,
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofTaskManager},
proof_task::{ProofTaskCtx, ProofTaskManager, ProofTaskManagerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::{
@@ -174,7 +174,7 @@ where
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
config: &TreeConfig,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> ProviderResult<PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
@@ -189,30 +189,25 @@ where
MultiProofConfig::new_from_input(consistent_view, trie_input);
self.trie_input = Some(trie_input);
// Create and spawn the storage proof task
// Create the proof task context shared by both proof task managers
let task_ctx = ProofTaskCtx::new(
state_root_config.nodes_sorted.clone(),
state_root_config.state_sorted.clone(),
state_root_config.prefix_sets.clone(),
);
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let num_workers = config.storage_proof_workers();
let proof_task = ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
num_workers,
max_proof_task_concurrency,
)
.expect("Failed to create ProofTaskManager");
// Create and spawn dual proof task managers
let (storage_handle, account_handle, max_storage_concurrency) =
self.create_proof_managers(&state_root_config, task_ctx, config)?;
// We set it to half of the proof task concurrency, because often for each multiproof we
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
let max_multi_proof_task_concurrency = max_storage_concurrency / 2;
let multi_proof_task = MultiProofTask::new(
state_root_config,
self.executor.clone(),
proof_task.handle(),
account_handle,
storage_handle.clone(),
to_sparse_trie,
max_multi_proof_task_concurrency,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
@@ -241,26 +236,15 @@ where
let (state_root_tx, state_root_rx) = channel();
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
// Pass storage handle for blinded node retrieval during sparse trie construction
self.spawn_sparse_trie_task(sparse_trie_rx, storage_handle, state_root_tx);
// spawn the proof task
self.executor.spawn_blocking(move || {
if let Err(err) = proof_task.run() {
// At least log if there is an error at any point
tracing::error!(
target: "engine::root",
?err,
"Storage proof task returned an error"
);
}
});
PayloadHandle {
Ok(PayloadHandle {
to_multi_proof,
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
}
})
}
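Since `spawn` now returns a `ProviderResult`, callers propagate the error (raised when a proof worker's read-only transaction cannot be opened) instead of receiving the handle directly. A hedged caller-side sketch; the argument names are placeholders mirroring the test further down:

let handle = payload_processor.spawn(
    env,              // execution environment (placeholder)
    transactions,     // iterator of recovered transactions (placeholder)
    provider_builder, // StateProviderBuilder (placeholder)
    consistent_view,  // ConsistentDbView (placeholder)
    trie_input,       // TrieInput (placeholder)
    &tree_config,
)?;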
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
@@ -440,6 +424,75 @@ where
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
});
}
/// Creates and spawns both the storage and account proof task managers.
///
/// Returns a tuple of (storage_handle, account_handle, max_storage_concurrency) for use with
/// multiproof tasks.
fn create_proof_managers<Factory>(
&self,
state_root_config: &MultiProofConfig<Factory>,
task_ctx: ProofTaskCtx,
config: &TreeConfig,
) -> ProviderResult<(
ProofTaskManagerHandle<FactoryTx<Factory>>,
ProofTaskManagerHandle<FactoryTx<Factory>>,
usize,
)>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
// Calculate worker counts and concurrency limits
let storage_workers = config.storage_proof_workers();
let account_workers = config.account_proof_workers();
let max_storage_concurrency = config.max_proof_task_concurrency() as usize;
let max_account_concurrency = account_workers; // Use worker count as concurrency limit
// Create storage proof manager
let storage_proof_manager = ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx.clone(),
storage_workers,
0, // account_worker_count = 0 (no account workers)
max_storage_concurrency,
)?;
// Create account proof manager
let account_proof_manager = ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
task_ctx,
0, // storage_worker_count = 0 (no storage workers)
account_workers, // account_worker_count
max_account_concurrency,
)?;
let storage_handle = storage_proof_manager.handle();
let account_handle = account_proof_manager.handle();
// Spawn both managers with error logging
self.executor.spawn_blocking(move || {
if let Err(err) = storage_proof_manager.run() {
tracing::error!(
target: "engine::root",
?err,
"Storage proof manager returned an error"
);
}
});
self.executor.spawn_blocking(move || {
if let Err(err) = account_proof_manager.run() {
tracing::error!(
target: "engine::root",
?err,
"Account proof manager returned an error"
);
}
});
Ok((storage_handle, account_handle, max_storage_concurrency))
}
}
/// Handle to all the spawned tasks.
@@ -860,14 +913,19 @@ mod tests {
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
);
let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
&TreeConfig::default(),
)
.unwrap();
let mut state_hook = handle.state_hook();

View File

@@ -483,7 +483,8 @@ mod tests {
rt.handle().clone(),
consistent_view.clone(),
task_ctx,
1, // num_workers
1, // storage_worker_count
0, // account_worker_count
1, // max_concurrency
)
.unwrap();

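For orientation, a hedged sketch of the updated constructor with both worker pools enabled (runtime, view, and ctx setup mirror the test above; the worker counts mirror the new defaults):

let manager = ProofTaskManager::new(
    rt.handle().clone(),
    consistent_view,
    task_ctx,
    6, // storage_worker_count (DEFAULT_STORAGE_PROOF_WORKERS)
    2, // account_worker_count (DEFAULT_ACCOUNT_PROOF_WORKERS)
    1, // max_concurrency for on-demand blinded-node transactions
)
.unwrap();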
View File

@@ -9,7 +9,10 @@
//! [`HashedPostStateCursorFactory`], which are each backed by a database transaction.
use crate::root::ParallelStateRootError;
use alloy_primitives::{map::B256Set, B256};
use alloy_primitives::{
map::{B256Map, B256Set},
B256,
};
use reth_db_api::transaction::DbTx;
use reth_execution_errors::SparseTrieError;
use reth_provider::{
@@ -31,7 +34,7 @@ use reth_trie_common::{
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
collections::VecDeque,
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{channel, Receiver, SendError, Sender},
@@ -48,18 +51,108 @@ use crate::proof_task_metrics::ProofTaskMetrics;
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Executes an account multiproof task in a worker thread.
///
/// This function coordinates with the storage manager to compute storage proofs for all
/// accounts in the multiproof, then assembles the final account multiproof using the
/// pre-computed storage proofs.
fn execute_account_multiproof_worker<Tx: DbTx>(
input: AccountMultiproofInput<Tx>,
proof_tx: &ProofTaskTx<Tx>,
) {
let AccountMultiproofInput {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
storage_proof_handle,
result_sender,
} = input;
// Phase 1: Queue all storage proof requests to the storage manager
let mut storage_receivers = HashMap::new();
for (address, slots) in targets.iter() {
if slots.is_empty() {
continue;
}
let (sender, receiver) = channel();
let prefix_set = prefix_sets.storage_prefix_sets.get(address).cloned().unwrap_or_default();
let storage_input = StorageProofInput::new(
*address,
prefix_set,
slots.clone(),
collect_branch_node_masks,
multi_added_removed_keys.clone(),
);
if storage_proof_handle
.queue_task(ProofTaskKind::StorageProof(storage_input, sender))
.is_err()
{
let _ = result_sender
.send(Err(ParallelStateRootError::Other("storage manager closed".into())));
return;
}
storage_receivers.insert(*address, receiver);
}
// Phase 2: Wait for all storage proofs (blocks this worker until they all complete)
let mut storage_proofs = B256Map::default();
for (address, receiver) in storage_receivers {
match receiver.recv() {
Ok(Ok(proof)) => {
storage_proofs.insert(address, proof);
}
Ok(Err(e)) => {
let _ = result_sender.send(Err(e));
return;
}
Err(_) => {
let _ = result_sender.send(Err(ParallelStateRootError::Other(
"storage proof channel closed".into(),
)));
return;
}
}
}
// Phase 3: Assemble account multiproof using pre-computed storage proofs
let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories();
let result = crate::proof::build_account_multiproof_with_storage(
trie_cursor_factory,
hashed_cursor_factory,
targets,
prefix_sets,
storage_proofs,
collect_branch_node_masks,
multi_added_removed_keys,
);
let _ = result_sender.send(result.map(|(multiproof, _stats)| multiproof));
}
/// Proof job dispatched to the storage worker pool.
struct ProofJob {
input: StorageProofInput,
result_tx: Sender<StorageProofResult>,
}
/// Account multiproof job dispatched to the account worker pool.
struct AccountProofJob<Tx> {
input: AccountMultiproofInput<Tx>,
}
/// A task that manages dispatching multiproof requests to a set of worker tasks that hold
/// long-lived database transactions.
#[derive(Debug)]
pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
/// Channel for dispatching storage proof work to workers
storage_work_tx: crossbeam_channel::Sender<ProofJob>,
/// Channel for dispatching account multiproof work to workers
account_work_tx: crossbeam_channel::Sender<AccountProofJob<FactoryTx<Factory>>>,
/// Max number of database transactions to create (for blinded nodes)
max_concurrency: usize,
/// Number of database transactions created (for blinded nodes)
@@ -92,24 +185,27 @@ impl<Factory> ProofTaskManager<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
/// Creates a new [`ProofTaskManager`] with the given number of workers and max concurrency.
/// Creates a new [`ProofTaskManager`] with the given worker counts and max concurrency.
///
/// Spawns `num_workers` storage proof workers upfront that reuse database transactions
/// for the entire block lifetime (prewarm.rs pattern).
/// Spawns `storage_worker_count` storage proof workers and `account_worker_count` account
/// multiproof workers upfront that reuse database transactions for the entire block lifetime
/// (prewarm.rs pattern).
///
/// Returns an error if the consistent view provider fails to create a read-only transaction.
pub fn new(
executor: Handle,
view: ConsistentDbView<Factory>,
task_ctx: ProofTaskCtx,
num_workers: usize,
storage_worker_count: usize,
account_worker_count: usize,
max_concurrency: usize,
) -> ProviderResult<Self> {
let (tx_sender, proof_task_rx) = channel();
let (storage_work_tx, storage_work_rx) = crossbeam_channel::unbounded();
let (account_work_tx, account_work_rx) = crossbeam_channel::unbounded();
// Spawn workers upfront (prewarm.rs pattern)
for worker_id in 0..num_workers {
// Spawn storage workers upfront (prewarm.rs pattern)
for worker_id in 0..storage_worker_count {
let provider = view.provider_ro()?;
let tx = provider.into_tx();
let proof_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id);
@@ -133,8 +229,36 @@ where
});
}
// Spawn account workers upfront
for worker_id in 0..account_worker_count {
let provider = view.provider_ro()?;
let tx = provider.into_tx();
let proof_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id + storage_worker_count);
let account_work_rx = account_work_rx.clone();
executor.spawn_blocking(move || {
debug!(target: "trie::proof_pool", worker_id, "Account proof worker started");
// Worker loop - reuse transaction for entire block
loop {
let AccountProofJob { input } = match account_work_rx.recv() {
Ok(item) => item,
// Channel closed: all senders dropped, so shut this worker down.
// TODO: should we handle this error?
Err(_) => break,
};
// TODO: revisit when this path is exercised and avoid blocking the worker here.
// Execute the account multiproof (blocks until all storage proofs are received).
execute_account_multiproof_worker(input, &proof_tx);
}
debug!(target: "trie::proof_pool", worker_id, "Account proof worker shutdown");
});
}
Ok(Self {
storage_work_tx,
account_work_tx,
max_concurrency,
total_transactions: 0,
view,
@@ -208,10 +332,11 @@ where
ProofTaskKind::BlindedStorageNode(account, path, sender) => {
proof_task_tx.blinded_storage_node(account, path, sender, tx_sender);
}
ProofTaskKind::AccountMultiproof(_input) => {
// Phase 1b: Account multiproof logic not yet activated
trace!(target: "trie::proof_task", "AccountMultiproof task received but not yet implemented");
// Return transaction to pool to prevent leak
ProofTaskKind::AccountMultiproof(_) => {
// AccountMultiproof should never go through the spawn-per-task path;
// it is dispatched to the account worker pool in the run() loop.
debug!(target: "trie::proof_task", "AccountMultiproof received on spawn-per-task path; this should not happen");
// Return transaction to pool
let _ = tx_sender.send(ProofTaskMessage::Transaction(proof_task_tx));
}
});
@@ -254,9 +379,15 @@ where
// Queue blinded node task for spawn-per-task
self.queue_proof_task(task);
}
ProofTaskKind::AccountMultiproof(_input) => {
// Phase 1b: Account multiproof logic not yet activated
trace!(target: "trie::proof_task", "AccountMultiproof task received in run loop but not yet implemented");
ProofTaskKind::AccountMultiproof(input) => {
// Dispatch to account worker pool
if self
.account_work_tx
.send(AccountProofJob { input: *input })
.is_err()
{
trace!(target: "trie::proof_task", "Account worker pool shut down");
}
}
}
}
@@ -269,8 +400,9 @@ where
#[cfg(feature = "metrics")]
self.metrics.record();
// Shutdown: drop work_tx, workers will exit when channel closes
// Shutdown: drop work channels, workers will exit when channels close
drop(self.storage_work_tx);
drop(self.account_work_tx);
return Ok(());
}
},
@@ -279,6 +411,7 @@ where
Err(_) => {
// Shutdown workers
drop(self.storage_work_tx);
drop(self.account_work_tx);
return Ok(());
}
};
@@ -789,12 +922,12 @@ mod tests {
#[test]
fn proof_task_manager_new_propagates_consistent_view_error() {
let factory = create_test_provider_factory();
let view = ConsistentDbView::new(factory.clone(), Some((B256::repeat_byte(0x11), 0)));
let view = ConsistentDbView::new(factory, Some((B256::repeat_byte(0x11), 0)));
let rt = Runtime::new().unwrap();
let task_ctx = default_task_ctx();
let err = ProofTaskManager::new(rt.handle().clone(), view, task_ctx, 1, 1).unwrap_err();
let err = ProofTaskManager::new(rt.handle().clone(), view, task_ctx, 1, 0, 1).unwrap_err();
assert!(matches!(
err,
@@ -807,13 +940,13 @@ mod tests {
let inner_factory = create_test_provider_factory();
let calls = Arc::new(AtomicUsize::new(0));
let counting_factory = CountingFactory::new(inner_factory, Arc::clone(&calls));
let view = ConsistentDbView::new(counting_factory.clone(), None);
let view = ConsistentDbView::new(counting_factory, None);
let rt = Runtime::new().unwrap();
let task_ctx = default_task_ctx();
let num_workers = 2usize;
let manager =
ProofTaskManager::new(rt.handle().clone(), view, task_ctx, num_workers, 4).unwrap();
ProofTaskManager::new(rt.handle().clone(), view, task_ctx, num_workers, 0, 4).unwrap();
assert_eq!(calls.load(Ordering::SeqCst), num_workers);
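Since every worker opens its own read-only transaction up front, enabling account workers increases the number of provider calls accordingly; a hedged sketch reusing the same setup as the counting test above (with a fresh `view` and `calls`):

let manager =
    ProofTaskManager::new(rt.handle().clone(), view, task_ctx, 2, 3, 4).unwrap();
// Each of the 2 storage workers and 3 account workers opened one transaction.
assert_eq!(calls.load(Ordering::SeqCst), 2 + 3);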