perf(engine): drop sparse trie after task returned result (#16000)

This commit is contained in:
Alexey Shekhirin
2025-04-30 23:23:19 +01:00
committed by GitHub
parent b93f4c6080
commit b2b1a3c68c
2 changed files with 21 additions and 9 deletions

View File

@@ -171,7 +171,7 @@ where
multi_proof_task.run();
});
let sparse_trie_task = SparseTrieTask::new(
let mut sparse_trie_task = SparseTrieTask::new(
self.executor.clone(),
sparse_trie_rx,
proof_task.handle(),

View File

@@ -24,13 +24,21 @@ use tracing::{debug, trace, trace_span};
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<BPF> {
pub(super) struct SparseTrieTask<BPF>
where
BPF: BlindedProviderFactory + Send + Sync,
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
{
/// Executor used to spawn subtasks.
#[expect(unused)] // TODO use this for spawning trie tasks
pub(super) executor: WorkloadExecutor,
/// Receives updates from the state root task.
pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
pub(super) blinded_provider_factory: BPF,
/// Sparse Trie initialized with the blinded provider factory.
///
/// It's kept as a field on the struct to prevent blocking on de-allocation in [`Self::run`].
pub(super) trie: SparseStateTrie<BPF>,
pub(super) metrics: MultiProofTaskMetrics,
}
@@ -41,13 +49,18 @@ where
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
{
/// Creates a new sparse trie task.
pub(super) const fn new(
pub(super) fn new(
executor: WorkloadExecutor,
updates: mpsc::Receiver<SparseTrieUpdate>,
blinded_provider_factory: BPF,
metrics: MultiProofTaskMetrics,
) -> Self {
Self { executor, updates, blinded_provider_factory, metrics }
Self {
executor,
updates,
metrics,
trie: SparseStateTrie::new(blinded_provider_factory).with_updates(true),
}
}
/// Runs the sparse trie task to completion.
@@ -58,11 +71,10 @@ where
///
/// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
/// drop.
pub(super) fn run(&self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let mut num_iterations = 0;
let mut trie = SparseStateTrie::new(&self.blinded_provider_factory).with_updates(true);
while let Ok(mut update) = self.updates.recv() {
num_iterations += 1;
@@ -80,7 +92,7 @@ where
"Updating sparse trie"
);
let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
let elapsed = update_sparse_trie(&mut self.trie, update).map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
@@ -90,7 +102,7 @@ where
debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
let start = Instant::now();
let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
let (state_root, trie_updates) = self.trie.root_with_updates().map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;