From 020597da32ae5e94a473ab002462f3d5cb004132 Mon Sep 17 00:00:00 2001 From: malik Date: Tue, 3 Sep 2024 12:44:30 +0100 Subject: [PATCH] Update generic name in payload crate (#10669) --- crates/payload/builder/src/events.rs | 20 ++-- crates/payload/builder/src/noop.rs | 14 +-- crates/payload/builder/src/service.rs | 119 +++++++++++------------ crates/payload/builder/src/test_utils.rs | 12 +-- 4 files changed, 78 insertions(+), 87 deletions(-) diff --git a/crates/payload/builder/src/events.rs b/crates/payload/builder/src/events.rs index 57e9365e0f..33e6021bf4 100644 --- a/crates/payload/builder/src/events.rs +++ b/crates/payload/builder/src/events.rs @@ -12,41 +12,41 @@ use tracing::debug; /// Payload builder events. #[derive(Clone, Debug)] -pub enum Events { +pub enum Events { /// The payload attributes as /// they are received from the CL through the engine api. - Attributes(Engine::PayloadBuilderAttributes), + Attributes(T::PayloadBuilderAttributes), /// The built payload that has been just built. /// Triggered by the CL whenever it asks for an execution payload. /// This event is only thrown if the CL is a validator. - BuiltPayload(Engine::BuiltPayload), + BuiltPayload(T::BuiltPayload), } /// Represents a receiver for various payload events. #[derive(Debug)] -pub struct PayloadEvents { +pub struct PayloadEvents { /// The receiver for the payload events. - pub receiver: broadcast::Receiver>, + pub receiver: broadcast::Receiver>, } -impl PayloadEvents { +impl PayloadEvents { /// Convert this receiver into a stream of `PayloadEvents`. - pub fn into_stream(self) -> BroadcastStream> { + pub fn into_stream(self) -> BroadcastStream> { BroadcastStream::new(self.receiver) } /// Asynchronously receives the next payload event. - pub async fn recv(self) -> Option, BroadcastStreamRecvError>> { + pub async fn recv(self) -> Option, BroadcastStreamRecvError>> { let mut event_stream = self.into_stream(); event_stream.next().await } /// Returns a new stream that yields all built payloads. - pub fn into_built_payload_stream(self) -> BuiltPayloadStream { + pub fn into_built_payload_stream(self) -> BuiltPayloadStream { BuiltPayloadStream { st: self.into_stream() } } /// Returns a new stream that yields received payload attributes - pub fn into_attributes_stream(self) -> PayloadAttributeStream { + pub fn into_attributes_stream(self) -> PayloadAttributeStream { PayloadAttributeStream { st: self.into_stream() } } } diff --git a/crates/payload/builder/src/noop.rs b/crates/payload/builder/src/noop.rs index 91ab907321..3fe036cc1b 100644 --- a/crates/payload/builder/src/noop.rs +++ b/crates/payload/builder/src/noop.rs @@ -13,17 +13,17 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// A service task that does not build any payloads. #[derive(Debug)] -pub struct NoopPayloadBuilderService { +pub struct NoopPayloadBuilderService { /// Receiver half of the command channel. - command_rx: UnboundedReceiverStream>, + command_rx: UnboundedReceiverStream>, } -impl NoopPayloadBuilderService +impl NoopPayloadBuilderService where - Engine: PayloadTypes + 'static, + T: PayloadTypes + 'static, { /// Creates a new [`NoopPayloadBuilderService`]. - pub fn new() -> (Self, PayloadBuilderHandle) { + pub fn new() -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); ( Self { command_rx: UnboundedReceiverStream::new(command_rx) }, @@ -32,9 +32,9 @@ where } } -impl Future for NoopPayloadBuilderService +impl Future for NoopPayloadBuilderService where - Engine: PayloadTypes, + T: PayloadTypes, { type Output = (); diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 8946cd5875..47a7f82411 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -31,15 +31,15 @@ type PayloadFuture

= Pin { - inner: PayloadBuilderHandle, +pub struct PayloadStore { + inner: PayloadBuilderHandle, } // === impl PayloadStore === -impl PayloadStore +impl PayloadStore where - Engine: PayloadTypes + 'static, + T: PayloadTypes + 'static, { /// Resolves the payload job and returns the best payload that has been built so far. /// @@ -48,7 +48,7 @@ where pub async fn resolve( &self, id: PayloadId, - ) -> Option> { + ) -> Option> { self.inner.resolve(id).await } @@ -58,7 +58,7 @@ where pub async fn best_payload( &self, id: PayloadId, - ) -> Option> { + ) -> Option> { self.inner.best_payload(id).await } @@ -68,25 +68,25 @@ where pub async fn payload_attributes( &self, id: PayloadId, - ) -> Option> { + ) -> Option> { self.inner.payload_attributes(id).await } } -impl Clone for PayloadStore +impl Clone for PayloadStore where - Engine: PayloadTypes, + T: PayloadTypes, { fn clone(&self) -> Self { Self { inner: self.inner.clone() } } } -impl From> for PayloadStore +impl From> for PayloadStore where - Engine: PayloadTypes, + T: PayloadTypes, { - fn from(inner: PayloadBuilderHandle) -> Self { + fn from(inner: PayloadBuilderHandle) -> Self { Self { inner } } } @@ -95,22 +95,22 @@ where /// /// This is the API used to create new payloads and to get the current state of existing ones. #[derive(Debug)] -pub struct PayloadBuilderHandle { +pub struct PayloadBuilderHandle { /// Sender half of the message channel to the [`PayloadBuilderService`]. - to_service: mpsc::UnboundedSender>, + to_service: mpsc::UnboundedSender>, } // === impl PayloadBuilderHandle === -impl PayloadBuilderHandle +impl PayloadBuilderHandle where - Engine: PayloadTypes + 'static, + T: PayloadTypes + 'static, { /// Creates a new payload builder handle for the given channel. /// /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload /// building flow See [`PayloadBuilderService::poll`] for implementation details. - pub const fn new(to_service: mpsc::UnboundedSender>) -> Self { + pub const fn new(to_service: mpsc::UnboundedSender>) -> Self { Self { to_service } } @@ -118,10 +118,7 @@ where /// /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the /// job, See [`PayloadJob::resolve`]. - async fn resolve( - &self, - id: PayloadId, - ) -> Option> { + async fn resolve(&self, id: PayloadId) -> Option> { let (tx, rx) = oneshot::channel(); self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?; match rx.await.transpose()? { @@ -136,7 +133,7 @@ where pub async fn best_payload( &self, id: PayloadId, - ) -> Option> { + ) -> Option> { let (tx, rx) = oneshot::channel(); self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?; rx.await.ok()? @@ -148,7 +145,7 @@ where async fn payload_attributes( &self, id: PayloadId, - ) -> Option> { + ) -> Option> { let (tx, rx) = oneshot::channel(); self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?; rx.await.ok()? @@ -160,7 +157,7 @@ where /// and returns the receiver instead pub fn send_new_payload( &self, - attr: Engine::PayloadBuilderAttributes, + attr: T::PayloadBuilderAttributes, ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); @@ -174,23 +171,23 @@ where /// Note: if there's already payload in progress with same identifier, it will be returned. pub async fn new_payload( &self, - attr: Engine::PayloadBuilderAttributes, + attr: T::PayloadBuilderAttributes, ) -> Result { self.send_new_payload(attr).await? } /// Sends a message to the service to subscribe to payload events. /// Returns a receiver that will receive them. - pub async fn subscribe(&self) -> Result, RecvError> { + pub async fn subscribe(&self) -> Result, RecvError> { let (tx, rx) = oneshot::channel(); let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx)); Ok(PayloadEvents { receiver: rx.await? }) } } -impl Clone for PayloadBuilderHandle +impl Clone for PayloadBuilderHandle where - Engine: PayloadTypes, + T: PayloadTypes, { fn clone(&self) -> Self { Self { to_service: self.to_service.clone() } @@ -207,38 +204,38 @@ where /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where - Engine: PayloadTypes, + T: PayloadTypes, Gen: PayloadJobGenerator, - Gen::Job: PayloadJob, + Gen::Job: PayloadJob, { /// The type that knows how to create new payloads. generator: Gen, /// All active payload jobs. payload_jobs: Vec<(Gen::Job, PayloadId)>, /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand. - service_tx: mpsc::UnboundedSender>, + service_tx: mpsc::UnboundedSender>, /// Receiver half of the command channel. - command_rx: UnboundedReceiverStream>, + command_rx: UnboundedReceiverStream>, /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, /// Chain events notification stream chain_events: St, /// Payload events handler, used to broadcast and subscribe to payload events. - payload_events: broadcast::Sender>, + payload_events: broadcast::Sender>, } const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20; // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where - Engine: PayloadTypes + 'static, + T: PayloadTypes + 'static, Gen: PayloadJobGenerator, - Gen::Job: PayloadJob, - ::BuiltPayload: Into, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, { /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact /// with it. @@ -246,7 +243,7 @@ where /// This also takes a stream of chain events that will be forwarded to the generator to apply /// additional logic when new state is committed. See also /// [`PayloadJobGenerator::on_new_state`]. - pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { + pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE); @@ -265,7 +262,7 @@ where } /// Returns a handle to the service. - pub fn handle(&self) -> PayloadBuilderHandle { + pub fn handle(&self) -> PayloadBuilderHandle { PayloadBuilderHandle::new(self.service_tx.clone()) } @@ -275,10 +272,7 @@ where } /// Returns the best payload for the given identifier that has been built so far. - fn best_payload( - &self, - id: PayloadId, - ) -> Option> { + fn best_payload(&self, id: PayloadId) -> Option> { let res = self .payload_jobs .iter() @@ -293,7 +287,7 @@ where /// Returns the best payload for the given identifier that has been built so far and terminates /// the job if requested. - fn resolve(&mut self, id: PayloadId) -> Option> { + fn resolve(&mut self, id: PayloadId) -> Option> { trace!(%id, "resolving payload job"); let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?; @@ -324,12 +318,12 @@ where } } -impl PayloadBuilderService +impl PayloadBuilderService where - Engine: PayloadTypes, + T: PayloadTypes, Gen: PayloadJobGenerator, - Gen::Job: PayloadJob, - ::BuiltPayload: Into, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, { /// Returns the payload attributes for the given payload. fn payload_attributes( @@ -350,14 +344,14 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where - Engine: PayloadTypes + 'static, + T: PayloadTypes + 'static, Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, St: Stream + Send + Unpin + 'static, - Gen::Job: PayloadJob, - ::BuiltPayload: Into, + Gen::Job: PayloadJob, + ::BuiltPayload: Into, { type Output = (); @@ -452,31 +446,28 @@ where } /// Message type for the [`PayloadBuilderService`]. -pub enum PayloadServiceCommand { +pub enum PayloadServiceCommand { /// Start building a new payload. BuildNewPayload( - Engine::PayloadBuilderAttributes, + T::PayloadBuilderAttributes, oneshot::Sender>, ), /// Get the best payload so far - BestPayload( - PayloadId, - oneshot::Sender>>, - ), + BestPayload(PayloadId, oneshot::Sender>>), /// Get the payload attributes for the given payload PayloadAttributes( PayloadId, - oneshot::Sender>>, + oneshot::Sender>>, ), /// Resolve the payload and return the payload - Resolve(PayloadId, oneshot::Sender>>), + Resolve(PayloadId, oneshot::Sender>>), /// Payload service events - Subscribe(oneshot::Sender>>), + Subscribe(oneshot::Sender>>), } -impl fmt::Debug for PayloadServiceCommand +impl fmt::Debug for PayloadServiceCommand where - Engine: PayloadTypes, + T: PayloadTypes, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index c854fce738..c94ca2cc05 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -15,16 +15,16 @@ use std::{ }; /// Creates a new [`PayloadBuilderService`] for testing purposes. -pub fn test_payload_service() -> ( +pub fn test_payload_service() -> ( PayloadBuilderService< TestPayloadJobGenerator, futures_util::stream::Empty, - Engine, + T, >, - PayloadBuilderHandle, + PayloadBuilderHandle, ) where - Engine: PayloadTypes< + T: PayloadTypes< PayloadBuilderAttributes = EthPayloadBuilderAttributes, BuiltPayload = EthBuiltPayload, > + 'static, @@ -33,9 +33,9 @@ where } /// Creates a new [`PayloadBuilderService`] for testing purposes and spawns it in the background. -pub fn spawn_test_payload_service() -> PayloadBuilderHandle +pub fn spawn_test_payload_service() -> PayloadBuilderHandle where - Engine: PayloadTypes< + T: PayloadTypes< PayloadBuilderAttributes = EthPayloadBuilderAttributes, BuiltPayload = EthBuiltPayload, > + 'static,