diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 4094ef49ef..1372f47de7 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -5,7 +5,7 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, - BuiltPayload, PayloadBuilderAttributes, PayloadJob, + BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; use futures_util::{future::FutureExt, StreamExt}; use reth_rpc_types::engine::PayloadId; @@ -28,12 +28,25 @@ pub struct PayloadStore { // === impl PayloadStore === impl PayloadStore { - /// Returns the best payload for the given identifier. - pub async fn get_payload( + /// Resolves the payload job and returns the best payload that has been built so far. + /// + /// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the + /// job, See [PayloadJob::resolve]. + pub async fn resolve( &self, id: PayloadId, ) -> Option, PayloadBuilderError>> { - self.inner.get_payload(id).await + self.inner.resolve(id).await + } + + /// Returns the best payload for the given identifier. + /// + /// Note: this merely returns the best payload so far and does not resolve the job. + pub async fn best_payload( + &self, + id: PayloadId, + ) -> Option, PayloadBuilderError>> { + self.inner.best_payload(id).await } } @@ -55,13 +68,29 @@ pub struct PayloadBuilderHandle { // === impl PayloadBuilderHandle === impl PayloadBuilderHandle { - /// Returns the best payload for the given identifier. - pub async fn get_payload( + /// Resolves the payload job and returns the best payload that has been built so far. + /// + /// Note: depending on the installed [PayloadJobGenerator], this may or may not terminate the + /// job, See [PayloadJob::resolve]. + pub async fn resolve( &self, id: PayloadId, ) -> Option, PayloadBuilderError>> { let (tx, rx) = oneshot::channel(); - self.to_service.send(PayloadServiceCommand::GetPayload(id, tx)).ok()?; + self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?; + match rx.await.transpose()? { + Ok(fut) => Some(fut.await), + Err(e) => Some(Err(e.into())), + } + } + + /// Returns the best payload for the given identifier. + pub async fn best_payload( + &self, + id: PayloadId, + ) -> Option, PayloadBuilderError>> { + let (tx, rx) = oneshot::channel(); + self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?; rx.await.ok()? } @@ -141,10 +170,27 @@ where self.payload_jobs.iter().any(|(_, job_id)| *job_id == id) } - /// Returns the best payload for the given identifier. - fn get_payload(&self, id: PayloadId) -> Option, PayloadBuilderError>> { + /// Returns the best payload for the given identifier that has been built so far. + fn best_payload( + &self, + id: PayloadId, + ) -> Option, PayloadBuilderError>> { self.payload_jobs.iter().find(|(_, job_id)| *job_id == id).map(|(j, _)| j.best_payload()) } + + /// Returns the best payload for the given identifier that has been built so far and terminates + /// the job if requested. + fn resolve(&mut self, id: PayloadId) -> Option { + let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?; + let (fut, keep_alive) = self.payload_jobs[job].0.resolve(); + + if keep_alive == KeepPayloadJobAlive::No { + let (_, id) = self.payload_jobs.remove(job); + trace!(%id, "terminated resolved job"); + } + + Some(Box::pin(fut)) + } } impl Future for PayloadBuilderService @@ -214,8 +260,11 @@ where // return the id of the payload let _ = tx.send(res); } - PayloadServiceCommand::GetPayload(id, tx) => { - let _ = tx.send(this.get_payload(id)); + PayloadServiceCommand::BestPayload(id, tx) => { + let _ = tx.send(this.best_payload(id)); + } + PayloadServiceCommand::Resolve(id, tx) => { + let _ = tx.send(this.resolve(id)); } } } @@ -227,14 +276,18 @@ where } } +type PayloadFuture = + Pin, PayloadBuilderError>> + Send>>; + /// Message type for the [PayloadBuilderService]. -#[derive(Debug)] enum PayloadServiceCommand { /// Start building a new payload. BuildNewPayload( PayloadBuilderAttributes, oneshot::Sender>, ), - /// Get the current payload. - GetPayload(PayloadId, oneshot::Sender, PayloadBuilderError>>>), + /// Get the best payload so far + BestPayload(PayloadId, oneshot::Sender, PayloadBuilderError>>>), + /// Resolve the payload and return the payload + Resolve(PayloadId, oneshot::Sender>), } diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 2b0ffe41fa..7e0d699bf0 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -17,7 +17,8 @@ pub trait PayloadJob: Future> + Send + /// Represents the future that resolves the block that's returned to the CL. type ResolvePayloadFuture: Future, PayloadBuilderError>> + Send - + Sync; + + Sync + + 'static; /// Returns the best payload that has been built so far. /// diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 543e120cb0..26bb056416 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -127,7 +127,7 @@ where pub async fn get_payload_v1(&self, payload_id: PayloadId) -> EngineApiResult { Ok(self .payload_store - .get_payload(payload_id) + .resolve(payload_id) .await .ok_or(EngineApiError::UnknownPayload)? .map(|payload| (*payload).clone().into_v1_payload())?) @@ -146,7 +146,7 @@ where ) -> EngineApiResult { Ok(self .payload_store - .get_payload(payload_id) + .resolve(payload_id) .await .ok_or(EngineApiError::UnknownPayload)? .map(|payload| (*payload).clone().into_v2_payload())?)