From 823866ac571930f0f8a59cda77cee9e4f7b187df Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 23 Feb 2023 00:39:37 +0200 Subject: [PATCH] feat(rpc): launch auth server (#1506) Co-authored-by: Georgios Konstantopoulos --- Cargo.lock | 3 + bin/reth/Cargo.toml | 1 + bin/reth/src/args/rpc_server_args.rs | 57 +++++++++++--- bin/reth/src/node/mod.rs | 54 ++++++++++--- crates/rpc/rpc-builder/Cargo.toml | 3 +- crates/rpc/rpc-builder/src/auth.rs | 76 +++++++++++++++++++ crates/rpc/rpc-builder/src/constants.rs | 16 ++++ crates/rpc/rpc-builder/src/cors.rs | 44 +++++++++++ crates/rpc/rpc-builder/src/lib.rs | 71 ++++------------- crates/rpc/rpc-engine-api/src/engine_api.rs | 38 +++++++--- crates/rpc/rpc-engine-api/src/lib.rs | 2 +- crates/rpc/rpc/src/engine/mod.rs | 16 ++-- .../storage/provider/src/test_utils/mock.rs | 20 +++++ crates/transaction-pool/Cargo.toml | 1 + crates/transaction-pool/src/traits.rs | 1 + 15 files changed, 303 insertions(+), 100 deletions(-) create mode 100644 crates/rpc/rpc-builder/src/auth.rs create mode 100644 crates/rpc/rpc-builder/src/constants.rs create mode 100644 crates/rpc/rpc-builder/src/cors.rs diff --git a/Cargo.lock b/Cargo.lock index 677a56f9ae..3b72b16fbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4369,6 +4369,7 @@ dependencies = [ "reth-rlp", "reth-rpc", "reth-rpc-builder", + "reth-rpc-engine-api", "reth-staged-sync", "reth-stages", "reth-tasks", @@ -4944,6 +4945,7 @@ dependencies = [ "reth-provider", "reth-rpc", "reth-rpc-api", + "reth-rpc-engine-api", "reth-rpc-types", "reth-tracing", "reth-transaction-pool", @@ -5099,6 +5101,7 @@ version = "0.1.0" dependencies = [ "aquamarine", "async-trait", + "auto_impl 1.0.1", "bitflags", "fnv", "futures-util", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index d83f4f213c..4e32f4e5a6 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -19,6 +19,7 @@ reth-transaction-pool = { path = "../../crates/transaction-pool", features = ["t reth-consensus = { path = "../../crates/consensus" } reth-executor = { path = "../../crates/executor" } reth-eth-wire = { path = "../../crates/net/eth-wire" } +reth-rpc-engine-api = { path = "../../crates/rpc/rpc-engine-api" } reth-rpc-builder = { path = "../../crates/rpc/rpc-builder" } reth-rpc = { path = "../../crates/rpc/rpc" } reth-rlp = { path = "../../crates/rlp" } diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 7624ed5a16..62d9961cba 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -2,14 +2,15 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; -use jsonrpsee::core::Error as RpcError; +use jsonrpsee::{core::Error as RpcError, server::ServerHandle}; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{JwtError, JwtSecret}; use reth_rpc_builder::{ - IpcServerBuilder, RethRpcModule, RpcModuleSelection, RpcServerConfig, RpcServerHandle, - ServerBuilder, TransportRpcModuleConfig, DEFAULT_HTTP_RPC_PORT, DEFAULT_IPC_ENDPOINT, + constants, IpcServerBuilder, RethRpcModule, RpcModuleSelection, RpcServerConfig, + RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, }; +use reth_rpc_engine_api::EngineApiHandle; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -48,7 +49,7 @@ pub struct RpcServerArgs { #[arg(long = "ws.addr")] pub ws_addr: Option, - /// Http server port to listen on + /// Ws server port to listen on #[arg(long = "ws.port")] pub ws_port: Option, @@ -64,9 +65,17 @@ pub struct RpcServerArgs { #[arg(long)] pub ipcpath: Option, + /// Auth server address to listen on + #[arg(long = "authrpc.addr")] + pub auth_addr: Option, + + /// Auth server port to listen on + #[arg(long = "authrpc.port")] + pub auth_port: Option, + /// Path to a JWT secret to use for authenticated RPC endpoints #[arg(long = "authrpc.jwtsecret", value_name = "PATH", global = true, required = false)] - authrpc_jwtsecret: Option>, + auth_jwtsecret: Option>, } impl RpcServerArgs { @@ -81,7 +90,7 @@ impl RpcServerArgs { /// duration of the execution, and SHOULD store the hex-encoded secret as a jwt.hex file on /// the filesystem. This file can then be used to provision the counterpart client. pub(crate) fn jwt_secret(&self) -> Result { - let arg = self.authrpc_jwtsecret.as_ref(); + let arg = self.auth_jwtsecret.as_ref(); let path: Option<&Path> = arg.map(|p| p.as_ref()); match path { Some(fpath) => JwtSecret::from_file(fpath), @@ -94,7 +103,7 @@ impl RpcServerArgs { } /// Convenience function for starting a rpc server with configs which extracted from cli args. - pub(crate) async fn start_server( + pub(crate) async fn start_rpc_server( &self, client: Client, pool: Pool, @@ -115,6 +124,27 @@ impl RpcServerArgs { .await } + /// Create Engine API server. + pub(crate) async fn start_auth_server( + &self, + client: Client, + pool: Pool, + network: Network, + handle: EngineApiHandle, + ) -> Result + where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + { + let socket_address = SocketAddr::new( + self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), + self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT), + ); + let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?; + reth_rpc_builder::auth::launch(client, pool, network, handle, socket_address, secret).await + } + /// Creates the [TransportRpcModuleConfig] from cli args. fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig { let mut config = TransportRpcModuleConfig::default(); @@ -138,7 +168,7 @@ impl RpcServerArgs { if self.http { let socket_address = SocketAddr::new( self.http_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), - self.http_port.unwrap_or(DEFAULT_HTTP_RPC_PORT), + self.http_port.unwrap_or(constants::DEFAULT_HTTP_RPC_PORT), ); config = config .with_http_address(socket_address) @@ -149,7 +179,7 @@ impl RpcServerArgs { if self.ws { let socket_address = SocketAddr::new( self.ws_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), - self.ws_port.unwrap_or(DEFAULT_HTTP_RPC_PORT), + self.ws_port.unwrap_or(constants::DEFAULT_HTTP_RPC_PORT), ); config = config.with_ws_address(socket_address).with_http(ServerBuilder::new()); } @@ -157,7 +187,7 @@ impl RpcServerArgs { if !self.ipcdisable { let ipc_builder = IpcServerBuilder::default(); config = config.with_ipc(ipc_builder).with_ipc_endpoint( - self.ipcpath.as_ref().unwrap_or(&DEFAULT_IPC_ENDPOINT.to_string()), + self.ipcpath.as_ref().unwrap_or(&constants::DEFAULT_IPC_ENDPOINT.to_string()), ); } @@ -226,12 +256,15 @@ mod tests { let config = args.rpc_server_config(); assert_eq!( config.http_address().unwrap(), - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_HTTP_RPC_PORT)) + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::UNSPECIFIED, + constants::DEFAULT_HTTP_RPC_PORT + )) ); assert_eq!( config.ws_address().unwrap(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8888)) ); - assert_eq!(config.ipc_endpoint().unwrap().path(), DEFAULT_IPC_ENDPOINT); + assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT); } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 47496e87fa..d9a03ad3d2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -36,6 +36,7 @@ use reth_network::{ use reth_network_api::NetworkInfo; use reth_primitives::{BlockNumber, ChainSpec, Head, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; +use reth_rpc_engine_api::{EngineApi, EngineApiHandle}; use reth_staged_sync::{ utils::{ chainspec::genesis_value_parser, @@ -50,7 +51,8 @@ use reth_stages::{ }; use reth_tasks::TaskExecutor; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use tracing::{debug, info, trace, warn}; +use tokio::sync::{mpsc::unbounded_channel, watch}; +use tracing::*; /// Start the node #[derive(Debug, Parser)] @@ -124,6 +126,7 @@ impl Command { info!(target: "reth::cli", path = %self.db, "Opening database"); let db = Arc::new(init_db(&self.db)?); + let shareable_db = ShareableDatabase::new(Arc::clone(&db), self.chain.clone()); info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint()?; @@ -132,7 +135,7 @@ impl Command { init_genesis(db.clone(), self.chain.clone())?; - let consensus = self.init_consensus()?; + let (consensus, forkchoice_state_tx) = self.init_consensus()?; info!(target: "reth::cli", "Consensus engine initialized"); self.init_trusted_nodes(&mut config); @@ -143,20 +146,30 @@ impl Command { let network = self.start_network(network_config, &ctx.task_executor, ()).await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); - // TODO: Use the resolved secret to spawn the Engine API server - // Look at `reth_rpc::AuthLayer` for integration hints - let _secret = self.rpc.jwt_secret(); + let test_transaction_pool = reth_transaction_pool::test_utils::testing_pool(); + info!(target: "reth::cli", "Test transaction pool initialized"); let _rpc_server = self .rpc - .start_server( - ShareableDatabase::new(db.clone(), self.chain.clone()), - reth_transaction_pool::test_utils::testing_pool(), - network.clone(), - ) + .start_rpc_server(shareable_db.clone(), test_transaction_pool.clone(), network.clone()) .await?; info!(target: "reth::cli", "Started RPC server"); + let engine_api_handle = + self.init_engine_api(Arc::clone(&db), forkchoice_state_tx, &ctx.task_executor); + info!(target: "reth::cli", "Engine API handler initialized"); + + let _auth_server = self + .rpc + .start_auth_server( + shareable_db, + test_transaction_pool, + network.clone(), + engine_api_handle, + ) + .await?; + info!(target: "reth::cli", "Started Auth server"); + let (mut pipeline, events) = self .build_networked_pipeline( &mut config, @@ -238,7 +251,7 @@ impl Command { } } - fn init_consensus(&self) -> eyre::Result> { + fn init_consensus(&self) -> eyre::Result<(Arc, watch::Sender)> { let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); if let Some(tip) = self.tip { @@ -255,7 +268,24 @@ impl Command { warn!(target: "reth::cli", warn_msg); } - Ok(consensus) + Ok((consensus, notifier)) + } + + fn init_engine_api( + &self, + db: Arc>, + forkchoice_state_tx: watch::Sender, + task_executor: &TaskExecutor, + ) -> EngineApiHandle { + let (message_tx, message_rx) = unbounded_channel(); + let engine_api = EngineApi::new( + ShareableDatabase::new(db, self.chain.clone()), + self.chain.clone(), + message_rx, + forkchoice_state_tx, + ); + task_executor.spawn(engine_api); + message_tx } /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 9b1b144b1c..67e32a50ad 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -13,12 +13,13 @@ reth-network-api = { path = "../../net/network-api" } reth-provider = { path = "../../storage/provider" } reth-rpc = { path = "../rpc" } reth-rpc-api = { path = "../rpc-api" } +reth-rpc-engine-api = { path = "../rpc-engine-api" } reth-rpc-types = { path = "../rpc-types" } reth-transaction-pool = { path = "../../transaction-pool" } jsonrpsee = { version = "0.16", features = ["server"] } tower-http = { version = "0.3", features = ["full"] } -tower = {version = "0.4" , features = ["full"] } +tower = { version = "0.4", features = ["full"] } hyper = "0.14" strum = { version = "0.24", features = ["derive"] } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs new file mode 100644 index 0000000000..9adaa96f80 --- /dev/null +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -0,0 +1,76 @@ +use crate::{constants::DEFAULT_AUTH_PORT, RpcServerConfig}; +use hyper::{http::HeaderValue, Method}; +pub use jsonrpsee::server::ServerBuilder; +use jsonrpsee::{ + core::{ + server::{host_filtering::AllowHosts, rpc_module::Methods}, + Error as RpcError, + }, + server::{middleware, Server, ServerHandle}, + RpcModule, +}; +use reth_ipc::server::IpcServer; +pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; +use reth_network_api::{NetworkInfo, Peers}; +use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; +use reth_rpc::{ + AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, JwtSecret, NetApi, + TraceApi, Web3Api, +}; +use reth_rpc_api::servers::*; +use reth_rpc_engine_api::EngineApiHandle; +use reth_transaction_pool::TransactionPool; +use serde::{Deserialize, Serialize, Serializer}; +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; +use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; +use tower::layer::util::{Identity, Stack}; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; + +/// Configure and launch an auth server with `engine` and `eth` namespaces. +pub async fn launch( + client: Client, + pool: Pool, + network: Network, + handle: EngineApiHandle, + socket_addr: SocketAddr, + secret: JwtSecret, +) -> Result +where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, +{ + launch_with_eth_api(EthApi::new(client, pool, network), handle, socket_addr, secret).await +} + +/// Configure and launch an auth server with existing EthApi implementation. +pub async fn launch_with_eth_api( + eth_api: EthApi, + handle: EngineApiHandle, + socket_addr: SocketAddr, + secret: JwtSecret, +) -> Result +where + Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, +{ + // Configure the module and start the server. + let mut module = RpcModule::new(()); + module.merge(EngineApi::new(handle).into_rpc()); + module.merge(eth_api.into_rpc()); + + // Create auth middleware. + let middleware = + tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret))); + + // By default both http and ws are enabled. + let server = ServerBuilder::new().set_middleware(middleware).build(socket_addr).await?; + + server.start(module) +} diff --git a/crates/rpc/rpc-builder/src/constants.rs b/crates/rpc/rpc-builder/src/constants.rs new file mode 100644 index 0000000000..a1b2bc36a8 --- /dev/null +++ b/crates/rpc/rpc-builder/src/constants.rs @@ -0,0 +1,16 @@ +/// The default port for the http server +pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545; + +/// The default port for the ws server +pub const DEFAULT_WS_RPC_PORT: u16 = 8546; + +/// The default port for the auth server. +pub const DEFAULT_AUTH_PORT: u16 = 8551; + +/// The default IPC endpoint +#[cfg(windows)] +pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc"; + +/// The default IPC endpoint +#[cfg(not(windows))] +pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc"; diff --git a/crates/rpc/rpc-builder/src/cors.rs b/crates/rpc/rpc-builder/src/cors.rs new file mode 100644 index 0000000000..7b7492c86d --- /dev/null +++ b/crates/rpc/rpc-builder/src/cors.rs @@ -0,0 +1,44 @@ +use hyper::{http::HeaderValue, Method}; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; + +/// Error thrown when parsing cors domains went wrong +#[derive(Debug, thiserror::Error)] +pub(crate) enum CorsDomainError { + #[error("{domain} is an invalid header value")] + InvalidHeader { domain: String }, + #[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")] + WildCardNotAllowed { input: String }, +} + +/// Creates a [CorsLayer] from the given domains +pub(crate) fn create_cors_layer(http_cors_domains: &str) -> Result { + let cors = match http_cors_domains.trim() { + "*" => CorsLayer::new() + .allow_methods([Method::GET, Method::POST]) + .allow_origin(Any) + .allow_headers(Any), + _ => { + let iter = http_cors_domains.split(','); + if iter.clone().any(|o| o == "*") { + return Err(CorsDomainError::WildCardNotAllowed { + input: http_cors_domains.to_string(), + }) + } + + let origins = iter + .map(|domain| { + domain + .parse::() + .map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() }) + }) + .collect::, _>>()?; + + let origin = AllowOrigin::list(origins); + CorsLayer::new() + .allow_methods([Method::GET, Method::POST]) + .allow_origin(origin) + .allow_headers(Any) + } + }; + Ok(cors) +} diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 5c8659a2e1..41cbe57b02 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -52,7 +52,6 @@ //! ``` use hyper::{http::HeaderValue, Method}; -pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::{ core::{ server::{host_filtering::AllowHosts, rpc_module::Methods}, @@ -62,7 +61,6 @@ use jsonrpsee::{ RpcModule, }; use reth_ipc::server::IpcServer; -pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{AdminApi, DebugApi, EthApi, NetApi, TraceApi, Web3Api}; @@ -77,21 +75,20 @@ use std::{ }; use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; use tower::layer::util::{Identity, Stack}; -use tower_http::cors::{AllowOrigin, Any, CorsLayer}; +use tower_http::cors::CorsLayer; -/// The default port for the http server -pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545; +pub use jsonrpsee::server::ServerBuilder; +pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; -/// The default port for the ws server -pub const DEFAULT_WS_RPC_PORT: u16 = 8546; +/// Auth server utilities. +pub mod auth; -/// The default IPC endpoint -#[cfg(windows)] -pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc"; +/// Common RPC constants. +pub mod constants; +use constants::*; -/// The default IPC endpoint -#[cfg(not(windows))] -pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc"; +/// Cors utilities. +mod cors; /// Convenience function for starting a server in one step. pub async fn launch( @@ -508,10 +505,10 @@ pub struct RpcServerConfig { http_server_config: Option, /// Cors Domains http_cors_domains: Option, - /// Configs for WS server - ws_server_config: Option, /// Address where to bind the http server to http_addr: Option, + /// Configs for WS server + ws_server_config: Option, /// Address where to bind the ws server to ws_addr: Option, /// Configs for JSON-RPC IPC server @@ -621,7 +618,7 @@ impl RpcServerConfig { ))); if let Some(builder) = self.http_server_config { - if let Some(cors) = self.http_cors_domains.as_deref().map(create_cors_layer) { + if let Some(cors) = self.http_cors_domains.as_deref().map(cors::create_cors_layer) { let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?; let middleware = tower::ServiceBuilder::new().layer(cors); let http_server = @@ -658,48 +655,6 @@ impl RpcServerConfig { } } -/// Error thrown when parsing cors domains went wrong -#[derive(Debug, thiserror::Error)] -enum CorsDomainError { - #[error("{domain} is an invalid header value")] - InvalidHeader { domain: String }, - #[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")] - WildCardNotAllowed { input: String }, -} - -/// Creates a [CorsLayer] from the given domains -fn create_cors_layer(http_cors_domains: &str) -> Result { - let cors = match http_cors_domains.trim() { - "*" => CorsLayer::new() - .allow_methods([Method::GET, Method::POST]) - .allow_origin(Any) - .allow_headers(Any), - _ => { - let iter = http_cors_domains.split(','); - if iter.clone().any(|o| o == "*") { - return Err(CorsDomainError::WildCardNotAllowed { - input: http_cors_domains.to_string(), - }) - } - - let origins = iter - .map(|domain| { - domain - .parse::() - .map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() }) - }) - .collect::, _>>()?; - - let origin = AllowOrigin::list(origins); - CorsLayer::new() - .allow_methods([Method::GET, Method::POST]) - .allow_origin(origin) - .allow_headers(Any) - } - }; - Ok(cors) -} - /// Holds modules to be installed per transport type /// /// # Example diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 2fc4d789fb..a635a1f61e 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -10,7 +10,7 @@ use reth_primitives::{ BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, H64, U256, }; -use reth_provider::{BlockProvider, HeaderProvider, StateProvider}; +use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory}; use reth_rlp::Decodable; use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, @@ -19,13 +19,15 @@ use reth_rpc_types::engine::{ use std::{ future::Future, pin::Pin, - sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::{oneshot, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; -/// The Engine API response sender +/// The Engine API handle. +pub type EngineApiHandle = mpsc::UnboundedSender; + +/// The Engine API response sender. pub type EngineApiSender = oneshot::Sender>; /// The upper limit for payload bodies request. @@ -35,7 +37,7 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; /// functions in the Execution layer that are crucial for the consensus process. #[must_use = "EngineApi does nothing unless polled."] pub struct EngineApi { - client: Arc, + client: Client, /// Consensus configuration chain_spec: ChainSpec, message_rx: UnboundedReceiverStream, @@ -45,7 +47,22 @@ pub struct EngineApi { // remote_store: HashMap, } -impl EngineApi { +impl EngineApi { + /// Create new instance of [EngineApi]. + pub fn new( + client: Client, + chain_spec: ChainSpec, + message_rx: mpsc::UnboundedReceiver, + forkchoice_state_tx: watch::Sender, + ) -> Self { + Self { + client, + chain_spec, + message_rx: UnboundedReceiverStream::new(message_rx), + forkchoice_state_tx, + } + } + fn on_message(&mut self, msg: EngineApiMessage) { match msg { EngineApiMessage::GetPayload(payload_id, tx) => { @@ -285,14 +302,14 @@ impl EngineApi { })) } - let mut state_provider = SubState::new(State::new(&*self.client)); + let state_provider = self.client.latest()?; let total_difficulty = parent_td + block.header.difficulty; match executor::execute_and_verify_receipt( &block.unseal(), total_difficulty, None, &self.chain_spec, - &mut state_provider, + &mut SubState::new(State::new(&state_provider)), ) { Ok(_) => Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)), Err(err) => Ok(PayloadStatus::new( @@ -394,7 +411,7 @@ impl EngineApi { impl Future for EngineApi where - Client: HeaderProvider + BlockProvider + StateProvider + Unpin, + Client: HeaderProvider + BlockProvider + StateProviderFactory + Unpin, { type Output = (); @@ -419,12 +436,13 @@ mod tests { use reth_interfaces::test_utils::generators::random_block; use reth_primitives::{H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; + use std::sync::Arc; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedSender}, watch::Receiver as WatchReceiver, }; - fn setup_engine_api() -> (EngineApiTestHandle, EngineApi) { + fn setup_engine_api() -> (EngineApiTestHandle, EngineApi>) { let chain_spec = MAINNET.clone(); let client = Arc::new(MockEthProvider::default()); let (msg_tx, msg_rx) = unbounded_channel(); diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index daeab9bb2f..b97f1b6fac 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -17,6 +17,6 @@ mod message; /// Engine API error. mod error; -pub use engine_api::{EngineApi, EngineApiSender}; +pub use engine_api::{EngineApi, EngineApiHandle, EngineApiSender}; pub use error::*; pub use message::{EngineApiMessage, EngineApiMessageVersion}; diff --git a/crates/rpc/rpc/src/engine/mod.rs b/crates/rpc/rpc/src/engine/mod.rs index 677b658c4d..2b917d394b 100644 --- a/crates/rpc/rpc/src/engine/mod.rs +++ b/crates/rpc/rpc/src/engine/mod.rs @@ -8,22 +8,26 @@ use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_api::EngineApiServer; use reth_rpc_engine_api::{ - EngineApiError, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, + EngineApiError, EngineApiHandle, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, REQUEST_TOO_LARGE_CODE, UNKNOWN_PAYLOAD_CODE, }; use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration, CAPABILITIES, }; -use tokio::sync::{ - mpsc::UnboundedSender, - oneshot::{self, Receiver}, -}; +use tokio::sync::oneshot::{self, Receiver}; /// The server implementation of Engine API pub struct EngineApi { /// Handle to the consensus engine - engine_tx: UnboundedSender, + engine_tx: EngineApiHandle, +} + +impl EngineApi { + /// Creates a new instance of [EngineApi]. + pub fn new(engine_tx: EngineApiHandle) -> Self { + Self { engine_tx } + } } impl std::fmt::Debug for EngineApi { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 1c3c6ad022..9a82741e17 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -248,3 +248,23 @@ impl StateProviderFactory for MockEthProvider { todo!() } } + +impl StateProviderFactory for Arc { + type HistorySP<'a> = &'a MockEthProvider where Self: 'a; + type LatestSP<'a> = &'a MockEthProvider where Self: 'a; + + fn latest(&self) -> Result> { + Ok(self) + } + + fn history_by_block_number( + &self, + _block: reth_primitives::BlockNumber, + ) -> Result> { + todo!() + } + + fn history_by_block_hash(&self, _block: BlockHash) -> Result> { + todo!() + } +} diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 2ecd3f202a..34a09589b3 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -38,6 +38,7 @@ tracing = "0.1" serde = { version = "1.0", features = ["derive", "rc"], optional = true } fnv = "1.0.7" bitflags = "1.3" +auto_impl = "1.0" # ruint # Using the uint! requires the crate to be imported diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index e9bec85652..94a4d35809 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -19,6 +19,7 @@ use serde::{Deserialize, Serialize}; /// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented /// for a wrapped `Arc` type, see also [`Pool`](crate::Pool). #[async_trait::async_trait] +#[auto_impl::auto_impl(Arc)] pub trait TransactionPool: Send + Sync + Clone { /// The transaction type of the pool type Transaction: PoolTransaction;