From 1c6c80452e3fcfec133fa97f288ebaab841d62ae Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sun, 3 Mar 2024 13:02:05 +0100 Subject: [PATCH] chore(task): move blocking pool to `reth-tasks` (#6929) --- Cargo.lock | 3 ++- crates/rpc/rpc-builder/Cargo.toml | 2 +- crates/rpc/rpc-builder/src/auth.rs | 6 +++--- crates/rpc/rpc-builder/src/eth.rs | 3 ++- crates/rpc/rpc-builder/src/lib.rs | 11 +++++++---- crates/rpc/rpc/Cargo.toml | 3 +-- crates/rpc/rpc/src/debug.rs | 3 ++- crates/rpc/rpc/src/eth/api/mod.rs | 5 +---- crates/rpc/rpc/src/eth/api/server.rs | 3 ++- crates/rpc/rpc/src/eth/api/state.rs | 9 +++------ crates/rpc/rpc/src/eth/api/transactions.rs | 13 +++++-------- crates/rpc/rpc/src/eth/bundle.rs | 14 ++++++-------- crates/rpc/rpc/src/lib.rs | 2 -- crates/rpc/rpc/src/trace.rs | 14 ++++++-------- crates/tasks/Cargo.toml | 7 +++++++ crates/tasks/src/lib.rs | 7 +++++++ .../rpc/src/blocking_pool.rs => tasks/src/pool.rs} | 0 17 files changed, 55 insertions(+), 50 deletions(-) rename crates/{rpc/rpc/src/blocking_pool.rs => tasks/src/pool.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 8227eb1c9a..9351e4227e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6705,7 +6705,6 @@ dependencies = [ "metrics", "pin-project", "rand 0.8.5", - "rayon", "reqwest", "reth-consensus-common", "reth-interfaces", @@ -6933,6 +6932,8 @@ dependencies = [ "dyn-clone", "futures-util", "metrics", + "pin-project", + "rayon", "reth-metrics", "thiserror", "tokio", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index a5c9f91573..b79b485d34 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -18,7 +18,7 @@ reth-network-api.workspace = true reth-provider.workspace = true reth-rpc.workspace = true reth-rpc-api.workspace = true -reth-tasks.workspace = true +reth-tasks = { workspace = true, features = ["rayon"] } reth-transaction-pool.workspace = true reth-node-api.workspace = true diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index b1fc2389ac..dd8ab70c0a 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -22,11 +22,11 @@ use reth_rpc::{ cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, }, - AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, - EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, + AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, + JwtAuthValidator, JwtSecret, }; use reth_rpc_api::servers::*; -use reth_tasks::TaskSpawner; +use reth_tasks::{pool::BlockingTaskPool, TaskSpawner}; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index d6fc987850..a02e15c593 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -7,8 +7,9 @@ use reth_rpc::{ gas_oracle::GasPriceOracleConfig, EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP, }, - BlockingTaskPool, EthApi, EthFilter, EthPubSub, + EthApi, EthFilter, EthPubSub, }; +use reth_tasks::pool::BlockingTaskPool; use serde::{Deserialize, Serialize}; /// All handlers for the `eth` namespace diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 95d75256c5..3ae0bd8b4b 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -195,12 +195,15 @@ use reth_rpc::{ gas_oracle::GasPriceOracle, EthBundle, FeeHistoryCache, }, - AdminApi, AuthLayer, BlockingTaskGuard, BlockingTaskPool, Claims, DebugApi, EngineEthApi, - EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, NetApi, - OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api, + AdminApi, AuthLayer, Claims, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, + EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, NetApi, OtterscanApi, RPCApi, RethApi, + TraceApi, TxPoolApi, Web3Api, }; use reth_rpc_api::servers::*; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::{ + pool::{BlockingTaskGuard, BlockingTaskPool}, + TaskSpawner, TokioTaskExecutor, +}; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; // re-export for convenience pub use crate::eth::{EthConfig, EthHandlers}; diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 42f0f4087d..4d2b23c1d8 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -23,7 +23,7 @@ reth-network-api.workspace = true reth-network.workspace = true reth-rpc-engine-api.workspace = true reth-revm = { workspace = true, features = ["js-tracer"] } -reth-tasks.workspace = true +reth-tasks = { workspace = true, features = ["rayon"] } reth-consensus-common.workspace = true reth-rpc-types-compat.workspace = true revm-inspectors.workspace = true @@ -59,7 +59,6 @@ tokio = { workspace = true, features = ["sync"] } tower = "0.4" tokio-stream = { workspace = true, features = ["sync"] } pin-project.workspace = true -rayon.workspace = true # metrics reth-metrics.workspace = true diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index b5105dcd3f..cb0e0d99c9 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -8,7 +8,7 @@ use crate::{ EthTransactions, }, result::{internal_rpc_err, ToRpcResult}, - BlockingTaskGuard, EthApiSpec, + EthApiSpec, }; use alloy_rlp::{Decodable, Encodable}; use async_trait::async_trait; @@ -29,6 +29,7 @@ use reth_rpc_types::{ }, BlockError, Bundle, RichBlock, StateContext, TransactionRequest, }; +use reth_tasks::pool::BlockingTaskGuard; use revm::{ db::CacheDB, primitives::{db::DatabaseCommit, BlockEnv, CfgEnvWithHandlerCfg, Env, EnvWithHandlerCfg}, diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index 6a20a06644..0d229e0a78 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -21,12 +21,11 @@ use reth_primitives::{ Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlockWithSenders, SealedHeader, B256, U256, U64, }; - use reth_provider::{ BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderBox, StateProviderFactory, }; use reth_rpc_types::{SyncInfo, SyncStatus}; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; use revm_primitives::{CfgEnv, SpecId}; use std::{ @@ -35,7 +34,6 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; - use tokio::sync::{oneshot, Mutex}; mod block; @@ -50,7 +48,6 @@ mod sign; mod state; mod transactions; -use crate::BlockingTaskPool; pub use transactions::{EthTransactions, TransactionSource}; /// `Eth` API trait. diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 73d7377761..d6adca5623 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -437,7 +437,7 @@ mod tests { cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, FeeHistoryCacheConfig, }, - BlockingTaskPool, EthApi, + EthApi, }; use jsonrpsee::types::error::INVALID_PARAMS_CODE; use reth_interfaces::test_utils::{generators, generators::Rng}; @@ -453,6 +453,7 @@ mod tests { }; use reth_rpc_api::EthApiServer; use reth_rpc_types::FeeHistory; + use reth_tasks::pool::BlockingTaskPool; use reth_transaction_pool::test_utils::{testing_pool, TestPool}; fn build_test_eth_api< diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index 30f477bc45..298afbc8b0 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -118,16 +118,13 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{ - eth::{ - cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, - FeeHistoryCacheConfig, - }, - BlockingTaskPool, + use crate::eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, FeeHistoryCacheConfig, }; use reth_node_ethereum::EthEvmConfig; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue}; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider, NoopProvider}; + use reth_tasks::pool::BlockingTaskPool; use reth_transaction_pool::test_utils::testing_pool; use std::collections::HashMap; diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 307847adfd..da982f1ca7 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -65,7 +65,7 @@ pub(crate) type StateCacheDB = CacheDB>; /// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace. /// /// Async functions that are spawned onto the -/// [BlockingTaskPool](crate::blocking_pool::BlockingTaskPool) begin with `spawn_` +/// [BlockingTaskPool](reth_tasks::pool::BlockingTaskPool) begin with `spawn_` /// /// /// ## Calls @@ -276,7 +276,7 @@ pub trait EthTransactions: Send + Sync { /// the database that points to the beginning of the transaction. /// /// Note: Implementers should use a threadpool where blocking is allowed, such as - /// [BlockingTaskPool](crate::blocking_pool::BlockingTaskPool). + /// [BlockingTaskPool](reth_tasks::pool::BlockingTaskPool). async fn spawn_trace_transaction_in_block( &self, hash: B256, @@ -1499,17 +1499,14 @@ pub(crate) fn build_transaction_receipt_with_block_receipts( #[cfg(test)] mod tests { use super::*; - use crate::{ - eth::{ - cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, - FeeHistoryCacheConfig, - }, - BlockingTaskPool, + use crate::eth::{ + cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache, FeeHistoryCacheConfig, }; use reth_network_api::noop::NoopNetwork; use reth_node_ethereum::EthEvmConfig; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, hex_literal::hex}; use reth_provider::test_utils::NoopProvider; + use reth_tasks::pool::BlockingTaskPool; use reth_transaction_pool::test_utils::testing_pool; #[tokio::test] diff --git a/crates/rpc/rpc/src/eth/bundle.rs b/crates/rpc/rpc/src/eth/bundle.rs index 386c141528..8bed3b6c6a 100644 --- a/crates/rpc/rpc/src/eth/bundle.rs +++ b/crates/rpc/rpc/src/eth/bundle.rs @@ -1,13 +1,10 @@ //! `Eth` bundle implementation and helpers. -use crate::{ - eth::{ - error::{EthApiError, EthResult, RpcInvalidTransactionError}, - revm_utils::FillableTransaction, - utils::recover_raw_transaction, - EthTransactions, - }, - BlockingTaskGuard, +use crate::eth::{ + error::{EthApiError, EthResult, RpcInvalidTransactionError}, + revm_utils::FillableTransaction, + utils::recover_raw_transaction, + EthTransactions, }; use jsonrpsee::core::RpcResult; use reth_primitives::{ @@ -18,6 +15,7 @@ use reth_primitives::{ use reth_revm::database::StateProviderDatabase; use reth_rpc_api::EthCallBundleApiServer; use reth_rpc_types::{EthCallBundle, EthCallBundleResponse, EthCallBundleTransactionResult}; +use reth_tasks::pool::BlockingTaskGuard; use revm::{ db::CacheDB, primitives::{ResultAndState, TxEnv}, diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index cb068ad787..f2c5c0a294 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -37,7 +37,6 @@ mod trace; mod txpool; mod web3; pub use admin::AdminApi; -pub use blocking_pool::{BlockingTaskGuard, BlockingTaskPool}; pub use debug::DebugApi; pub use engine::{EngineApi, EngineEthApi}; pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider}; @@ -49,5 +48,4 @@ pub use rpc::RPCApi; pub use trace::TraceApi; pub use txpool::TxPoolApi; pub use web3::Web3Api; -pub mod blocking_pool; pub mod result; diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index d559eacfff..9b59b66640 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -1,11 +1,8 @@ -use crate::{ - eth::{ - error::{EthApiError, EthResult}, - revm_utils::{inspect, inspect_and_return_db, prepare_call_env, EvmOverrides}, - utils::recover_raw_transaction, - EthTransactions, - }, - BlockingTaskGuard, +use crate::eth::{ + error::{EthApiError, EthResult}, + revm_utils::{inspect, inspect_and_return_db, prepare_call_env, EvmOverrides}, + utils::recover_raw_transaction, + EthTransactions, }; use async_trait::async_trait; use jsonrpsee::core::RpcResult as Result; @@ -24,6 +21,7 @@ use reth_rpc_types::{ trace::{filter::TraceFilter, parity::*, tracerequest::TraceCallRequest}, BlockError, BlockOverrides, Index, TransactionRequest, }; +use reth_tasks::pool::BlockingTaskGuard; use revm::{ db::{CacheDB, DatabaseCommit}, primitives::EnvWithHandlerCfg, diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index ebd0df6bff..4abeba7703 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -27,5 +27,12 @@ tracing.workspace = true thiserror.workspace = true dyn-clone = "1.0" +# feature `rayon` +rayon = { workspace = true, optional = true } +pin-project = { workspace = true, optional = true } + [dev-dependencies] tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } + +[features] +rayon = ["dep:rayon", "pin-project"] diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index c651196f81..02271b96fa 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -1,4 +1,8 @@ //! Reth task management. +//! +//! # Feature Flags +//! +//! - `rayon`: Enable rayon thread pool for blocking tasks. #![doc( html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", @@ -38,6 +42,9 @@ use tracing_futures::Instrument; pub mod metrics; pub mod shutdown; +#[cfg(feature = "rayon")] +pub mod pool; + /// A type that can spawn tasks. /// /// The main purpose of this type is to abstract over [TaskExecutor] so it's more convenient to diff --git a/crates/rpc/rpc/src/blocking_pool.rs b/crates/tasks/src/pool.rs similarity index 100% rename from crates/rpc/rpc/src/blocking_pool.rs rename to crates/tasks/src/pool.rs