feat: global runtime (#21934)

This commit is contained in:
DaniPopes
2026-02-11 04:45:09 +01:00
committed by GitHub
parent 33467ea6dd
commit 68e4ff1f7d
119 changed files with 1612 additions and 1126 deletions

View File

@@ -141,6 +141,7 @@ test-utils = [
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
[[test]]

View File

@@ -12,8 +12,7 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
setup_provider(&factory, &state_updates).expect("failed to setup provider");
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
self.pipeline_task_spawner.spawn_critical_blocking_task(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;

View File

@@ -1,47 +0,0 @@
//! Executor for mixed I/O and CPU workloads.
use reth_trie_parallel::root::get_tokio_runtime_handle;
use tokio::{runtime::Handle, task::JoinHandle};
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
/// runtime if available or create its own.
#[derive(Debug, Clone)]
pub struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl Default for WorkloadExecutor {
fn default() -> Self {
Self { inner: WorkloadExecutorInner::new() }
}
}
impl WorkloadExecutor {
/// Returns the handle to the tokio runtime
pub(super) const fn handle(&self) -> &Handle {
&self.inner.handle
}
/// Runs the provided function on an executor dedicated to blocking operations.
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(func)
}
}
#[derive(Debug, Clone)]
struct WorkloadExecutorInner {
handle: Handle,
}
impl WorkloadExecutorInner {
fn new() -> Self {
Self { handle: get_tokio_runtime_handle() }
}
}

View File

@@ -15,7 +15,6 @@ use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_evm::block::StateChangeSource;
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use executor::WorkloadExecutor;
use metrics::{Counter, Histogram};
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
@@ -34,6 +33,7 @@ use reth_provider::{
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -55,7 +55,6 @@ use std::{
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod executor;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod prewarm;
@@ -109,7 +108,7 @@ where
Evm: ConfigureEvm,
{
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
@@ -146,13 +145,13 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// Returns a reference to the workload executor driving payload tasks.
pub const fn executor(&self) -> &WorkloadExecutor {
pub const fn executor(&self) -> &Runtime {
&self.executor
}
/// Creates a new payload processor.
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
@@ -280,7 +279,7 @@ where
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
&self.executor,
task_ctx,
storage_worker_count,
account_worker_count,
@@ -1001,9 +1000,7 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
@@ -1105,7 +1102,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_populates_cache() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1134,7 +1131,7 @@ mod tests {
#[test]
fn on_inserted_executed_block_skips_on_parent_mismatch() {
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
@@ -1269,7 +1266,7 @@ mod tests {
}
let mut payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),

View File

@@ -1547,17 +1547,11 @@ mod tests {
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::{Arc, OnceLock};
use tokio::runtime::{Handle, Runtime};
/// Get a handle to the test runtime, creating it if necessary
fn get_test_runtime_handle() -> Handle {
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
TEST_RT
.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
})
.handle()
.clone()
/// Get a test runtime, creating it if necessary
fn get_test_runtime() -> &'static reth_tasks::Runtime {
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
TEST_RT.get_or_init(reth_tasks::Runtime::test)
}
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
@@ -1573,11 +1567,11 @@ mod tests {
+ Send
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let runtime = get_test_runtime();
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, 1, 1, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();

View File

@@ -15,7 +15,6 @@ use crate::tree::{
cached_state::{CachedStateProvider, SavedCache},
payload_processor::{
bal::{self, total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
},
@@ -37,6 +36,7 @@ use reth_provider::{
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_trie::MultiProofTargets;
use std::{
ops::Range,
@@ -78,7 +78,7 @@ where
Evm: ConfigureEvm<Primitives = N>,
{
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
executor: Runtime,
/// Shared execution cache.
execution_cache: PayloadExecutionCache,
/// Context provided to execution tasks
@@ -101,7 +101,7 @@ where
{
/// Initializes the task with the given transactions pending execution
pub fn new(
executor: WorkloadExecutor,
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
@@ -667,7 +667,7 @@ where
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
@@ -704,7 +704,7 @@ where
fn spawn_bal_worker(
&self,
idx: usize,
executor: &WorkloadExecutor,
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,

View File

@@ -1,6 +1,5 @@
//! Sparse Trie task related functionality.
use super::executor::WorkloadExecutor;
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
@@ -13,6 +12,7 @@ use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
@@ -282,7 +282,7 @@ where
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_trie(
executor: &WorkloadExecutor,
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::InstrumentedStateProvider,
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
@@ -134,6 +134,8 @@ where
validator: V,
/// Changeset cache for in-memory trie changesets
changeset_cache: ChangesetCache,
/// Task runtime for spawning parallel work.
runtime: reth_tasks::Runtime,
}
impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
@@ -166,10 +168,11 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
WorkloadExecutor::default(),
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
@@ -186,6 +189,7 @@ where
metrics: EngineApiMetrics::default(),
validator,
changeset_cache,
runtime,
}
}
@@ -874,7 +878,8 @@ where
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
ParallelStateRoot::new(overlay_factory, prefix_sets).incremental_root_with_updates()
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.

View File

@@ -203,6 +203,7 @@ impl TestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let tree = EngineApiTreeHandler::new(
@@ -404,6 +405,7 @@ impl ValidatorTestHarness {
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
reth_tasks::Runtime::test(),
);
Self { harness, validator, metrics: TestMetrics::default() }