From 534d23eed8798807de9bfe6b5f9027bd5c7e6eb0 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 19 Apr 2023 08:33:17 +0200 Subject: [PATCH] feat: process payload job initiation async (#2295) --- crates/consensus/beacon/src/engine/error.rs | 13 ++- crates/consensus/beacon/src/engine/message.rs | 110 +++++++++++++++++- crates/consensus/beacon/src/engine/mod.rs | 76 ++++++------ crates/payload/builder/src/service.rs | 17 +-- crates/rpc/rpc-engine-api/src/error.rs | 5 +- .../rpc-types/src/eth/engine/forkchoice.rs | 54 +++++++++ 6 files changed, 223 insertions(+), 52 deletions(-) diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index 8ce2726c25..58b7a22668 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -1,5 +1,5 @@ use reth_payload_builder::error::PayloadBuilderError; -use reth_rpc_types::engine::{EngineRpcError, PayloadError}; +use reth_rpc_types::engine::{EngineRpcError, ForkchoiceUpdateError, PayloadError}; use reth_stages::PipelineError; use thiserror::Error; @@ -48,3 +48,14 @@ impl From for BeaconEngineError { Self::Common(e.into()) } } + +/// Represents error cases for an applied forkchoice update. +#[derive(Error, Debug, Eq, PartialEq)] +pub enum BeaconForkChoiceUpdateError { + /// Thrown when a forkchoice update resulted in an error. + #[error("Forkchoice update error: {0}")] + ForkchoiceUpdateError(#[from] ForkchoiceUpdateError), + /// Thrown when the engine task stopped + #[error("beacon consensus engine task stopped")] + EngineUnavailable, +} diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index e2eefbbc17..f3eff843fd 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,10 +1,116 @@ use crate::BeaconEngineResult; +use futures::{future::Either, FutureExt}; use reth_interfaces::consensus::ForkchoiceState; +use reth_payload_builder::error::PayloadBuilderError; use reth_rpc_types::engine::{ - ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + ExecutionPayload, ForkChoiceUpdateResult, ForkchoiceUpdateError, ForkchoiceUpdated, + PayloadAttributes, PayloadId, PayloadStatus, +}; +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, }; use tokio::sync::oneshot; +/// Represents the outcome of forkchoice update. +/// +/// This is a future that resolves to [ForkChoiceUpdateResult] +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct OnForkChoiceUpdated { + is_valid_update: bool, + /// Returns the result of the forkchoice update. + fut: Either, PendingPayloadId>, +} + +// === impl OnForkChoiceUpdated === + +impl OnForkChoiceUpdated { + /// Returns true if this update is valid + pub(crate) fn is_valid_update(&self) -> bool { + self.is_valid_update + } + + /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no + /// payload attributes were provided. + pub(crate) fn valid(status: PayloadStatus) -> Self { + Self { + is_valid_update: status.is_valid(), + fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), + } + } + /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the + /// given state is considered invalid + pub(crate) fn invalid_state() -> Self { + Self { + is_valid_update: false, + fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))), + } + } + + /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but + /// payload attributes were invalid. + pub(crate) fn invalid_payload_attributes() -> Self { + Self { + // This is valid because this is only reachable if the state and payload is valid + is_valid_update: true, + fut: Either::Left(futures::future::ready(Err( + ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes, + ))), + } + } + + /// If the forkchoice update was successful and no payload attributes were provided, this method + pub(crate) fn updated_with_pending_payload_id( + payload_status: PayloadStatus, + pending_payload_id: oneshot::Receiver>, + ) -> Self { + Self { + is_valid_update: payload_status.is_valid(), + fut: Either::Right(PendingPayloadId { + payload_status: Some(payload_status), + pending_payload_id, + }), + } + } +} + +impl Future for OnForkChoiceUpdated { + type Output = ForkChoiceUpdateResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().fut.poll_unpin(cx) + } +} + +/// A future that returns the payload id of a yet to be initiated payload job after a successful +/// forkchoice update +#[derive(Debug)] +struct PendingPayloadId { + payload_status: Option, + pending_payload_id: oneshot::Receiver>, +} + +impl Future for PendingPayloadId { + type Output = ForkChoiceUpdateResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let res = ready!(this.pending_payload_id.poll_unpin(cx)); + match res { + Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated { + payload_status: this.payload_status.take().expect("Polled after completion"), + payload_id: Some(payload_id), + })), + Err(_) | Ok(Err(_)) => { + // failed to initiate a payload build job + Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes)) + } + } + } +} + /// A message for the beacon engine from other components of the node (engine RPC API invoked by the /// consensus layer). #[derive(Debug)] @@ -23,6 +129,6 @@ pub enum BeaconEngineMessage { /// The payload attributes for block building. payload_attrs: Option, /// The sender for returning forkchoice updated result. - tx: oneshot::Sender>, + tx: oneshot::Sender, }, } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 871b646514..50bdfa502d 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,5 +1,5 @@ use crate::engine::metrics::Metrics; -use futures::{Future, FutureExt, StreamExt}; +use futures::{Future, FutureExt, StreamExt, TryFutureExt}; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine}, @@ -11,8 +11,7 @@ use reth_interfaces::{ use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{BlockNumber, Header, SealedBlock, H256}; use reth_rpc_types::engine::{ - EngineRpcError, ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, - PayloadStatusEnum, + ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, }; use reth_stages::{stages::FINISH, Pipeline}; use reth_tasks::TaskSpawner; @@ -33,10 +32,11 @@ mod message; pub use message::BeaconEngineMessage; mod error; -pub use error::{BeaconEngineError, BeaconEngineResult}; +pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError}; mod metrics; mod pipeline_state; +use crate::engine::message::OnForkChoiceUpdated; pub use pipeline_state::PipelineState; /// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus @@ -75,19 +75,21 @@ impl BeaconConsensusEngineHandle { &self, state: ForkchoiceState, payload_attrs: Option, - ) -> BeaconEngineResult { - self.send_fork_choice_updated(state, payload_attrs) - .await - .map_err(|_| BeaconEngineError::EngineUnavailable)? + ) -> Result { + Ok(self + .send_fork_choice_updated(state, payload_attrs) + .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) + .await? + .await?) } /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to /// wait for a response. - pub fn send_fork_choice_updated( + fn send_fork_choice_updated( &self, state: ForkchoiceState, payload_attrs: Option, - ) -> oneshot::Receiver> { + ) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { state, @@ -238,14 +240,10 @@ where &mut self, state: ForkchoiceState, attrs: Option, - ) -> Result { + ) -> Result { trace!(target: "consensus::engine", ?state, "Received new forkchoice state"); if state.head_block_hash.is_zero() { - return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status( - PayloadStatusEnum::Invalid { - validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), - }, - ))) + return Ok(OnForkChoiceUpdated::invalid_state()) } // TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head @@ -276,7 +274,7 @@ where .expect("was canonicalized, so it exists"); if let Some(attrs) = attrs { - return self.process_payload_attributes(attrs, header, state) + return Ok(self.process_payload_attributes(attrs, header, state)) } // TODO: most recent valid block in the branch defined by payload and its @@ -313,7 +311,7 @@ where }; trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status"); - Ok(ForkchoiceUpdated::new(status)) + Ok(OnForkChoiceUpdated::valid(status)) } /// Validates the payload attributes with respect to the header and fork choice state. @@ -322,7 +320,7 @@ where attrs: PayloadAttributes, header: Header, state: ForkchoiceState, - ) -> Result { + ) -> OnForkChoiceUpdated { // 7. Client software MUST ensure that payloadAttributes.timestamp is // greater than timestamp of a block referenced by // forkchoiceState.headBlockHash. If this condition isn't held client @@ -330,11 +328,7 @@ where // MUST NOT begin a payload build process. In such an event, the // forkchoiceState update MUST NOT be rolled back. if attrs.timestamp <= header.timestamp.into() { - return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status( - PayloadStatusEnum::Invalid { - validation_error: EngineRpcError::InvalidPayloadAttributes.to_string(), - }, - ))) + return OnForkChoiceUpdated::invalid_payload_attributes() } // 8. Client software MUST begin a payload build process building on top of @@ -344,7 +338,7 @@ where // building section. let attributes = PayloadBuilderAttributes::new(header.parent_hash, attrs); // TODO(mattsse) this needs to be handled asynchronously - let payload_id = self.payload_builder.send_new_payload(attributes); + let pending_payload_id = self.payload_builder.send_new_payload(attributes); // Client software MUST respond to this method call in the following way: // { @@ -357,11 +351,10 @@ where // } // // if the payload is deemed VALID and the build process has begun. - Ok(ForkchoiceUpdated::new(PayloadStatus::new( - PayloadStatusEnum::Valid, - Some(state.head_block_hash), - )) - .with_payload_id(payload_id)) + OnForkChoiceUpdated::updated_with_pending_payload_id( + PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)), + pending_payload_id, + ) } /// When the Consensus layer receives a new block via the consensus gossip protocol, @@ -522,16 +515,15 @@ where match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { this.metrics.forkchoice_updated_messages.increment(1); - let response = match this.on_forkchoice_updated(state, payload_attrs) { + let on_updated = match this.on_forkchoice_updated(state, payload_attrs) { Ok(response) => response, Err(error) => { error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response"); return Poll::Ready(Err(error)) } }; - let is_valid_response = - matches!(response.payload_status.status, PayloadStatusEnum::Valid); - let _ = tx.send(Ok(response)); + let is_valid_response = on_updated.is_valid_update(); + let _ = tx.send(on_updated); // Terminate the sync early if it's reached the maximum user // configured block. @@ -643,6 +635,7 @@ enum PipelineTarget { #[cfg(test)] mod tests { use super::*; + use crate::engine::error::BeaconForkChoiceUpdateError; use assert_matches::assert_matches; use reth_blockchain_tree::{ blockchain_tree::{ @@ -713,7 +706,7 @@ mod tests { async fn send_forkchoice_updated( &self, state: ForkchoiceState, - ) -> BeaconEngineResult { + ) -> Result { self.engine_handle.fork_choice_updated(state, None).await } @@ -722,7 +715,7 @@ mod tests { async fn send_forkchoice_retry_on_syncing( &self, state: ForkchoiceState, - ) -> BeaconEngineResult { + ) -> Result { loop { let result = self.engine_handle.fork_choice_updated(state, None).await?; if !result.is_syncing() { @@ -926,6 +919,7 @@ mod tests { mod fork_choice_updated { use super::*; use reth_interfaces::test_utils::generators::random_block; + use reth_rpc_types::engine::ForkchoiceUpdateError; #[tokio::test] async fn empty_head() { @@ -945,10 +939,12 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); let res = env.send_forkchoice_updated(ForkchoiceState::default()).await; - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), - }); - assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); + assert_eq!( + res, + Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError( + ForkchoiceUpdateError::InvalidState + )) + ); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 90e1344109..75603044fb 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -61,12 +61,15 @@ impl PayloadBuilderHandle { /// Sends a message to the service to start building a new payload for the given payload. /// - /// This is the same as [PayloadBuilderHandle::new_payload] but does not wait for the result. - pub fn send_new_payload(&self, attr: PayloadBuilderAttributes) -> PayloadId { - let id = attr.payload_id(); - let (tx, _) = oneshot::channel(); + /// This is the same as [PayloadBuilderHandle::new_payload] but does not wait for the result and + /// returns the receiver instead + pub fn send_new_payload( + &self, + attr: PayloadBuilderAttributes, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); - id + rx } /// Starts building a new payload for the given payload attributes. @@ -78,9 +81,7 @@ impl PayloadBuilderHandle { &self, attr: PayloadBuilderAttributes, ) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); - rx.await? + self.send_new_payload(attr).await? } } diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index af34b0d8c2..323e405392 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -1,5 +1,5 @@ use jsonrpsee_types::error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}; -use reth_beacon_consensus::BeaconEngineError; +use reth_beacon_consensus::{BeaconEngineError, BeaconForkChoiceUpdateError}; use reth_primitives::{H256, U256}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; @@ -66,6 +66,9 @@ pub enum EngineApiError { /// Beacon consensus engine error. #[error(transparent)] ConsensusEngine(#[from] BeaconEngineError), + /// An error occurred while processing the fork choice update. + #[error(transparent)] + ForkChoiceUpdate(#[from] BeaconForkChoiceUpdateError), /// Encountered an internal error. #[error(transparent)] Internal(Box), diff --git a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs index 617de8145c..b97341225d 100644 --- a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs +++ b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs @@ -3,6 +3,21 @@ use crate::engine::PayloadId; use reth_primitives::H256; use serde::{Deserialize, Serialize}; +/// invalid forkchoice state error code. +pub const INVALID_FORK_CHOICE_STATE_ERROR: i32 = -38002; + +/// invalid payload attributes error code. +pub const INVALID_PAYLOAD_ATTRIBUTES_ERROR: i32 = -38003; + +/// invalid forkchoice state error message. +pub const INVALID_FORK_CHOICE_STATE_ERROR_MSG: &str = "Invalid forkchoice state"; + +/// invalid payload attributes error message. +pub const INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG: &str = "Invalid payload attributes"; + +/// Represents possible variants of a processed forkchoice update. +pub type ForkChoiceUpdateResult = Result; + /// This structure encapsulates the fork choice state #[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -12,10 +27,49 @@ pub struct ForkchoiceState { pub finalized_block_hash: H256, } +/// A standalone forkchoice update result for RPC. +#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)] +pub enum ForkchoiceUpdateError { + /// The forkchoice update has been processed, but the requested contained invalid + /// [PayloadAttributes](crate::engine::PayloadAttributes). + /// + /// This is returned as an error because the payload attributes are invalid and the payload is not valid, See + #[error("Invalid payload attributes")] + UpdatedInvalidPayloadAttributes, + /// The given [ForkchoiceState] is invalid or inconsistent. + #[error("Invalid forkchoice state")] + InvalidState, +} + +impl From for jsonrpsee_types::error::ErrorObject<'static> { + fn from(value: ForkchoiceUpdateError) -> Self { + match value { + ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes => { + jsonrpsee_types::error::ErrorObject::owned( + INVALID_PAYLOAD_ATTRIBUTES_ERROR, + INVALID_PAYLOAD_ATTRIBUTES_ERROR_MSG, + None::<()>, + ) + } + ForkchoiceUpdateError::InvalidState => jsonrpsee_types::error::ErrorObject::owned( + INVALID_FORK_CHOICE_STATE_ERROR, + INVALID_FORK_CHOICE_STATE_ERROR_MSG, + None::<()>, + ), + } + } +} + +/// Represents a successfully _processed_ forkchoice state update. +/// +/// Note: this can still be INVALID if the provided payload was invalid. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ForkchoiceUpdated { + /// Represents the outcome of the validation of the payload, independently of the payload being + /// valid or not. pub payload_status: PayloadStatus, + /// The identifier of the payload build process that was successfully initiated. pub payload_id: Option, }