From 8d32fd788bfa8c2e041bd1b93ecca37093a09c5a Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 18 Oct 2024 14:45:51 +0400 Subject: [PATCH] feat: allow awaiting payload in progress (#11823) Co-authored-by: Matthias Seitz --- crates/e2e-test-utils/src/payload.rs | 2 +- crates/engine/local/src/miner.rs | 9 ++- crates/payload/basic/src/lib.rs | 15 +++- crates/payload/builder/src/lib.rs | 6 +- crates/payload/builder/src/noop.rs | 2 +- crates/payload/builder/src/service.rs | 79 +++++++++++----------- crates/payload/builder/src/test_utils.rs | 7 +- crates/payload/builder/src/traits.rs | 20 +++++- crates/payload/primitives/src/lib.rs | 22 ++++++ crates/payload/primitives/src/traits.rs | 40 +++++------ examples/custom-payload-builder/src/job.rs | 6 +- 11 files changed, 127 insertions(+), 81 deletions(-) diff --git a/crates/e2e-test-utils/src/payload.rs b/crates/e2e-test-utils/src/payload.rs index 1f9a89307b..946d9af575 100644 --- a/crates/e2e-test-utils/src/payload.rs +++ b/crates/e2e-test-utils/src/payload.rs @@ -28,7 +28,7 @@ impl PayloadTestContext { ) -> eyre::Result { self.timestamp += 1; let attributes: E::PayloadBuilderAttributes = attributes_generator(self.timestamp); - self.payload_builder.new_payload(attributes.clone()).await.unwrap(); + self.payload_builder.send_new_payload(attributes.clone()).await.unwrap()?; Ok(attributes) } diff --git a/crates/engine/local/src/miner.rs b/crates/engine/local/src/miner.rs index f20d70b148..8bcb7083aa 100644 --- a/crates/engine/local/src/miner.rs +++ b/crates/engine/local/src/miner.rs @@ -9,7 +9,7 @@ use reth_chainspec::EthereumHardforks; use reth_engine_primitives::EngineTypes; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_primitives::{ - BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes, + BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes, }; use reth_provider::{BlockReader, ChainSpecProvider}; use reth_rpc_types_compat::engine::payload::block_to_payload; @@ -202,10 +202,9 @@ where let payload_id = res.payload_id.ok_or_eyre("No payload id")?; - // wait for some time to let the payload be built - tokio::time::sleep(Duration::from_millis(200)).await; - - let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else { + let Some(Ok(payload)) = + self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await + else { eyre::bail!("No payload") }; diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 835f20f3ef..7416283c1f 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -17,7 +17,9 @@ use reth_chainspec::{ChainSpec, EthereumHardforks}; use reth_payload_builder::{ database::CachedReads, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator, }; -use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError}; +use reth_payload_primitives::{ + BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind, +}; use reth_primitives::{ constants::{RETH_CLIENT_VERSION, SLOT_DURATION}, proofs, BlockNumberOrTag, SealedBlock, Withdrawals, @@ -474,7 +476,10 @@ where Ok(self.config.attributes.clone()) } - fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + fn resolve_kind( + &mut self, + kind: PayloadKind, + ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { let best_payload = self.best_payload.take(); if best_payload.is_none() && self.pending_block.is_none() { @@ -530,7 +535,11 @@ where }; } - let fut = ResolveBestPayload { best_payload, maybe_better, empty_payload }; + let fut = ResolveBestPayload { + best_payload, + maybe_better, + empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending), + }; (fut, KeepPayloadJobAlive::No) } diff --git a/crates/payload/builder/src/lib.rs b/crates/payload/builder/src/lib.rs index 70b4296da4..0df15f5b0d 100644 --- a/crates/payload/builder/src/lib.rs +++ b/crates/payload/builder/src/lib.rs @@ -28,7 +28,7 @@ //! use std::pin::Pin; //! use std::task::{Context, Poll}; //! use alloy_primitives::U256; -//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator}; +//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator, PayloadKind}; //! use reth_primitives::{Block, Header}; //! //! /// The generator type that creates new jobs that builds empty blocks. @@ -73,7 +73,7 @@ //! Ok(self.attributes.clone()) //! } //! -//! fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { +//! fn resolve_kind(&mut self, _kind: PayloadKind) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { //! let payload = self.best_payload(); //! (futures_util::future::ready(payload), KeepPayloadJobAlive::No) //! } @@ -112,7 +112,7 @@ pub mod noop; pub mod test_utils; pub use alloy_rpc_types::engine::PayloadId; -pub use reth_payload_primitives::PayloadBuilderError; +pub use reth_payload_primitives::{PayloadBuilderError, PayloadKind}; pub use service::{ PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand, PayloadStore, }; diff --git a/crates/payload/builder/src/noop.rs b/crates/payload/builder/src/noop.rs index 06da7dcfad..cbf21f1ceb 100644 --- a/crates/payload/builder/src/noop.rs +++ b/crates/payload/builder/src/noop.rs @@ -51,7 +51,7 @@ where } PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(), PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(), - PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(), + PayloadServiceCommand::Resolve(_, _, tx) => tx.send(None).ok(), PayloadServiceCommand::Subscribe(_) => None, }; } diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 1ebf6770c9..853c69e90d 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -11,7 +11,7 @@ use alloy_rpc_types::engine::PayloadId; use futures_util::{future::FutureExt, Stream, StreamExt}; use reth_payload_primitives::{ BuiltPayload, Events, PayloadBuilder, PayloadBuilderAttributes, PayloadBuilderError, - PayloadEvents, PayloadTypes, + PayloadEvents, PayloadKind, PayloadTypes, }; use reth_provider::CanonStateNotification; use std::{ @@ -45,11 +45,20 @@ where /// /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the /// job, See [`PayloadJob::resolve`]. + pub async fn resolve_kind( + &self, + id: PayloadId, + kind: PayloadKind, + ) -> Option> { + self.inner.resolve_kind(id, kind).await + } + + /// Resolves the payload job and returns the best payload that has been built so far. pub async fn resolve( &self, id: PayloadId, ) -> Option> { - self.inner.resolve(id).await + self.resolve_kind(id, PayloadKind::Earliest).await } /// Returns the best payload for the given identifier. @@ -110,16 +119,13 @@ where type PayloadType = T; type Error = PayloadBuilderError; - async fn send_and_resolve_payload( + fn send_new_payload( &self, attr: ::PayloadBuilderAttributes, - ) -> Result::BuiltPayload>, Self::Error> { - let rx = self.send_new_payload(attr); - let id = rx.await??; - + ) -> Receiver> { let (tx, rx) = oneshot::channel(); - let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx)); - rx.await?.ok_or(PayloadBuilderError::MissingPayload) + let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); + rx } /// Note: this does not resolve the job if it's still in progress. @@ -132,21 +138,17 @@ where rx.await.ok()? } - fn send_new_payload( + async fn resolve_kind( &self, - attr: ::PayloadBuilderAttributes, - ) -> Receiver> { + id: PayloadId, + kind: PayloadKind, + ) -> Option> { let (tx, rx) = oneshot::channel(); - let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); - rx - } - - /// Note: if there's already payload in progress with same identifier, it will be returned. - async fn new_payload( - &self, - attr: ::PayloadBuilderAttributes, - ) -> Result { - self.send_new_payload(attr).await? + self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?; + match rx.await.transpose()? { + Ok(fut) => Some(fut.await), + Err(e) => Some(Err(e.into())), + } } async fn subscribe(&self) -> Result, Self::Error> { @@ -168,19 +170,6 @@ where Self { to_service } } - /// Resolves the payload job and returns the best payload that has been built so far. - /// - /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the - /// job, See [`PayloadJob::resolve`]. - async fn resolve(&self, id: PayloadId) -> Option> { - let (tx, rx) = oneshot::channel(); - self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?; - match rx.await.transpose()? { - Ok(fut) => Some(fut.await), - Err(e) => Some(Err(e.into())), - } - } - /// Returns the payload attributes associated with the given identifier. /// /// Note: this returns the attributes of the payload and does not resolve the job. @@ -296,11 +285,15 @@ where /// Returns the best payload for the given identifier that has been built so far and terminates /// the job if requested. - fn resolve(&mut self, id: PayloadId) -> Option> { + fn resolve( + &mut self, + id: PayloadId, + kind: PayloadKind, + ) -> Option> { trace!(%id, "resolving payload job"); let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?; - let (fut, keep_alive) = self.payload_jobs[job].0.resolve(); + let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind); if keep_alive == KeepPayloadJobAlive::No { let (_, id) = self.payload_jobs.swap_remove(job); @@ -437,8 +430,8 @@ where let attributes = this.payload_attributes(id); let _ = tx.send(attributes); } - PayloadServiceCommand::Resolve(id, tx) => { - let _ = tx.send(this.resolve(id)); + PayloadServiceCommand::Resolve(id, strategy, tx) => { + let _ = tx.send(this.resolve(id, strategy)); } PayloadServiceCommand::Subscribe(tx) => { let new_rx = this.payload_events.subscribe(); @@ -469,7 +462,11 @@ pub enum PayloadServiceCommand { oneshot::Sender>>, ), /// Resolve the payload and return the payload - Resolve(PayloadId, oneshot::Sender>>), + Resolve( + PayloadId, + /* kind: */ PayloadKind, + oneshot::Sender>>, + ), /// Payload service events Subscribe(oneshot::Sender>>), } @@ -489,7 +486,7 @@ where Self::PayloadAttributes(f0, f1) => { f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish() } - Self::Resolve(f0, _f1) => f.debug_tuple("Resolve").field(&f0).finish(), + Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(), Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(), } } diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index 55b9b84f45..6990dc9b17 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -7,7 +7,7 @@ use crate::{ use alloy_primitives::U256; use reth_chain_state::ExecutedBlock; -use reth_payload_primitives::{PayloadBuilderError, PayloadTypes}; +use reth_payload_primitives::{PayloadBuilderError, PayloadKind, PayloadTypes}; use reth_primitives::Block; use reth_provider::CanonStateNotification; use std::{ @@ -96,7 +96,10 @@ impl PayloadJob for TestPayloadJob { Ok(self.attr.clone()) } - fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + fn resolve_kind( + &mut self, + _kind: PayloadKind, + ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { let fut = futures_util::future::ready(self.best_payload()); (fut, KeepPayloadJobAlive::No) } diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 8d448eeff5..62dadeb45d 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,6 +1,8 @@ //! Trait abstractions used by the payload crate. -use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError}; +use reth_payload_primitives::{ + BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind, +}; use reth_provider::CanonStateNotification; use std::future::Future; @@ -53,7 +55,21 @@ pub trait PayloadJob: Future> + Send + /// If this returns [`KeepPayloadJobAlive::Yes`], then the [`PayloadJob`] will be polled /// once more. If this returns [`KeepPayloadJobAlive::No`] then the [`PayloadJob`] will be /// dropped after this call. - fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive); + /// + /// The [`PayloadKind`] determines how the payload should be resolved in the + /// `ResolvePayloadFuture`. [`PayloadKind::Earliest`] should return the earliest available + /// payload (as fast as possible), e.g. racing an empty payload job against a pending job if + /// there's no payload available yet. [`PayloadKind::WaitForPending`] is allowed to wait + /// until a built payload is available. + fn resolve_kind( + &mut self, + kind: PayloadKind, + ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive); + + /// Resolves the payload as fast as possible. + fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + self.resolve_kind(PayloadKind::Earliest) + } } /// Whether the payload job should be kept alive or terminated after the payload was requested by diff --git a/crates/payload/primitives/src/lib.rs b/crates/payload/primitives/src/lib.rs index 8173cae344..08aa428000 100644 --- a/crates/payload/primitives/src/lib.rs +++ b/crates/payload/primitives/src/lib.rs @@ -342,6 +342,28 @@ pub enum EngineApiMessageVersion { V4, } +/// Determines how we should choose the payload to return. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PayloadKind { + /// Returns the next best available payload (the earliest available payload). + /// This does not wait for a real for pending job to finish if there's no best payload yet and + /// is allowed to race various payload jobs (empty, pending best) against each other and + /// returns whichever job finishes faster. + /// + /// This should be used when it's more important to return a valid payload as fast as possible. + /// For example, the engine API timeout for `engine_getPayload` is 1s and clients should rather + /// return an empty payload than indefinitely waiting for the pending payload job to finish and + /// risk missing the deadline. + #[default] + Earliest, + /// Only returns once we have at least one built payload. + /// + /// Compared to [`PayloadKind::Earliest`] this does not race an empty payload job against the + /// already in progress one, and returns the best available built payload or awaits the job in + /// progress. + WaitForPending, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/payload/primitives/src/traits.rs b/crates/payload/primitives/src/traits.rs index 494ed68aa4..ce98fcad32 100644 --- a/crates/payload/primitives/src/traits.rs +++ b/crates/payload/primitives/src/traits.rs @@ -1,4 +1,4 @@ -use crate::{PayloadBuilderError, PayloadEvents, PayloadTypes}; +use crate::{PayloadEvents, PayloadKind, PayloadTypes}; use alloy_primitives::{Address, B256, U256}; use alloy_rpc_types::{ engine::{PayloadAttributes as EthPayloadAttributes, PayloadId}, @@ -7,12 +7,8 @@ use alloy_rpc_types::{ use op_alloy_rpc_types_engine::OpPayloadAttributes; use reth_chain_state::ExecutedBlock; use reth_primitives::{SealedBlock, Withdrawals}; -use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; -pub(crate) type PayloadFuture

= - Pin> + Send + Sync>>; - /// A type that can request, subscribe to and resolve payloads. #[async_trait::async_trait] pub trait PayloadBuilder: Send + Unpin { @@ -21,12 +17,13 @@ pub trait PayloadBuilder: Send + Unpin { /// The error type returned by the builder. type Error; - /// Sends a message to the service to start building a new payload for the given payload - /// attributes and returns a future that resolves to the payload. - async fn send_and_resolve_payload( + /// Sends a message to the service to start building a new payload for the given payload. + /// + /// Returns a receiver that will receive the payload id. + fn send_new_payload( &self, attr: ::PayloadBuilderAttributes, - ) -> Result::BuiltPayload>, Self::Error>; + ) -> oneshot::Receiver>; /// Returns the best payload for the given identifier. async fn best_payload( @@ -34,22 +31,21 @@ pub trait PayloadBuilder: Send + Unpin { id: PayloadId, ) -> Option::BuiltPayload, Self::Error>>; - /// Sends a message to the service to start building a new payload for the given payload. - /// - /// This is the same as [`PayloadBuilder::new_payload`] but does not wait for the result - /// and returns the receiver instead - fn send_new_payload( + /// Resolves the payload job and returns the best payload that has been built so far. + async fn resolve_kind( &self, - attr: ::PayloadBuilderAttributes, - ) -> oneshot::Receiver>; + id: PayloadId, + kind: PayloadKind, + ) -> Option::BuiltPayload, Self::Error>>; - /// Starts building a new payload for the given payload attributes. - /// - /// Returns the identifier of the payload. - async fn new_payload( + /// Resolves the payload job as fast and possible and returns the best payload that has been + /// built so far. + async fn resolve( &self, - attr: ::PayloadBuilderAttributes, - ) -> Result; + id: PayloadId, + ) -> Option::BuiltPayload, Self::Error>> { + self.resolve_kind(id, PayloadKind::Earliest).await + } /// Sends a message to the service to subscribe to payload events. /// Returns a receiver that will receive them. diff --git a/examples/custom-payload-builder/src/job.rs b/examples/custom-payload-builder/src/job.rs index 26b594be94..0141982595 100644 --- a/examples/custom-payload-builder/src/job.rs +++ b/examples/custom-payload-builder/src/job.rs @@ -3,6 +3,7 @@ use reth::{ providers::StateProviderFactory, tasks::TaskSpawner, transaction_pool::TransactionPool, }; use reth_basic_payload_builder::{PayloadBuilder, PayloadConfig}; +use reth_node_api::PayloadKind; use reth_payload_builder::{KeepPayloadJobAlive, PayloadBuilderError, PayloadJob}; use std::{ @@ -52,7 +53,10 @@ where Ok(self.config.attributes.clone()) } - fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { + fn resolve_kind( + &mut self, + _kind: PayloadKind, + ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { let payload = self.best_payload(); (futures_util::future::ready(payload), KeepPayloadJobAlive::No) }