feat(engine): account proof task integration

- Added `account_proof_task_handle` to `MultiproofManager` for managing account multiproof tasks.
- Updated multiproof calculation logic to queue tasks to the account proof manager.
- Improved error handling for account manager task failures.
- Refactored test setup to create separate storage and account proof managers.
This commit is contained in:
Yong Kang
2025-10-02 04:44:43 +00:00
parent 0b8e8dc597
commit 7333845a55

View File

@@ -19,7 +19,11 @@ use reth_trie::{
updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted,
HashedStorage, MultiProofTargets, TrieInput,
};
use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{AccountMultiproofInput, ProofTaskKind, ProofTaskManagerHandle},
root::ParallelStateRootError,
};
use std::{
collections::{BTreeMap, VecDeque},
ops::DerefMut,
@@ -349,7 +353,9 @@ pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
pending: VecDeque<PendingMultiproofTask<Factory>>,
/// Executor for tasks
executor: WorkloadExecutor,
/// Sender to the storage proof task.
/// Sender to the account proof task manager.
account_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
/// Sender to the storage proof task manager.
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
/// Cached storage proof roots for missed leaves; this maps
/// hashed (missed) addresses to their storage proof roots.
@@ -375,6 +381,7 @@ where
fn new(
executor: WorkloadExecutor,
metrics: MultiProofTaskMetrics,
account_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
max_concurrent: usize,
) -> Self {
@@ -384,6 +391,7 @@ where
executor,
inflight: 0,
metrics,
account_proof_task_handle,
storage_proof_task_handle,
missed_leaves_storage_roots: Default::default(),
}
@@ -526,48 +534,60 @@ where
state_root_message_sender,
multi_added_removed_keys,
} = multiproof_input;
let storage_proof_task_handle = self.storage_proof_task_handle.clone();
let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
let account_targets = proof_targets.len();
let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
trace!(
target: "engine::root",
proof_sequence_number,
?proof_targets,
account_targets,
storage_targets,
?source,
"Starting multiproof calculation",
);
// Create channel for receiving multiproof result from account manager
let (result_tx, result_rx) = std::sync::mpsc::channel();
// Queue account multiproof to account manager
let account_input = AccountMultiproofInput::new(
proof_targets,
Arc::unwrap_or_clone(config.prefix_sets).freeze(),
true, // collect_branch_node_masks
multi_added_removed_keys,
self.storage_proof_task_handle.clone(),
result_tx,
);
if self
.account_proof_task_handle
.queue_task(ProofTaskKind::AccountMultiproof(Box::new(account_input)))
.is_err()
{
let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculationError(
ParallelStateRootError::Other("account manager closed".into()).into(),
));
return;
}
// Spawn receiver task to handle the result
self.executor.spawn_blocking(move || {
let account_targets = proof_targets.len();
let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
trace!(
target: "engine::root",
proof_sequence_number,
?proof_targets,
account_targets,
storage_targets,
?source,
"Starting multiproof calculation",
);
let start = Instant::now();
let proof_result = ParallelProof::new(
config.consistent_view,
config.nodes_sorted,
config.state_sorted,
config.prefix_sets,
missed_leaves_storage_roots,
storage_proof_task_handle.clone(),
)
.with_branch_node_masks(true)
.with_multi_added_removed_keys(multi_added_removed_keys)
.decoded_multiproof(proof_targets);
let elapsed = start.elapsed();
trace!(
target: "engine::root",
proof_sequence_number,
?elapsed,
?source,
account_targets,
storage_targets,
"Multiproof calculated",
);
match result_rx.recv() {
Ok(Ok(proof)) => {
let elapsed = start.elapsed();
trace!(
target: "engine::root",
proof_sequence_number,
?elapsed,
?source,
account_targets,
storage_targets,
"Multiproof calculated",
);
match proof_result {
Ok(proof) => {
let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
Box::new(ProofCalculated {
sequence_number: proof_sequence_number,
@@ -579,10 +599,16 @@ where
}),
));
}
Err(error) => {
Ok(Err(error)) => {
let _ = state_root_message_sender
.send(MultiProofMessage::ProofCalculationError(error.into()));
}
Err(_) => {
let _ =
state_root_message_sender.send(MultiProofMessage::ProofCalculationError(
ParallelStateRootError::Other("account manager closed".into()).into(),
));
}
}
});
@@ -678,7 +704,8 @@ where
pub(super) fn new(
config: MultiProofConfig<Factory>,
executor: WorkloadExecutor,
proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
account_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
to_sparse_trie: Sender<SparseTrieUpdate>,
max_concurrency: usize,
chunk_size: Option<usize>,
@@ -698,7 +725,8 @@ where
multiproof_manager: MultiproofManager::new(
executor,
metrics.clone(),
proof_task_handle,
account_proof_task_handle,
storage_proof_task_handle,
max_concurrency,
),
metrics,
@@ -1231,17 +1259,36 @@ mod tests {
config.state_sorted.clone(),
config.prefix_sets.clone(),
);
let proof_task = ProofTaskManager::new(
// Create both storage and account managers for testing
let storage_manager = ProofTaskManager::new(
executor.handle().clone(),
config.consistent_view.clone(),
task_ctx.clone(),
1, // storage_worker_count
0, // account_worker_count
1, // max_concurrency
)
.unwrap();
let account_manager = ProofTaskManager::new(
executor.handle().clone(),
config.consistent_view.clone(),
task_ctx,
1, // num_workers
0, // storage_worker_count
1, // account_worker_count
1, // max_concurrency
)
.unwrap();
let channel = channel();
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
MultiProofTask::new(
config,
executor,
account_manager.handle(),
storage_manager.handle(),
channel.0,
1,
None,
)
}
#[test]