mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
feat(trie): add generics to SparseTrieTask (#17269)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofTaskManager},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::SparseTrie;
|
||||
use reth_trie_sparse::{SerialSparseTrie, SparseTrie};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{
|
||||
@@ -199,13 +199,14 @@ where
|
||||
// take the sparse trie if it was set
|
||||
let sparse_trie = self.sparse_trie.take();
|
||||
|
||||
let mut sparse_trie_task = SparseTrieTask::new_with_stored_trie(
|
||||
self.executor.clone(),
|
||||
sparse_trie_rx,
|
||||
proof_task.handle(),
|
||||
self.trie_metrics.clone(),
|
||||
sparse_trie,
|
||||
);
|
||||
let mut sparse_trie_task =
|
||||
SparseTrieTask::<_, SerialSparseTrie, SerialSparseTrie>::new_with_stored_trie(
|
||||
self.executor.clone(),
|
||||
sparse_trie_rx,
|
||||
proof_task.handle(),
|
||||
self.trie_metrics.clone(),
|
||||
sparse_trie,
|
||||
);
|
||||
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_trie_parallel::root::ParallelStateRootError;
|
||||
use reth_trie_sparse::{
|
||||
blinded::{BlindedProvider, BlindedProviderFactory},
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind},
|
||||
SparseStateTrie, SparseTrie,
|
||||
SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieInterface,
|
||||
};
|
||||
use std::{
|
||||
sync::mpsc,
|
||||
@@ -20,7 +20,7 @@ use std::{
|
||||
use tracing::{debug, trace, trace_span};
|
||||
|
||||
/// A task responsible for populating the sparse trie.
|
||||
pub(super) struct SparseTrieTask<BPF>
|
||||
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
|
||||
where
|
||||
BPF: BlindedProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
|
||||
@@ -34,17 +34,19 @@ where
|
||||
/// Sparse Trie initialized with the blinded provider factory.
|
||||
///
|
||||
/// It's kept as a field on the struct to prevent blocking on de-allocation in [`Self::run`].
|
||||
pub(super) trie: SparseStateTrie,
|
||||
pub(super) trie: SparseStateTrie<A, S>,
|
||||
pub(super) metrics: MultiProofTaskMetrics,
|
||||
/// Blinded node provider factory.
|
||||
blinded_provider_factory: BPF,
|
||||
}
|
||||
|
||||
impl<BPF> SparseTrieTask<BPF>
|
||||
impl<BPF, A, S> SparseTrieTask<BPF, A, S>
|
||||
where
|
||||
BPF: BlindedProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
|
||||
A: SparseTrieInterface + Send + Sync + Default,
|
||||
S: SparseTrieInterface + Send + Sync + Default,
|
||||
{
|
||||
/// Creates a new sparse trie task.
|
||||
pub(super) fn new(
|
||||
@@ -69,7 +71,7 @@ where
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
trie_metrics: MultiProofTaskMetrics,
|
||||
sparse_trie: Option<SparseTrie>,
|
||||
sparse_trie: Option<SparseTrie<A>>,
|
||||
) -> Self {
|
||||
if let Some(sparse_trie) = sparse_trie {
|
||||
Self::with_accounts_trie(
|
||||
@@ -91,7 +93,7 @@ where
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_trie: SparseTrie,
|
||||
sparse_trie: SparseTrie<A>,
|
||||
) -> Self {
|
||||
debug_assert!(sparse_trie.is_blind());
|
||||
let trie = SparseStateTrie::new().with_updates(true).with_accounts_trie(sparse_trie);
|
||||
@@ -106,7 +108,7 @@ where
|
||||
///
|
||||
/// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
|
||||
/// drop.
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome<A>, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut num_iterations = 0;
|
||||
@@ -159,18 +161,18 @@ where
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
/// the trie updates.
|
||||
#[derive(Debug)]
|
||||
pub struct StateRootComputeOutcome {
|
||||
pub struct StateRootComputeOutcome<A = SerialSparseTrie> {
|
||||
/// The state root.
|
||||
pub state_root: B256,
|
||||
/// The trie updates.
|
||||
pub trie_updates: TrieUpdates,
|
||||
/// The account state trie.
|
||||
pub trie: SparseTrie,
|
||||
pub trie: SparseTrie<A>,
|
||||
}
|
||||
|
||||
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
|
||||
pub(crate) fn update_sparse_trie<BPF>(
|
||||
trie: &mut SparseStateTrie,
|
||||
pub(crate) fn update_sparse_trie<BPF, A, S>(
|
||||
trie: &mut SparseStateTrie<A, S>,
|
||||
SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
|
||||
blinded_provider_factory: &BPF,
|
||||
) -> SparseStateTrieResult<Duration>
|
||||
@@ -178,6 +180,8 @@ where
|
||||
BPF: BlindedProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
|
||||
A: SparseTrieInterface + Send + Sync + Default,
|
||||
S: SparseTrieInterface + Send + Sync + Default,
|
||||
{
|
||||
trace!(target: "engine::root::sparse", "Updating sparse trie");
|
||||
let started_at = Instant::now();
|
||||
|
||||
Reference in New Issue
Block a user