mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
refactor: simplify MultiProofConfig and related structures
- Removed the generic Factory type from MultiProofConfig and related structs, streamlining their definitions and improving code clarity. - Updated methods to reflect the removal of the Factory type, enhancing maintainability. - Adjusted the implementation of PendingMultiproofTask and its associated methods to eliminate unnecessary type parameters, simplifying the codebase.
This commit is contained in:
@@ -192,8 +192,7 @@ where
|
||||
{
|
||||
let (to_sparse_trie, sparse_trie_rx) = channel();
|
||||
// spawn multiproof task, save the trie input
|
||||
let (trie_input, state_root_config) =
|
||||
MultiProofConfig::new_from_input(consistent_view, trie_input);
|
||||
let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input);
|
||||
self.trie_input = Some(trie_input);
|
||||
|
||||
// Create and spawn the storage proof task
|
||||
@@ -207,7 +206,7 @@ where
|
||||
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
|
||||
let proof_task = match ProofTaskManager::new(
|
||||
self.executor.handle().clone(),
|
||||
state_root_config.consistent_view.clone(),
|
||||
consistent_view.clone(),
|
||||
task_ctx,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
|
||||
@@ -12,7 +12,6 @@ use derive_more::derive::Deref;
|
||||
use metrics::Histogram;
|
||||
use reth_errors::ProviderError;
|
||||
use reth_metrics::Metrics;
|
||||
use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory};
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
|
||||
@@ -66,9 +65,7 @@ impl SparseTrieUpdate {
|
||||
|
||||
/// Common configuration for multi proof tasks
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct MultiProofConfig<Factory> {
|
||||
/// View over the state in the database.
|
||||
pub consistent_view: ConsistentDbView<Factory>,
|
||||
pub(super) struct MultiProofConfig {
|
||||
/// The sorted collection of cached in-memory intermediate trie nodes that
|
||||
/// can be reused for computation.
|
||||
pub nodes_sorted: Arc<TrieUpdatesSorted>,
|
||||
@@ -80,17 +77,13 @@ pub(super) struct MultiProofConfig<Factory> {
|
||||
pub prefix_sets: Arc<TriePrefixSetsMut>,
|
||||
}
|
||||
|
||||
impl<Factory> MultiProofConfig<Factory> {
|
||||
/// Creates a new state root config from the consistent view and the trie input.
|
||||
impl MultiProofConfig {
|
||||
/// Creates a new state root config from the trie input.
|
||||
///
|
||||
/// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
|
||||
/// [`TrieInput`].
|
||||
pub(super) fn new_from_input(
|
||||
consistent_view: ConsistentDbView<Factory>,
|
||||
mut input: TrieInput,
|
||||
) -> (TrieInput, Self) {
|
||||
pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
|
||||
let config = Self {
|
||||
consistent_view,
|
||||
nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
|
||||
state_sorted: Arc::new(input.state.drain_into_sorted()),
|
||||
prefix_sets: Arc::new(input.prefix_sets.clone()),
|
||||
@@ -249,14 +242,14 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
|
||||
|
||||
/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
|
||||
#[derive(Debug)]
|
||||
enum PendingMultiproofTask<Factory> {
|
||||
enum PendingMultiproofTask {
|
||||
/// A storage multiproof task input.
|
||||
Storage(StorageMultiproofInput<Factory>),
|
||||
Storage(StorageMultiproofInput),
|
||||
/// A regular multiproof task input.
|
||||
Regular(MultiproofInput<Factory>),
|
||||
Regular(MultiproofInput),
|
||||
}
|
||||
|
||||
impl<Factory> PendingMultiproofTask<Factory> {
|
||||
impl PendingMultiproofTask {
|
||||
/// Returns the proof sequence number of the task.
|
||||
const fn proof_sequence_number(&self) -> u64 {
|
||||
match self {
|
||||
@@ -282,22 +275,22 @@ impl<Factory> PendingMultiproofTask<Factory> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
|
||||
fn from(input: StorageMultiproofInput<Factory>) -> Self {
|
||||
impl From<StorageMultiproofInput> for PendingMultiproofTask {
|
||||
fn from(input: StorageMultiproofInput) -> Self {
|
||||
Self::Storage(input)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
|
||||
fn from(input: MultiproofInput<Factory>) -> Self {
|
||||
impl From<MultiproofInput> for PendingMultiproofTask {
|
||||
fn from(input: MultiproofInput) -> Self {
|
||||
Self::Regular(input)
|
||||
}
|
||||
}
|
||||
|
||||
/// Input parameters for spawning a dedicated storage multiproof calculation.
|
||||
#[derive(Debug)]
|
||||
struct StorageMultiproofInput<Factory> {
|
||||
config: MultiProofConfig<Factory>,
|
||||
struct StorageMultiproofInput {
|
||||
config: MultiProofConfig,
|
||||
source: Option<StateChangeSource>,
|
||||
hashed_state_update: HashedPostState,
|
||||
hashed_address: B256,
|
||||
@@ -307,7 +300,7 @@ struct StorageMultiproofInput<Factory> {
|
||||
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
|
||||
}
|
||||
|
||||
impl<Factory> StorageMultiproofInput<Factory> {
|
||||
impl StorageMultiproofInput {
|
||||
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
|
||||
fn send_empty_proof(self) {
|
||||
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
|
||||
@@ -319,8 +312,8 @@ impl<Factory> StorageMultiproofInput<Factory> {
|
||||
|
||||
/// Input parameters for spawning a multiproof calculation.
|
||||
#[derive(Debug)]
|
||||
struct MultiproofInput<Factory> {
|
||||
config: MultiProofConfig<Factory>,
|
||||
struct MultiproofInput {
|
||||
config: MultiProofConfig,
|
||||
source: Option<StateChangeSource>,
|
||||
hashed_state_update: HashedPostState,
|
||||
proof_targets: MultiProofTargets,
|
||||
@@ -329,7 +322,7 @@ struct MultiproofInput<Factory> {
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
}
|
||||
|
||||
impl<Factory> MultiproofInput<Factory> {
|
||||
impl MultiproofInput {
|
||||
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
|
||||
fn send_empty_proof(self) {
|
||||
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
|
||||
@@ -344,13 +337,13 @@ impl<Factory> MultiproofInput<Factory> {
|
||||
/// concurrency, further calculation requests are queued and spawn later, after
|
||||
/// availability has been signaled.
|
||||
#[derive(Debug)]
|
||||
pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
|
||||
pub struct MultiproofManager {
|
||||
/// Maximum number of concurrent calculations.
|
||||
max_concurrent: usize,
|
||||
/// Currently running calculations.
|
||||
inflight: usize,
|
||||
/// Queued calculations.
|
||||
pending: VecDeque<PendingMultiproofTask<Factory>>,
|
||||
pending: VecDeque<PendingMultiproofTask>,
|
||||
/// Executor for tasks
|
||||
executor: WorkloadExecutor,
|
||||
/// Handle to the proof task manager used for creating `ParallelProof` instances for storage
|
||||
@@ -374,10 +367,7 @@ pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl<Factory> MultiproofManager<Factory>
|
||||
where
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
|
||||
{
|
||||
impl MultiproofManager {
|
||||
/// Creates a new [`MultiproofManager`].
|
||||
fn new(
|
||||
executor: WorkloadExecutor,
|
||||
@@ -404,7 +394,7 @@ where
|
||||
|
||||
/// Spawns a new multiproof calculation or enqueues it for later if
|
||||
/// `max_concurrent` are already inflight.
|
||||
fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
|
||||
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
|
||||
// If there are no proof targets, we can just send an empty multiproof back immediately
|
||||
if input.proof_targets_is_empty() {
|
||||
debug!(
|
||||
@@ -438,7 +428,7 @@ where
|
||||
|
||||
/// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
|
||||
/// multiproof, and dispatching to `spawn_multiproof` otherwise.
|
||||
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
|
||||
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) {
|
||||
match input {
|
||||
PendingMultiproofTask::Storage(storage_input) => {
|
||||
self.spawn_storage_proof(storage_input);
|
||||
@@ -450,7 +440,7 @@ where
|
||||
}
|
||||
|
||||
/// Spawns a single storage proof calculation task.
|
||||
fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
|
||||
fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) {
|
||||
let StorageMultiproofInput {
|
||||
config,
|
||||
source,
|
||||
@@ -476,7 +466,7 @@ where
|
||||
"Starting dedicated storage proof calculation",
|
||||
);
|
||||
let start = Instant::now();
|
||||
let proof_result = ParallelProof::<Factory>::new(
|
||||
let proof_result = ParallelProof::new(
|
||||
config.nodes_sorted,
|
||||
config.state_sorted,
|
||||
config.prefix_sets,
|
||||
@@ -524,7 +514,7 @@ where
|
||||
}
|
||||
|
||||
/// Spawns a single multiproof calculation task.
|
||||
fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
|
||||
fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) {
|
||||
let MultiproofInput {
|
||||
config,
|
||||
source,
|
||||
@@ -554,10 +544,8 @@ where
|
||||
let start = Instant::now();
|
||||
|
||||
// Extend prefix sets with targets
|
||||
let frozen_prefix_sets = ParallelProof::<Factory>::extend_prefix_sets_with_targets(
|
||||
&config.prefix_sets,
|
||||
&proof_targets,
|
||||
);
|
||||
let frozen_prefix_sets =
|
||||
ParallelProof::extend_prefix_sets_with_targets(&config.prefix_sets, &proof_targets);
|
||||
|
||||
// Queue account multiproof to worker pool
|
||||
let input = AccountMultiproofInput {
|
||||
@@ -675,13 +663,13 @@ pub(crate) struct MultiProofTaskMetrics {
|
||||
/// Then it updates relevant leaves according to the result of the transaction.
|
||||
/// This feeds updates to the sparse trie task.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
|
||||
pub(super) struct MultiProofTask {
|
||||
/// The size of proof targets chunk to spawn in one calculation.
|
||||
///
|
||||
/// If [`None`], then chunking is disabled.
|
||||
chunk_size: Option<usize>,
|
||||
/// Task configuration.
|
||||
config: MultiProofConfig<Factory>,
|
||||
config: MultiProofConfig,
|
||||
/// Receiver for state root related messages.
|
||||
rx: Receiver<MultiProofMessage>,
|
||||
/// Sender for state root related messages.
|
||||
@@ -695,18 +683,15 @@ pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
|
||||
/// Proof sequencing handler.
|
||||
proof_sequencer: ProofSequencer,
|
||||
/// Manages calculation of multiproofs.
|
||||
multiproof_manager: MultiproofManager<Factory>,
|
||||
multiproof_manager: MultiproofManager,
|
||||
/// multi proof task metrics
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl<Factory> MultiProofTask<Factory>
|
||||
where
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
|
||||
{
|
||||
impl MultiProofTask {
|
||||
/// Creates a new multi proof task with the unified message channel
|
||||
pub(super) fn new(
|
||||
config: MultiProofConfig<Factory>,
|
||||
config: MultiProofConfig,
|
||||
executor: WorkloadExecutor,
|
||||
proof_task_handle: ProofTaskManagerHandle,
|
||||
to_sparse_trie: Sender<SparseTrieUpdate>,
|
||||
@@ -1233,43 +1218,29 @@ fn get_proof_targets(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::map::B256Set;
|
||||
use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
|
||||
use reth_provider::{
|
||||
providers::ConsistentDbView, test_utils::create_test_provider_factory, BlockReader,
|
||||
DatabaseProviderFactory,
|
||||
};
|
||||
use reth_trie::{MultiProof, TrieInput};
|
||||
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
|
||||
use revm_primitives::{B256, U256};
|
||||
use std::sync::Arc;
|
||||
|
||||
fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
|
||||
where
|
||||
F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
|
||||
{
|
||||
let consistent_view = ConsistentDbView::new(factory, None);
|
||||
let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
|
||||
let state_sorted = Arc::new(input.state.clone().into_sorted());
|
||||
let prefix_sets = Arc::new(input.prefix_sets);
|
||||
|
||||
MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
|
||||
}
|
||||
|
||||
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
|
||||
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
|
||||
where
|
||||
F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
|
||||
{
|
||||
let executor = WorkloadExecutor::default();
|
||||
let config = create_state_root_config(factory, TrieInput::default());
|
||||
let (_trie_input, config) = MultiProofConfig::from_input(TrieInput::default());
|
||||
let task_ctx = ProofTaskCtx::new(
|
||||
config.nodes_sorted.clone(),
|
||||
config.state_sorted.clone(),
|
||||
config.prefix_sets.clone(),
|
||||
);
|
||||
let proof_task = ProofTaskManager::new(
|
||||
executor.handle().clone(),
|
||||
config.consistent_view.clone(),
|
||||
task_ctx,
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.expect("Failed to create ProofTaskManager");
|
||||
let consistent_view = ConsistentDbView::new(factory, None);
|
||||
let proof_task =
|
||||
ProofTaskManager::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
|
||||
.expect("Failed to create ProofTaskManager");
|
||||
let channel = channel();
|
||||
|
||||
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
|
||||
|
||||
Reference in New Issue
Block a user