From 08eae76bec0ab090720cbe361e229100db7731e3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 14 Apr 2023 18:59:18 +0200 Subject: [PATCH] refactor: simplify engine Api (#2240) --- Cargo.lock | 6 +- bin/reth/src/args/rpc_server_args.rs | 11 +- bin/reth/src/node/mod.rs | 37 +- crates/consensus/beacon/src/engine/message.rs | 20 +- crates/consensus/beacon/src/engine/mod.rs | 45 +- crates/payload/builder/src/payload.rs | 32 +- crates/payload/builder/src/service.rs | 6 + crates/rpc/rpc-builder/src/auth.rs | 15 +- crates/rpc/rpc-engine-api/Cargo.toml | 7 +- crates/rpc/rpc-engine-api/src/engine_api.rs | 479 +++++++++++------- crates/rpc/rpc-engine-api/src/error.rs | 23 +- crates/rpc/rpc-engine-api/src/lib.rs | 4 +- crates/rpc/rpc-engine-api/src/message.rs | 33 -- crates/rpc/rpc-types/src/eth/engine/error.rs | 6 +- .../rpc/rpc-types/src/eth/engine/payload.rs | 6 +- crates/rpc/rpc/src/engine.rs | 229 +-------- 16 files changed, 405 insertions(+), 554 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a54c429ac2..fcbefd6db0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5164,17 +5164,19 @@ name = "reth-rpc-engine-api" version = "0.1.0" dependencies = [ "assert_matches", - "futures", + "async-trait", "jsonrpsee-core", "jsonrpsee-types", "reth-beacon-consensus", "reth-interfaces", + "reth-payload-builder", "reth-primitives", "reth-provider", + "reth-rpc-api", "reth-rpc-types", "thiserror", "tokio", - "tokio-stream", + "tracing", ] [[package]] diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 6b1e123994..c63bf111ad 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -4,7 +4,7 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; use jsonrpsee::server::ServerHandle; use reth_network_api::{NetworkInfo, Peers}; -use reth_primitives::ChainSpec; + use reth_provider::{ BlockProvider, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, StateProviderFactory, }; @@ -13,13 +13,12 @@ use reth_rpc_builder::{ constants, error::RpcError, IpcServerBuilder, RethRpcModule, RpcModuleSelection, RpcServerConfig, RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, }; -use reth_rpc_engine_api::EngineApiHandle; +use reth_rpc_engine_api::EngineApi; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, - sync::Arc, }; /// Parameters for configuring the rpc more granularity via CLI @@ -156,8 +155,7 @@ impl RpcServerArgs { pool: Pool, network: Network, executor: Tasks, - chain_spec: Arc, - handle: EngineApiHandle, + engine_api: EngineApi, ) -> Result where Client: BlockProvider @@ -181,8 +179,7 @@ impl RpcServerArgs { pool, network, executor, - chain_spec, - handle, + engine_api, socket_address, secret, ) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 4a0be8041a..0c0778ce89 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -43,7 +43,7 @@ use reth_primitives::{BlockHashOrNumber, Chain, ChainSpec, Head, Header, SealedH use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; -use reth_rpc_engine_api::{EngineApi, EngineApiHandle}; +use reth_rpc_engine_api::EngineApi; use reth_staged_sync::{ utils::{ chainspec::genesis_value_parser, @@ -64,10 +64,7 @@ use std::{ path::PathBuf, sync::Arc, }; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedSender}, - oneshot, watch, -}; +use tokio::sync::{mpsc::unbounded_channel, oneshot, watch}; use tracing::*; use crate::dirs::MaybePlatformPath; @@ -307,12 +304,16 @@ impl Command { blockchain_tree.clone(), consensus_engine_rx, self.debug.max_block, - payload_builder, + payload_builder.clone(), ); info!(target: "reth::cli", "Consensus engine initialized"); - let engine_api_handle = - self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor); + let engine_api = EngineApi::new( + ShareableDatabase::new(db, self.chain.clone()), + self.chain.clone(), + consensus_engine_tx.clone(), + payload_builder.into(), + ); info!(target: "reth::cli", "Engine API handler initialized"); let launch_rpc = self @@ -335,8 +336,7 @@ impl Command { transaction_pool.clone(), network.clone(), ctx.task_executor.clone(), - self.chain.clone(), - engine_api_handle, + engine_api, ) .inspect(|_| { info!(target: "reth::cli", "Started Auth server"); @@ -441,23 +441,6 @@ impl Command { Ok(()) } - fn init_engine_api( - &self, - db: Arc>, - engine_tx: UnboundedSender, - task_executor: &TaskExecutor, - ) -> EngineApiHandle { - let (message_tx, message_rx) = unbounded_channel(); - let engine_api = EngineApi::new( - ShareableDatabase::new(db, self.chain.clone()), - self.chain.clone(), - message_rx, - engine_tx, - ); - task_executor.spawn_critical("engine API task", engine_api); - message_tx - } - /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected /// to that network. async fn start_network( diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index c4b136fffd..e2eefbbc17 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,15 +1,12 @@ use crate::BeaconEngineResult; use reth_interfaces::consensus::ForkchoiceState; use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated, PayloadAttributes, PayloadId, - PayloadStatus, + ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, }; use tokio::sync::oneshot; -/// Beacon engine sender. -pub type BeaconEngineSender = oneshot::Sender>; - -/// A message for the beacon engine from other components of the node. +/// A message for the beacon engine from other components of the node (engine RPC API invoked by the +/// consensus layer). #[derive(Debug)] pub enum BeaconEngineMessage { /// Message with new payload. @@ -17,7 +14,7 @@ pub enum BeaconEngineMessage { /// The execution payload received by Engine API. payload: ExecutionPayload, /// The sender for returning payload status result. - tx: BeaconEngineSender, + tx: oneshot::Sender>, }, /// Message with updated forkchoice state. ForkchoiceUpdated { @@ -26,13 +23,6 @@ pub enum BeaconEngineMessage { /// The payload attributes for block building. payload_attrs: Option, /// The sender for returning forkchoice updated result. - tx: BeaconEngineSender, - }, - /// Message with get payload parameters. - GetPayload { - /// The payload id. - payload_id: PayloadId, - /// The sender for returning payload result. - tx: BeaconEngineSender, + tx: oneshot::Sender>, }, } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 0219f20f8c..1813b31ad8 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -12,8 +12,8 @@ use reth_metrics_derive::Metrics; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{BlockNumber, Header, SealedBlock, H256}; use reth_rpc_types::engine::{ - EngineRpcError, ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated, - PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum, + EngineRpcError, ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + PayloadStatusEnum, }; use reth_stages::{stages::FINISH, Pipeline}; use reth_tasks::TaskSpawner; @@ -30,7 +30,7 @@ mod error; pub use error::{BeaconEngineError, BeaconEngineResult}; mod message; -pub use message::{BeaconEngineMessage, BeaconEngineSender}; +pub use message::BeaconEngineMessage; mod pipeline_state; pub use pipeline_state::PipelineState; @@ -45,8 +45,6 @@ struct Metrics { forkchoice_updated_messages: Counter, /// The total count of new payload messages received. new_payload_messages: Counter, - /// The total count of get payload messages received. - get_payload_messages: Counter, } /// The beacon consensus engine is the driver that switches between historical and live sync. @@ -276,25 +274,6 @@ where .with_payload_id(payload_id)) } - /// Called to receive the execution payload associated with a payload build process. - pub fn on_get_payload( - &self, - _payload_id: PayloadId, - ) -> Result { - // TODO: Client software SHOULD stop the updating process when either a call to - // engine_getPayload with the build process's payloadId is made or SECONDS_PER_SLOT (12s in - // the Mainnet configuration) have passed since the point in time identified by the - // timestamp parameter. - - // for now just return the output from the payload store - // match self.payload_builder.get_execution_payload(payload_id) { - // Some(payload) => Ok(payload), - // None => Err(EngineRpcError::UnknownPayload.into()), - // } - - todo!() - } - /// When the Consensus layer receives a new block via the consensus gossip protocol, /// the transactions in the block are sent to the execution layer in the form of a /// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the @@ -484,24 +463,6 @@ where }; let _ = tx.send(Ok(response)); } - BeaconEngineMessage::GetPayload { payload_id, tx } => { - this.metrics.get_payload_messages.increment(1); - match this.on_get_payload(payload_id) { - Ok(response) => { - // good response, send it back - let _ = tx.send(Ok(response)); - } - Err(BeaconEngineError::EngineApi(error)) => { - // specific error that we should report back to the client - error!(target: "consensus::engine", ?error, "Sending engine api error response"); - let _ = tx.send(Err(BeaconEngineError::EngineApi(error))); - } - Err(error) => { - error!(target: "consensus::engine", ?error, "Error getting get payload response"); - return Poll::Ready(Err(error)) - } - }; - } } } diff --git a/crates/payload/builder/src/payload.rs b/crates/payload/builder/src/payload.rs index 13c3aff8ea..9d564d9cca 100644 --- a/crates/payload/builder/src/payload.rs +++ b/crates/payload/builder/src/payload.rs @@ -4,7 +4,9 @@ use reth_consensus_common::validation::calculate_next_block_base_fee; use reth_primitives::{Address, ChainSpec, Header, SealedBlock, Withdrawal, H256, U256}; use reth_revm_primitives::config::revm_spec_by_timestamp_after_merge; use reth_rlp::Encodable; -use reth_rpc_types::engine::{PayloadAttributes, PayloadId}; +use reth_rpc_types::engine::{ + ExecutionPayload, ExecutionPayloadEnvelope, PayloadAttributes, PayloadId, +}; use revm_primitives::{BlockEnv, CfgEnv}; /// Contains the built payload. @@ -12,7 +14,7 @@ use revm_primitives::{BlockEnv, CfgEnv}; /// According to the [engine API specification](https://github.com/ethereum/execution-apis/blob/main/src/engine/README.md) the execution layer should build the initial version of the payload with an empty transaction set and then keep update it in order to maximize the revenue. /// Therefore, the empty-block here is always available and full-block will be set/updated /// afterwards. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BuiltPayload { /// Identifier of the payload pub(crate) id: PayloadId, @@ -44,6 +46,32 @@ impl BuiltPayload { pub fn fees(&self) -> U256 { self.fees } + + /// Converts the type into the response expected by `engine_getPayloadV1` + pub fn into_v1_payload(self) -> ExecutionPayload { + self.into() + } + + /// Converts the type into the response expected by `engine_getPayloadV2` + pub fn into_v2_payload(self) -> ExecutionPayloadEnvelope { + self.into() + } +} + +// V1 engine_getPayloadV1 response +impl From for ExecutionPayload { + fn from(value: BuiltPayload) -> Self { + value.block.into() + } +} + +// V2 engine_getPayloadV2 response +impl From for ExecutionPayloadEnvelope { + fn from(value: BuiltPayload) -> Self { + let BuiltPayload { block, fees, .. } = value; + + ExecutionPayloadEnvelope { block_value: fees, payload: block.into() } + } } /// Container type for all components required to build a payload. diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 23a9dfec10..90e1344109 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -34,6 +34,12 @@ impl PayloadStore { } } +impl From for PayloadStore { + fn from(inner: PayloadBuilderHandle) -> Self { + Self { inner } + } +} + /// A communication channel to the [PayloadBuilderService]. /// /// This is the API used to create new payloads and to get the current state of existing ones. diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 811e34c17c..9c723f30fd 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -2,16 +2,15 @@ use crate::error::{RpcError, ServerKind}; pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::server::{RpcModule, ServerHandle}; use reth_network_api::{NetworkInfo, Peers}; -use reth_primitives::ChainSpec; + use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{ eth::cache::EthStateCache, AuthLayer, EngineApi, EthApi, EthFilter, JwtAuthValidator, JwtSecret, }; use reth_rpc_api::servers::*; -use reth_rpc_engine_api::EngineApiHandle; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; /// Configure and launch an auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] @@ -20,8 +19,7 @@ pub async fn launch( pool: Pool, network: Network, executor: Tasks, - chain_spec: Arc, - handle: EngineApiHandle, + engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, ) -> Result @@ -41,15 +39,14 @@ where let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor); let eth_api = EthApi::new(client.clone(), pool.clone(), network, eth_cache); let eth_filter = EthFilter::new(client, pool); - launch_with_eth_api(eth_api, chain_spec, eth_filter, handle, socket_addr, secret).await + launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } /// Configure and launch an auth server with existing EthApi implementation. pub async fn launch_with_eth_api( eth_api: EthApi, - chain_spec: Arc, eth_filter: EthFilter, - handle: EngineApiHandle, + engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, ) -> Result @@ -66,7 +63,7 @@ where { // Configure the module and start the server. let mut module = RpcModule::new(()); - module.merge(EngineApi::new(chain_spec, handle).into_rpc()).expect("No conflicting methods"); + module.merge(engine_api.into_rpc()).expect("No conflicting methods"); module.merge(eth_api.into_rpc()).expect("No conflicting methods"); module.merge(eth_filter.into_rpc()).expect("No conflicting methods"); diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 67bcb17e9c..4b916c6361 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -12,19 +12,22 @@ reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-provider = { path = "../../storage/provider" } reth-rpc-types = { path = "../rpc-types" } +reth-rpc-api = { path = "../rpc-api" } reth-beacon-consensus = { path = "../../consensus/beacon" } +reth-payload-builder = { path = "../../payload/builder" } # async -futures = "0.3" tokio = { version = "1", features = ["sync"] } -tokio-stream = "0.1" # misc +async-trait = "0.1" thiserror = "1.0.37" jsonrpsee-types = "0.16" jsonrpsee-core = "0.16" +tracing = "0.1" [dev-dependencies] reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } reth-provider = { path = "../../storage/provider", features = ["test-utils"] } +reth-payload-builder = { path = "../../payload/builder", features = ["test-utils"] } assert_matches = "1.5.0" diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 1d818e4b0b..d6bba4bf5c 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,23 +1,19 @@ -use crate::{EngineApiError, EngineApiMessage, EngineApiResult}; -use futures::StreamExt; +use crate::{EngineApiError, EngineApiMessageVersion, EngineApiResult}; +use async_trait::async_trait; +use jsonrpsee_core::RpcResult as Result; use reth_beacon_consensus::BeaconEngineMessage; -use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork}; +use reth_interfaces::consensus::ForkchoiceState; +use reth_payload_builder::PayloadStore; +use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, U64}; use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; -use reth_rpc_types::engine::{ExecutionPayloadBodies, TransitionConfiguration}; -use std::{ - future::Future, - pin::Pin, - sync::Arc, - task::{ready, Context, Poll}, +use reth_rpc_api::EngineApiServer; +use reth_rpc_types::engine::{ + ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated, + PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES, }; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - oneshot, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; - -/// The Engine API handle. -pub type EngineApiHandle = mpsc::UnboundedSender; +use std::sync::Arc; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tracing::trace; /// The Engine API response sender. pub type EngineApiSender = oneshot::Sender>; @@ -27,56 +23,119 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; /// The Engine API implementation that grants the Consensus layer access to data and /// functions in the Execution layer that are crucial for the consensus process. -#[must_use = "EngineApi does nothing unless polled."] pub struct EngineApi { + /// The client to interact with the chain. client: Client, /// Consensus configuration chain_spec: Arc, - message_rx: UnboundedReceiverStream, - engine_tx: UnboundedSender, + /// The channel to send messages to the beacon consensus engine. + to_beacon_consensus: UnboundedSender, + /// The type that can communicate with the payload service to retrieve payloads. + payload_store: PayloadStore, } -impl - EngineApi +impl EngineApi +where + Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, { /// Create new instance of [EngineApi]. pub fn new( client: Client, chain_spec: Arc, - message_rx: mpsc::UnboundedReceiver, - engine_tx: UnboundedSender, + to_beacon_consensus: UnboundedSender, + payload_store: PayloadStore, ) -> Self { - Self { client, chain_spec, message_rx: UnboundedReceiverStream::new(message_rx), engine_tx } + Self { client, chain_spec, to_beacon_consensus, payload_store } } - fn on_message(&mut self, msg: EngineApiMessage) { - match msg { - EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => { - let _ = tx.send(self.exchange_transition_configuration(config)); - } - EngineApiMessage::GetPayload(payload_id, tx) => { - // forward message to the consensus engine - let _ = self.engine_tx.send(BeaconEngineMessage::GetPayload { payload_id, tx }); - } - EngineApiMessage::GetPayloadBodiesByHash(hashes, tx) => { - let _ = tx.send(self.get_payload_bodies_by_hash(hashes)); - } - EngineApiMessage::GetPayloadBodiesByRange(start, count, tx) => { - let _ = tx.send(self.get_payload_bodies_by_range(start, count)); - } - EngineApiMessage::NewPayload(payload, tx) => { - // forward message to the consensus engine - let _ = self.engine_tx.send(BeaconEngineMessage::NewPayload { payload, tx }); - } - EngineApiMessage::ForkchoiceUpdated(state, payload_attrs, tx) => { - // forward message to the consensus engine - let _ = self.engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - }); - } + /// See also + /// Caution: This should not accept the `withdrawals` field + pub async fn new_payload_v1( + &self, + payload: ExecutionPayload, + ) -> EngineApiResult { + self.validate_withdrawals_presence( + EngineApiMessageVersion::V1, + payload.timestamp.as_u64(), + payload.withdrawals.is_some(), + )?; + let (tx, rx) = oneshot::channel(); + self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?; + Ok(rx.await??) + } + + /// See also + pub async fn new_payload_v2( + &self, + payload: ExecutionPayload, + ) -> EngineApiResult { + self.validate_withdrawals_presence( + EngineApiMessageVersion::V2, + payload.timestamp.as_u64(), + payload.withdrawals.is_some(), + )?; + let (tx, rx) = oneshot::channel(); + self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?; + Ok(rx.await??) + } + + /// See also + /// + /// Caution: This should not accept the `withdrawals` field + pub async fn fork_choice_updated_v1( + &self, + state: ForkchoiceState, + payload_attrs: Option, + ) -> EngineApiResult { + if let Some(ref attrs) = payload_attrs { + self.validate_withdrawals_presence( + EngineApiMessageVersion::V1, + attrs.timestamp.as_u64(), + attrs.withdrawals.is_some(), + )?; } + let (tx, rx) = oneshot::channel(); + self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs, + tx, + })?; + Ok(rx.await??) + } + + /// Returns the most recent version of the payload that is available in the corresponding + /// payload build process at the time of receiving this call. + /// + /// See also + /// + /// Caution: This should not return the `withdrawals` field + /// + /// Note: + /// > Client software MAY stop the corresponding build process after serving this call. + pub async fn get_payload_v1(&self, payload_id: PayloadId) -> EngineApiResult { + self.payload_store + .get_payload(payload_id) + .await + .map(|payload| (*payload).clone().into_v1_payload()) + .ok_or(EngineApiError::UnknownPayload) + } + + /// Returns the most recent version of the payload that is available in the corresponding + /// payload build process at the time of receiving this call. + /// + /// See also + /// + /// Note: + /// > Client software MAY stop the corresponding build process after serving this call. + async fn get_payload_v2( + &self, + payload_id: PayloadId, + ) -> EngineApiResult { + self.payload_store + .get_payload(payload_id) + .await + .map(|payload| (*payload).clone().into_v2_payload()) + .ok_or(EngineApiError::UnknownPayload) } /// Called to retrieve execution payload bodies by range. @@ -180,25 +239,156 @@ impl Future for EngineApi -where - Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + Unpin, -{ - type Output = (); + /// Validates the presence of the `withdrawals` field according to the payload timestamp. + /// After Shanghai, withdrawals field must be [Some]. + /// Before Shanghai, withdrawals field must be [None]; + fn validate_withdrawals_presence( + &self, + version: EngineApiMessageVersion, + timestamp: u64, + has_withdrawals: bool, + ) -> EngineApiResult<()> { + let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - loop { - match ready!(this.message_rx.poll_next_unpin(cx)) { - Some(msg) => this.on_message(msg), - None => { - // channel closed - return Poll::Ready(()) + match version { + EngineApiMessageVersion::V1 => { + if has_withdrawals { + return Err(EngineApiError::WithdrawalsNotSupportedInV1) + } + if is_shanghai { + return Err(EngineApiError::NoWithdrawalsPostShanghai) } } - } + EngineApiMessageVersion::V2 => { + if is_shanghai && !has_withdrawals { + return Err(EngineApiError::NoWithdrawalsPostShanghai) + } + if !is_shanghai && has_withdrawals { + return Err(EngineApiError::HasWithdrawalsPreShanghai) + } + } + }; + + Ok(()) + } +} + +#[async_trait] +impl EngineApiServer for EngineApi +where + Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, +{ + /// Handler for `engine_newPayloadV1` + /// See also + /// Caution: This should not accept the `withdrawals` field + async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result { + trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); + Ok(EngineApi::new_payload_v1(self, payload).await?) + } + + /// Handler for `engine_newPayloadV1` + /// See also + async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result { + trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); + Ok(EngineApi::new_payload_v2(self, payload).await?) + } + + /// Handler for `engine_forkchoiceUpdatedV1` + /// See also + /// + /// Caution: This should not accept the `withdrawals` field + async fn fork_choice_updated_v1( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> Result { + trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV1"); + Ok(EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await?) + } + + /// Handler for `engine_forkchoiceUpdatedV2` + /// See also + async fn fork_choice_updated_v2( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> Result { + trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV2"); + Ok(EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await?) + } + + /// Handler for `engine_getPayloadV1` + /// + /// Returns the most recent version of the payload that is available in the corresponding + /// payload build process at the time of receiving this call. + /// + /// See also + /// + /// Caution: This should not return the `withdrawals` field + /// + /// Note: + /// > Client software MAY stop the corresponding build process after serving this call. + async fn get_payload_v1(&self, payload_id: PayloadId) -> Result { + trace!(target: "rpc::eth", "Serving engine_getPayloadV1"); + Ok(EngineApi::get_payload_v1(self, payload_id).await?) + } + + /// Handler for `engine_getPayloadV2` + /// + /// Returns the most recent version of the payload that is available in the corresponding + /// payload build process at the time of receiving this call. + /// + /// See also + /// + /// Note: + /// > Client software MAY stop the corresponding build process after serving this call. + async fn get_payload_v2(&self, payload_id: PayloadId) -> Result { + trace!(target: "rpc::eth", "Serving engine_getPayloadV2"); + Ok(EngineApi::get_payload_v2(self, payload_id).await?) + } + + /// Handler for `engine_getPayloadBodiesByHashV1` + /// See also + async fn get_payload_bodies_by_hash_v1( + &self, + block_hashes: Vec, + ) -> Result { + trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); + Ok(EngineApi::get_payload_bodies_by_hash(self, block_hashes)?) + } + + /// Handler for `engine_getPayloadBodiesByRangeV1` + /// See also + async fn get_payload_bodies_by_range_v1( + &self, + start: U64, + count: U64, + ) -> Result { + trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); + Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64())?) + } + + /// Handler for `engine_exchangeTransitionConfigurationV1` + /// See also + async fn exchange_transition_configuration( + &self, + config: TransitionConfiguration, + ) -> Result { + trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); + Ok(EngineApi::exchange_transition_configuration(self, config)?) + } + + /// Handler for `engine_exchangeCapabilitiesV1` + /// See also + async fn exchange_capabilities(&self, _capabilities: Vec) -> Result> { + Ok(CAPABILITIES.into_iter().map(str::to_owned).collect()) + } +} + +impl std::fmt::Debug for EngineApi { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EngineApi").finish_non_exhaustive() } } @@ -206,62 +396,42 @@ where mod tests { use super::*; use assert_matches::assert_matches; - use reth_interfaces::{consensus::ForkchoiceState, test_utils::generators::random_block}; + use reth_interfaces::test_utils::generators::random_block; + use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{SealedBlock, H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; use std::sync::Arc; - use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; fn setup_engine_api() -> (EngineApiTestHandle, EngineApi>) { let chain_spec = Arc::new(MAINNET.clone()); let client = Arc::new(MockEthProvider::default()); - let (msg_tx, msg_rx) = unbounded_channel(); - let (engine_tx, engine_rx) = mpsc::unbounded_channel(); - let api = EngineApi { - client: client.clone(), - chain_spec: chain_spec.clone(), - message_rx: UnboundedReceiverStream::new(msg_rx), - engine_tx, - }; - let handle = EngineApiTestHandle { chain_spec, client, msg_tx, engine_rx }; + let payload_store = spawn_test_payload_service(); + let (to_beacon_consensus, engine_rx) = unbounded_channel(); + let api = EngineApi::new( + client.clone(), + chain_spec.clone(), + to_beacon_consensus, + payload_store.into(), + ); + let handle = EngineApiTestHandle { chain_spec, client, from_api: engine_rx }; (handle, api) } struct EngineApiTestHandle { chain_spec: Arc, client: Arc, - msg_tx: UnboundedSender, - engine_rx: UnboundedReceiver, - } - - impl EngineApiTestHandle { - fn send_message(&self, msg: EngineApiMessage) { - self.msg_tx.send(msg).expect("failed to send engine msg"); - } + from_api: UnboundedReceiver, } #[tokio::test] async fn forwards_responses_to_consensus_engine() { let (mut handle, api) = setup_engine_api(); - tokio::spawn(api); - let (result_tx, _result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::NewPayload(SealedBlock::default().into(), result_tx)); - assert_matches!( - handle.engine_rx.recv().await, - Some(BeaconEngineMessage::NewPayload { .. }) - ); - - let (result_tx, _result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ForkchoiceUpdated( - ForkchoiceState::default(), - None, - result_tx, - )); - assert_matches!( - handle.engine_rx.recv().await, - Some(BeaconEngineMessage::ForkchoiceUpdated { .. }) - ); + tokio::spawn(async move { + api.new_payload_v1(SealedBlock::default().into()).await.unwrap(); + }); + assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. })); } // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash` @@ -271,8 +441,7 @@ mod tests { #[tokio::test] async fn invalid_params() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); + let (_, api) = setup_engine_api(); let by_range_tests = [ // (start, count) @@ -283,45 +452,23 @@ mod tests { // test [EngineApiMessage::GetPayloadBodiesByRange] for (start, count) in by_range_tests { - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByRange( - start, count, result_tx, - )); - assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::InvalidBodiesRange { .. })) - ); + let res = api.get_payload_bodies_by_range(start, count); + assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. })); } } #[tokio::test] async fn request_too_large() { - let (handle, api) = setup_engine_api(); - tokio::spawn(api); + let (_, api) = setup_engine_api(); let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1; - - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByRange( - 0, - request_count, - result_tx, - )); - assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::PayloadRequestTooLarge { .. })) - ); - - let (result_tx, result_rx) = oneshot::channel(); - let hashes = std::iter::repeat(H256::default()).take(request_count as usize).collect(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); - assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadRequestTooLarge { .. }))) + let res = api.get_payload_bodies_by_range(0, request_count); + assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. })); } #[tokio::test] async fn returns_payload_bodies() { let (handle, api) = setup_engine_api(); - tokio::spawn(api); let (start, count) = (1, 10); let blocks = random_block_range(start..start + count, H256::default(), 0..2); @@ -330,20 +477,13 @@ mod tests { let expected = blocks.iter().cloned().map(|b| Some(b.unseal().into())).collect::>(); - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx)); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); - - let (result_tx, result_rx) = oneshot::channel(); - let hashes = blocks.iter().map(|b| b.hash()).collect(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + let res = api.get_payload_bodies_by_range(start, count).unwrap(); + assert_eq!(res, expected); } #[tokio::test] async fn returns_payload_bodies_with_gaps() { let (handle, api) = setup_engine_api(); - tokio::spawn(api); let (start, count) = (1, 100); let blocks = random_block_range(start..start + count, H256::default(), 0..2); @@ -375,14 +515,12 @@ mod tests { }) .collect::>(); - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx)); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + let res = api.get_payload_bodies_by_range(start, count).unwrap(); + assert_eq!(res, expected); - let (result_tx, result_rx) = oneshot::channel(); let hashes = blocks.iter().map(|b| b.hash()).collect(); - handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); - assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + let res = api.get_payload_bodies_by_hash(hashes).unwrap(); + assert_eq!(res, expected); } } @@ -394,7 +532,6 @@ mod tests { #[tokio::test] async fn terminal_td_mismatch() { let (handle, api) = setup_engine_api(); - tokio::spawn(api); let transition_config = TransitionConfiguration { terminal_total_difficulty: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() + @@ -402,15 +539,11 @@ mod tests { ..Default::default() }; - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )); + let res = api.exchange_transition_configuration(transition_config.clone()); assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalTD { execution, consensus })) + res, + Err(EngineApiError::TerminalTD { execution, consensus }) if execution == handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() && consensus == U256::from(transition_config.terminal_total_difficulty) ); } @@ -418,7 +551,6 @@ mod tests { #[tokio::test] async fn terminal_block_hash_mismatch() { let (handle, api) = setup_engine_api(); - tokio::spawn(api); let terminal_block_number = 1000; let consensus_terminal_block = random_block(terminal_block_number, None, None, None); @@ -431,14 +563,11 @@ mod tests { }; // Unknown block number - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )); + let res = api.exchange_transition_configuration(transition_config.clone()); + assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) + res, + Err(EngineApiError::TerminalBlockHash { execution, consensus }) if execution.is_none() && consensus == transition_config.terminal_block_hash ); @@ -448,15 +577,11 @@ mod tests { execution_terminal_block.clone().unseal(), ); - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )); + let res = api.exchange_transition_configuration(transition_config.clone()); assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) + res, + Err(EngineApiError::TerminalBlockHash { execution, consensus }) if execution == Some(execution_terminal_block.hash()) && consensus == transition_config.terminal_block_hash ); } @@ -464,7 +589,6 @@ mod tests { #[tokio::test] async fn configurations_match() { let (handle, api) = setup_engine_api(); - tokio::spawn(api); let terminal_block_number = 1000; let terminal_block = random_block(terminal_block_number, None, None, None); @@ -475,15 +599,10 @@ mod tests { terminal_block_number: terminal_block_number.into(), }; - handle.client.add_block(terminal_block.hash(), terminal_block.clone().unseal()); + handle.client.add_block(terminal_block.hash(), terminal_block.unseal()); - let (result_tx, result_rx) = oneshot::channel(); - handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )); - - assert_matches!(result_rx.await, Ok(Ok(config)) => assert_eq!(config, transition_config)); + let config = api.exchange_transition_configuration(transition_config.clone()).unwrap(); + assert_eq!(config, transition_config); } } } diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 9b5aa52bd7..af34b0d8c2 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -2,6 +2,7 @@ use jsonrpsee_types::error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}; use reth_beacon_consensus::BeaconEngineError; use reth_primitives::{H256, U256}; use thiserror::Error; +use tokio::sync::{mpsc, oneshot}; /// The Engine API result type pub type EngineApiResult = Result; @@ -12,11 +13,13 @@ pub const UNKNOWN_PAYLOAD_CODE: i32 = -38001; pub const REQUEST_TOO_LARGE_CODE: i32 = -38004; /// Error returned by [`EngineApi`][crate::EngineApi] +/// +/// Note: This is a high fidelity error type which can be converted to an RPC error that adheres to the spec: #[derive(Error, Debug)] pub enum EngineApiError { /// Unknown payload requested. #[error("Unknown payload")] - PayloadUnknown, + UnknownPayload, /// The payload body request length is too large. #[error("Payload request too large: {len}")] PayloadRequestTooLarge { @@ -66,6 +69,21 @@ pub enum EngineApiError { /// Encountered an internal error. #[error(transparent)] Internal(Box), + /// Failed to send message due ot closed channel + #[error("Closed channel")] + ChannelClosed, +} + +impl From> for EngineApiError { + fn from(_: mpsc::error::SendError) -> Self { + EngineApiError::ChannelClosed + } +} + +impl From for EngineApiError { + fn from(_: oneshot::error::RecvError) -> Self { + EngineApiError::ChannelClosed + } } impl From for jsonrpsee_types::error::CallError { @@ -75,13 +93,14 @@ impl From for jsonrpsee_types::error::CallError { EngineApiError::WithdrawalsNotSupportedInV1 | EngineApiError::NoWithdrawalsPostShanghai | EngineApiError::HasWithdrawalsPreShanghai => INVALID_PARAMS_CODE, - EngineApiError::PayloadUnknown => UNKNOWN_PAYLOAD_CODE, + EngineApiError::UnknownPayload => UNKNOWN_PAYLOAD_CODE, EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, // Any other server error _ => INTERNAL_ERROR_CODE, }; jsonrpsee_types::error::CallError::Custom(jsonrpsee_types::error::ErrorObject::owned( code, + // TODO properly convert to rpc error error.to_string(), None::<()>, )) diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index bf6a568eac..49d08e4eb3 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -17,6 +17,6 @@ mod message; /// Engine API error. mod error; -pub use engine_api::{EngineApi, EngineApiHandle, EngineApiSender}; +pub use engine_api::{EngineApi, EngineApiSender}; pub use error::*; -pub use message::{EngineApiMessage, EngineApiMessageVersion}; +pub use message::EngineApiMessageVersion; diff --git a/crates/rpc/rpc-engine-api/src/message.rs b/crates/rpc/rpc-engine-api/src/message.rs index c748d7075f..9d000ed744 100644 --- a/crates/rpc/rpc-engine-api/src/message.rs +++ b/crates/rpc/rpc-engine-api/src/message.rs @@ -1,36 +1,3 @@ -use crate::EngineApiSender; -use reth_beacon_consensus::BeaconEngineSender; -use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::{BlockHash, BlockNumber}; -use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated, - PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, -}; - -/// Message type for communicating with [`EngineApi`][crate::EngineApi]. -#[derive(Debug)] -pub enum EngineApiMessage { - /// Get payload message - GetPayload(PayloadId, BeaconEngineSender), - /// Get payload bodies by range message - GetPayloadBodiesByRange(BlockNumber, u64, EngineApiSender), - /// Get payload bodies by hash message - GetPayloadBodiesByHash(Vec, EngineApiSender), - /// Exchange transition configuration message - ExchangeTransitionConfiguration( - TransitionConfiguration, - EngineApiSender, - ), - /// New payload message - NewPayload(ExecutionPayload, BeaconEngineSender), - /// Forkchoice updated message - ForkchoiceUpdated( - ForkchoiceState, - Option, - BeaconEngineSender, - ), -} - /// The version of Engine API message. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EngineApiMessageVersion { diff --git a/crates/rpc/rpc-types/src/eth/engine/error.rs b/crates/rpc/rpc-types/src/eth/engine/error.rs index b028b4dd80..36feba4dfd 100644 --- a/crates/rpc/rpc-types/src/eth/engine/error.rs +++ b/crates/rpc/rpc-types/src/eth/engine/error.rs @@ -1,6 +1,10 @@ //! Commonly used errors for the `engine_` namespace. -/// List of Engine API errors, see +/// List of Engine API errors used in RPC, see +/// +/// Note: These are all errors that can be returned by the `engine_` namespace in the error case. +/// +/// TODO: get rid of this #[derive(Debug, Copy, PartialEq, Eq, Clone, thiserror::Error)] pub enum EngineRpcError { /// Invalid JSON was received by the server. diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs index 7ddea04a15..4451149310 100644 --- a/crates/rpc/rpc-types/src/eth/engine/payload.rs +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -7,6 +7,9 @@ use reth_primitives::{ use reth_rlp::{Decodable, Encodable}; use serde::{ser::SerializeMap, Deserialize, Serialize, Serializer}; +/// The execution payload body response that allows for `null` values. +pub type ExecutionPayloadBodies = Vec>; + /// And 8-byte identifier for an execution payload. #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct PayloadId(H64); @@ -228,9 +231,6 @@ impl From for ExecutionPayloadBody { } } -/// The execution payload body response that allows for `null` values. -pub type ExecutionPayloadBodies = Vec>; - /// This structure contains the attributes required to initiate a payload build process in the /// context of an `engine_forkchoiceUpdated` call. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] diff --git a/crates/rpc/rpc/src/engine.rs b/crates/rpc/rpc/src/engine.rs index 4c2332b928..e9d286b421 100644 --- a/crates/rpc/rpc/src/engine.rs +++ b/crates/rpc/rpc/src/engine.rs @@ -1,227 +1,2 @@ -use async_trait::async_trait; -use jsonrpsee::core::{Error, RpcResult as Result}; -use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::{BlockHash, ChainSpec, Hardfork, U64}; -use reth_rpc_api::EngineApiServer; -use reth_rpc_engine_api::{ - EngineApiError, EngineApiHandle, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, -}; -use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated, - PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES, -}; -use std::sync::Arc; -use tokio::sync::oneshot::{self, Receiver}; - -/// The server implementation of Engine API -pub struct EngineApi { - /// Chain spec - chain_spec: Arc, - /// Handle to the engine API implementation. - engine_tx: EngineApiHandle, -} - -impl EngineApi { - /// Creates a new instance of [EngineApi]. - pub fn new(chain_spec: Arc, engine_tx: EngineApiHandle) -> Self { - Self { chain_spec, engine_tx } - } -} - -impl std::fmt::Debug for EngineApi { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("EngineApi").finish_non_exhaustive() - } -} - -impl EngineApi { - /// Validates the presence of the `withdrawals` field according to the payload timestamp. - /// After Shanghai, withdrawals field must be [Some]. - /// Before Shanghai, withdrawals field must be [None]; - fn validate_withdrawals_presence( - &self, - version: EngineApiMessageVersion, - timestamp: u64, - has_withdrawals: bool, - ) -> EngineApiResult<()> { - let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); - - match version { - EngineApiMessageVersion::V1 => { - if has_withdrawals { - return Err(EngineApiError::WithdrawalsNotSupportedInV1) - } - if is_shanghai { - return Err(EngineApiError::NoWithdrawalsPostShanghai) - } - } - EngineApiMessageVersion::V2 => { - if is_shanghai && !has_withdrawals { - return Err(EngineApiError::NoWithdrawalsPostShanghai) - } - if !is_shanghai && has_withdrawals { - return Err(EngineApiError::HasWithdrawalsPreShanghai) - } - } - }; - - Ok(()) - } - - async fn delegate_request>( - &self, - msg: EngineApiMessage, - rx: Receiver>, - ) -> Result { - let _ = self.engine_tx.send(msg); - Ok(rx.await.map_err(|err| Error::Custom(err.to_string()))?.map_err(Into::into)?) - } -} - -#[async_trait] -impl EngineApiServer for EngineApi { - /// Handler for `engine_newPayloadV1` - /// See also - /// Caution: This should not accept the `withdrawals` field - async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result { - self.validate_withdrawals_presence( - EngineApiMessageVersion::V1, - payload.timestamp.as_u64(), - payload.withdrawals.is_some(), - )?; - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await - } - - /// Handler for `engine_newPayloadV1` - /// See also - async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result { - self.validate_withdrawals_presence( - EngineApiMessageVersion::V2, - payload.timestamp.as_u64(), - payload.withdrawals.is_some(), - )?; - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await - } - - /// Handler for `engine_forkchoiceUpdatedV1` - /// See also - /// - /// Caution: This should not accept the `withdrawals` field - async fn fork_choice_updated_v1( - &self, - fork_choice_state: ForkchoiceState, - payload_attributes: Option, - ) -> Result { - if let Some(ref attrs) = payload_attributes { - self.validate_withdrawals_presence( - EngineApiMessageVersion::V1, - attrs.timestamp.as_u64(), - attrs.withdrawals.is_some(), - )?; - } - let (tx, rx) = oneshot::channel(); - self.delegate_request( - EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx), - rx, - ) - .await - } - - /// Handler for `engine_forkchoiceUpdatedV2` - /// See also - async fn fork_choice_updated_v2( - &self, - fork_choice_state: ForkchoiceState, - payload_attributes: Option, - ) -> Result { - if let Some(ref attrs) = payload_attributes { - self.validate_withdrawals_presence( - EngineApiMessageVersion::V2, - attrs.timestamp.as_u64(), - attrs.withdrawals.is_some(), - )?; - } - let (tx, rx) = oneshot::channel(); - self.delegate_request( - EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx), - rx, - ) - .await - } - - /// Handler for `engine_getPayloadV1` - /// - /// Returns the most recent version of the payload that is available in the corresponding - /// payload build process at the time of receiving this call. - /// - /// See also - /// - /// Caution: This should not return the `withdrawals` field - /// - /// Note: - /// > Client software MAY stop the corresponding build process after serving this call. - async fn get_payload_v1(&self, payload_id: PayloadId) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::GetPayload(payload_id, tx), rx) - .await - .map(ExecutionPayloadEnvelope::into_v1_payload) - } - - /// Handler for `engine_getPayloadV2` - /// - /// Returns the most recent version of the payload that is available in the corresponding - /// payload build process at the time of receiving this call. - /// - /// See also - /// - /// Note: - /// > Client software MAY stop the corresponding build process after serving this call. - async fn get_payload_v2(&self, payload_id: PayloadId) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::GetPayload(payload_id, tx), rx).await - } - - /// Handler for `engine_getPayloadBodiesByHashV1` - /// See also - async fn get_payload_bodies_by_hash_v1( - &self, - block_hashes: Vec, - ) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::GetPayloadBodiesByHash(block_hashes, tx), rx).await - } - - /// Handler for `engine_getPayloadBodiesByRangeV1` - /// See also - async fn get_payload_bodies_by_range_v1( - &self, - start: U64, - count: U64, - ) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request( - EngineApiMessage::GetPayloadBodiesByRange(start.as_u64(), count.as_u64(), tx), - rx, - ) - .await - } - - /// Handler for `engine_exchangeTransitionConfigurationV1` - /// See also - async fn exchange_transition_configuration( - &self, - config: TransitionConfiguration, - ) -> Result { - let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::ExchangeTransitionConfiguration(config, tx), rx) - .await - } - - /// Handler for `engine_exchangeCapabilitiesV1` - /// See also - async fn exchange_capabilities(&self, _capabilities: Vec) -> Result> { - Ok(CAPABILITIES.into_iter().map(str::to_owned).collect()) - } -} +/// Re-export for convenience +pub use reth_rpc_engine_api::EngineApi;