refactor(tree): remove root.rs (#14778)

This commit is contained in:
Alexey Shekhirin
2025-02-28 16:05:30 +00:00
committed by GitHub
parent bffdda4312
commit ba2797c8be
9 changed files with 87 additions and 1347 deletions

View File

@@ -8,13 +8,19 @@ use alloy_primitives::{Address, B256};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use proptest::test_runner::TestRunner;
use rand::Rng;
use reth_engine_tree::tree::root::{StateRootConfig, StateRootTask};
use reth_chain_state::EthPrimitives;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_evm::system_calls::{OnStateHook, StateChangeSource};
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Account as RethAccount, StorageEntry};
use reth_provider::{
providers::ConsistentDbView,
test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
AccountReader, HashingWriter, ProviderFactory,
providers::{BlockchainProvider, ConsistentDbView},
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
AccountReader, ChainSpecProvider, HashingWriter, ProviderFactory,
};
use reth_trie::TrieInput;
use revm_primitives::{HashMap, U256};
@@ -197,40 +203,40 @@ fn bench_state_root(c: &mut Criterion) {
|b, params| {
b.iter_with_setup(
|| {
let factory = create_test_provider_factory();
let factory = create_test_provider_factory_with_chain_spec(Arc::new(
ChainSpec::default(),
));
let genesis_hash = init_genesis(&factory).unwrap();
let state_updates = create_bench_state_updates(params);
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let trie_input = TrieInput::from_state(Default::default());
let config = StateRootConfig::new_from_input(
ConsistentDbView::new(factory, None),
trie_input,
let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
WorkloadExecutor::default(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
);
let num_threads = std::thread::available_parallelism()
.map_or(1, |num| (num.get() / 2).max(1));
let provider = BlockchainProvider::new(factory).unwrap();
let state_root_task_pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("proof-worker-{}", i))
.build()
.expect("Failed to create proof worker thread pool"),
);
(config, state_updates, state_root_task_pool)
(genesis_hash, payload_processor, provider, state_updates)
},
|(config, state_updates, state_root_task_pool)| {
|(genesis_hash, payload_processor, provider, state_updates)| {
black_box({
let task = StateRootTask::new(config, state_root_task_pool);
let mut hook = task.state_hook();
let handle = task.spawn();
let mut handle = payload_processor.spawn(
Default::default(),
Default::default(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
);
let mut state_hook = handle.state_hook();
for (i, update) in state_updates.into_iter().enumerate() {
hook.on_state(StateChangeSource::Transaction(i), &update)
state_hook.on_state(StateChangeSource::Transaction(i), &update);
}
drop(hook);
drop(state_hook);
handle.wait_for_result().expect("task failed")
handle.state_root().expect("task failed")
});
},
)

View File

@@ -1,6 +1,6 @@
//! Engine tree configuration.
use crate::tree::root::has_enough_parallelism;
use crate::tree::payload_processor::executor::has_enough_parallelism;
use alloy_eips::merge::EPOCH_SLOTS;
/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`

View File

@@ -3,7 +3,9 @@ use crate::{
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
persistence::PersistenceHandle,
tree::{cached_state::CachedStateProvider, metrics::EngineApiMetrics},
tree::{
cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
},
};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
@@ -70,20 +72,16 @@ mod invalid_headers;
mod metrics;
mod payload_processor;
mod persistence_state;
pub mod root;
// TODO(alexey): compare trie updates in `insert_block_inner`
#[allow(unused)]
mod trie_updates;
use crate::tree::{
config::MIN_BLOCKS_FOR_PIPELINE_RUN,
error::AdvancePersistenceError,
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
};
use crate::tree::{config::MIN_BLOCKS_FOR_PIPELINE_RUN, error::AdvancePersistenceError};
pub use block_buffer::BlockBuffer;
pub use config::TreeConfig;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use persistence_state::PersistenceState;
/// Keeps track of the state of the tree.
@@ -441,7 +439,7 @@ pub struct StateProviderBuilder<N: NodePrimitives, P> {
impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
/// Creates a new state provider from the provider factory, historical block hash and optional
/// overlaid blocks.
fn new(
pub fn new(
provider_factory: P,
historical: B256,
overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
@@ -663,7 +661,7 @@ where
let (incoming_tx, incoming) = std::sync::mpsc::channel();
let payload_processor =
PayloadProcessor::new(WorkloadExecutor::new(), evm_config.clone(), &config);
PayloadProcessor::new(WorkloadExecutor::default(), evm_config.clone(), &config);
Self {
provider,

View File

@@ -1,3 +1,5 @@
//! Executor for mixed I/O and CPU workloads.
use rayon::ThreadPool as RayonPool;
use std::sync::{Arc, OnceLock};
use tokio::{
@@ -11,16 +13,17 @@ use tokio::{
///
/// It will reuse an existing tokio runtime if available or create its own.
#[derive(Debug, Clone)]
pub(crate) struct WorkloadExecutor {
pub struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl WorkloadExecutor {
/// Creates a new instance with default settings.
pub(crate) fn new() -> Self {
impl Default for WorkloadExecutor {
fn default() -> Self {
Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
}
}
impl WorkloadExecutor {
/// Creates a new executor with the given number of threads for cpu bound work (rayon).
#[allow(unused)]
pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
@@ -69,3 +72,15 @@ impl WorkloadExecutorInner {
Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
}
}
/// Determines if the host has enough parallelism to run the payload processor.
///
/// It requires at least 5 parallel threads:
/// - Engine in main thread that spawns the state root task.
/// - Multiproof task in [`super::multiproof::MultiProofTask::run`]
/// - Sparse Trie task in [`super::sparse_trie::SparseTrieTask::run`]
/// - Multiproof computation spawned in [`super::multiproof::MultiproofManager::spawn_multiproof`]
/// - Storage root computation spawned in [`reth_trie_parallel::proof::ParallelProof::multiproof`]
pub(crate) fn has_enough_parallelism() -> bool {
std::thread::available_parallelism().is_ok_and(|num| num.get() >= 5)
}

View File

@@ -3,14 +3,15 @@
use crate::tree::{
cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
payload_processor::{
executor::WorkloadExecutor,
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
sparse_trie::{SparseTrieTask, StateRootComputeOutcome},
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::SparseTrieTask,
StateProviderBuilder, TreeConfig,
};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_primitives::B256;
use executor::WorkloadExecutor;
use multiproof::*;
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
@@ -35,14 +36,14 @@ use std::{
},
};
pub(crate) mod executor;
pub(crate) mod sparse_trie;
mod multiproof;
mod prewarm;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod sparse_trie;
/// Entrypoint for executing the payload.
pub(super) struct PayloadProcessor<N, Evm> {
#[derive(Debug, Clone)]
pub struct PayloadProcessor<N, Evm> {
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
@@ -59,7 +60,8 @@ pub(super) struct PayloadProcessor<N, Evm> {
}
impl<N, Evm> PayloadProcessor<N, Evm> {
pub(super) fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
/// Creates a new payload processor.
pub fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
Self {
executor,
execution_cache: Default::default(),
@@ -112,7 +114,7 @@ where
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
pub(super) fn spawn<P>(
pub fn spawn<P>(
&self,
header: SealedHeaderFor<N>,
transactions: VecDeque<Recovered<N::SignedTx>>,
@@ -246,7 +248,8 @@ where
}
/// Handle to all the spawned tasks.
pub(super) struct PayloadHandle {
#[derive(Debug)]
pub struct PayloadHandle {
/// Channel for evm state updates
to_multi_proof: Option<Sender<MultiProofMessage>>,
// must include the receiver of the state root wired to the sparse trie
@@ -261,7 +264,7 @@ impl PayloadHandle {
/// # Panics
///
/// If payload processing was started without background tasks.
pub(super) fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
.expect("state_root is None")
@@ -272,7 +275,7 @@ impl PayloadHandle {
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
pub(super) fn state_hook(&self) -> impl OnStateHook {
pub fn state_hook(&self) -> impl OnStateHook {
// convert the channel into a `StateHookSender` that emits an event on drop
let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
@@ -308,6 +311,7 @@ impl PayloadHandle {
}
/// Access to the spawned [`PrewarmCacheTask`].
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
/// The shared cache the task operates with.
cache: ProviderCaches,
@@ -514,7 +518,7 @@ mod tests {
}
let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
WorkloadExecutor::new(),
WorkloadExecutor::default(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
);

View File

@@ -1,4 +1,4 @@
//! multi proof task related functionality.
//! Multiproof task related functionality.
use crate::tree::payload_processor::{executor::WorkloadExecutor, sparse_trie::SparseTrieEvent};
use alloy_primitives::map::HashSet;
@@ -31,7 +31,7 @@ use tracing::{debug, error, trace};
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
#[derive(Default, Debug)]
pub(crate) struct SparseTrieUpdate {
pub struct SparseTrieUpdate {
/// The state update that was used to calculate the proof
pub(crate) state: HashedPostState,
/// The calculated multiproof
@@ -253,7 +253,7 @@ struct MultiproofInput<Factory> {
/// concurrency, further calculation requests are queued and spawn later, after
/// availability has been signaled.
#[derive(Debug)]
struct MultiproofManager<Factory> {
pub struct MultiproofManager<Factory> {
/// Maximum number of concurrent calculations.
max_concurrent: usize,
/// Currently running calculations.

View File

@@ -1,3 +1,5 @@
//! Caching and prewarming related functionality.
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
payload_processor::{

View File

@@ -1,4 +1,4 @@
//! Contains the implementation of the sparse trie logic responsible for creating the
//! Sparse Trie task related functionality.
use crate::tree::payload_processor::{
executor::WorkloadExecutor,
@@ -108,8 +108,10 @@ where
/// Outcome of the state root computation, including the state root itself with
/// the trie updates.
#[derive(Debug)]
pub(crate) struct StateRootComputeOutcome {
pub struct StateRootComputeOutcome {
/// The state root.
pub state_root: B256,
/// The trie updates.
pub trie_updates: TrieUpdates,
}

File diff suppressed because it is too large Load Diff