diff --git a/Cargo.lock b/Cargo.lock index e8b62e2646..8bb5dd0105 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5736,6 +5736,7 @@ dependencies = [ "reth-provider", "reth-rpc-api", "reth-rpc-types", + "reth-tasks", "thiserror", "tokio", "tracing", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 529d67713a..13fd00d78e 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -400,6 +400,7 @@ impl Command { self.chain.clone(), beacon_engine_handle, payload_builder.into(), + Box::new(ctx.task_executor.clone()), ); info!(target: "reth::cli", "Engine API handler initialized"); diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index e7c4a4eb88..0b111b33cc 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -30,6 +30,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { MAINNET.clone(), beacon_engine_handle, spawn_test_payload_service().into(), + Box::new(TokioTaskExecutor::default()), ); let module = AuthRpcModule::new(engine_api); module.start_server(config).await.unwrap() diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 0f417a3a65..219edaca6f 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -17,6 +17,7 @@ reth-rpc-types = { workspace = true } reth-rpc-api = { path = "../rpc-api" } reth-beacon-consensus = { path = "../../consensus/beacon" } reth-payload-builder = { workspace = true } +reth-tasks = { workspace = true } # async tokio = { workspace = true, features = ["sync"] } diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 4803c4d2d7..f461f4928d 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -11,6 +11,7 @@ use reth_rpc_types::engine::{ ExecutionPayload, ExecutionPayloadBodies, ExecutionPayloadEnvelope, ForkchoiceUpdated, PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES, }; +use reth_tasks::TaskSpawner; use std::sync::Arc; use tokio::sync::oneshot; use tracing::trace; @@ -24,6 +25,10 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; /// The Engine API implementation that grants the Consensus layer access to data and /// functions in the Execution layer that are crucial for the consensus process. pub struct EngineApi { + inner: Arc>, +} + +struct EngineApiInner { /// The provider to interact with the chain. provider: Provider, /// Consensus configuration @@ -32,6 +37,8 @@ pub struct EngineApi { beacon_consensus: BeaconConsensusEngineHandle, /// The type that can communicate with the payload service to retrieve payloads. payload_store: PayloadStore, + /// For spawning and executing async tasks + task_spawner: Box, } impl EngineApi @@ -44,8 +51,16 @@ where chain_spec: Arc, beacon_consensus: BeaconConsensusEngineHandle, payload_store: PayloadStore, + task_spawner: Box, ) -> Self { - Self { provider, chain_spec, beacon_consensus, payload_store } + let inner = Arc::new(EngineApiInner { + provider, + chain_spec, + beacon_consensus, + payload_store, + task_spawner, + }); + Self { inner } } /// See also @@ -59,7 +74,7 @@ where payload.timestamp.as_u64(), payload.withdrawals.is_some(), )?; - Ok(self.beacon_consensus.new_payload(payload).await?) + Ok(self.inner.beacon_consensus.new_payload(payload).await?) } /// See also @@ -72,7 +87,7 @@ where payload.timestamp.as_u64(), payload.withdrawals.is_some(), )?; - Ok(self.beacon_consensus.new_payload(payload).await?) + Ok(self.inner.beacon_consensus.new_payload(payload).await?) } /// Sends a message to the beacon consensus engine to update the fork choice _without_ @@ -93,7 +108,7 @@ where attrs.withdrawals.is_some(), )?; } - Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) + Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) } /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals, @@ -112,7 +127,7 @@ where attrs.withdrawals.is_some(), )?; } - Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) + Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) } /// Returns the most recent version of the payload that is available in the corresponding @@ -126,6 +141,7 @@ where /// > Provider software MAY stop the corresponding build process after serving this call. pub async fn get_payload_v1(&self, payload_id: PayloadId) -> EngineApiResult { Ok(self + .inner .payload_store .resolve(payload_id) .await @@ -145,6 +161,7 @@ where payload_id: PayloadId, ) -> EngineApiResult { Ok(self + .inner .payload_store .resolve(payload_id) .await @@ -162,31 +179,44 @@ where /// Implementors should take care when acting on the input to this method, specifically /// ensuring that the range is limited properly, and that the range boundaries are computed /// correctly and without panics. - pub fn get_payload_bodies_by_range( + pub async fn get_payload_bodies_by_range( &self, start: BlockNumber, count: u64, ) -> EngineApiResult { - if count > MAX_PAYLOAD_BODIES_LIMIT { - return Err(EngineApiError::PayloadRequestTooLarge { len: count }) - } + let (tx, rx) = oneshot::channel(); + let inner = self.inner.clone(); - if start == 0 || count == 0 { - return Err(EngineApiError::InvalidBodiesRange { start, count }) - } + self.inner.task_spawner.spawn_blocking(Box::pin(async move { + if count > MAX_PAYLOAD_BODIES_LIMIT { + tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok(); + return + } - let mut result = Vec::with_capacity(count as usize); + if start == 0 || count == 0 { + tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok(); + return + } - let end = start.saturating_add(count); - for num in start..end { - let block = self - .provider - .block(BlockHashOrNumber::Number(num)) - .map_err(|err| EngineApiError::Internal(Box::new(err)))?; - result.push(block.map(Into::into)); - } + let mut result = Vec::with_capacity(count as usize); - Ok(result) + let end = start.saturating_add(count); + for num in start..end { + let block_result = inner.provider.block(BlockHashOrNumber::Number(num)); + match block_result { + Ok(block) => { + result.push(block.map(Into::into)); + } + Err(err) => { + tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok(); + return + } + }; + } + tx.send(Ok(result)).ok(); + })); + + rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))? } /// Called to retrieve execution payload bodies by hashes. @@ -202,6 +232,7 @@ where let mut result = Vec::with_capacity(hashes.len()); for hash in hashes { let block = self + .inner .provider .block(BlockHashOrNumber::Hash(hash)) .map_err(|err| EngineApiError::Internal(Box::new(err)))?; @@ -224,6 +255,7 @@ where } = config; let merge_terminal_td = self + .inner .chain_spec .fork(Hardfork::Paris) .ttd() @@ -237,7 +269,7 @@ where }) } - self.beacon_consensus.transition_configuration_exchanged().await; + self.inner.beacon_consensus.transition_configuration_exchanged().await; // Short circuit if communicated block hash is zero if terminal_block_hash.is_zero() { @@ -249,6 +281,7 @@ where // Attempt to look up terminal block hash let local_hash = self + .inner .provider .block_hash(terminal_block_number.as_u64()) .map_err(|err| EngineApiError::Internal(Box::new(err)))?; @@ -276,7 +309,8 @@ where timestamp: u64, has_withdrawals: bool, ) -> EngineApiResult<()> { - let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); + let is_shanghai = + self.inner.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); match version { EngineApiMessageVersion::V1 => { @@ -404,7 +438,7 @@ where count: U64, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1"); - Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64())?) + Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64()).await?) } /// Handler for `engine_exchangeTransitionConfigurationV1` @@ -439,6 +473,7 @@ mod tests { use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{SealedBlock, H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; + use reth_tasks::TokioTaskExecutor; use std::sync::Arc; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -447,11 +482,13 @@ mod tests { let provider = Arc::new(MockEthProvider::default()); let payload_store = spawn_test_payload_service(); let (to_engine, engine_rx) = unbounded_channel(); + let task_executor = Box::new(TokioTaskExecutor::default()); let api = EngineApi::new( provider.clone(), chain_spec.clone(), BeaconConsensusEngineHandle::new(to_engine), payload_store.into(), + task_executor, ); let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx }; (handle, api) @@ -491,7 +528,7 @@ mod tests { // test [EngineApiMessage::GetPayloadBodiesByRange] for (start, count) in by_range_tests { - let res = api.get_payload_bodies_by_range(start, count); + let res = api.get_payload_bodies_by_range(start, count).await; assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. })); } } @@ -501,7 +538,7 @@ mod tests { let (_, api) = setup_engine_api(); let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1; - let res = api.get_payload_bodies_by_range(0, request_count); + let res = api.get_payload_bodies_by_range(0, request_count).await; assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. })); } @@ -518,7 +555,7 @@ mod tests { let expected = blocks.iter().cloned().map(|b| Some(b.unseal().into())).collect::>(); - let res = api.get_payload_bodies_by_range(start, count).unwrap(); + let res = api.get_payload_bodies_by_range(start, count).await.unwrap(); assert_eq!(res, expected); } @@ -558,7 +595,7 @@ mod tests { }) .collect::>(); - let res = api.get_payload_bodies_by_range(start, count).unwrap(); + let res = api.get_payload_bodies_by_range(start, count).await.unwrap(); assert_eq!(res, expected); let hashes = blocks.iter().map(|b| b.hash()).collect();