Update generic name in payload crate (#10669)

This commit is contained in:
malik
2024-09-03 12:44:30 +01:00
committed by GitHub
parent 5fc77551dd
commit 020597da32
4 changed files with 78 additions and 87 deletions

View File

@@ -12,41 +12,41 @@ use tracing::debug;
/// Payload builder events.
#[derive(Clone, Debug)]
pub enum Events<Engine: PayloadTypes> {
pub enum Events<T: PayloadTypes> {
/// The payload attributes as
/// they are received from the CL through the engine api.
Attributes(Engine::PayloadBuilderAttributes),
Attributes(T::PayloadBuilderAttributes),
/// The built payload that has been just built.
/// Triggered by the CL whenever it asks for an execution payload.
/// This event is only thrown if the CL is a validator.
BuiltPayload(Engine::BuiltPayload),
BuiltPayload(T::BuiltPayload),
}
/// Represents a receiver for various payload events.
#[derive(Debug)]
pub struct PayloadEvents<Engine: PayloadTypes> {
pub struct PayloadEvents<T: PayloadTypes> {
/// The receiver for the payload events.
pub receiver: broadcast::Receiver<Events<Engine>>,
pub receiver: broadcast::Receiver<Events<T>>,
}
impl<Engine: PayloadTypes + 'static> PayloadEvents<Engine> {
impl<T: PayloadTypes + 'static> PayloadEvents<T> {
/// Convert this receiver into a stream of `PayloadEvents`.
pub fn into_stream(self) -> BroadcastStream<Events<Engine>> {
pub fn into_stream(self) -> BroadcastStream<Events<T>> {
BroadcastStream::new(self.receiver)
}
/// Asynchronously receives the next payload event.
pub async fn recv(self) -> Option<Result<Events<Engine>, BroadcastStreamRecvError>> {
pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
let mut event_stream = self.into_stream();
event_stream.next().await
}
/// Returns a new stream that yields all built payloads.
pub fn into_built_payload_stream(self) -> BuiltPayloadStream<Engine> {
pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
BuiltPayloadStream { st: self.into_stream() }
}
/// Returns a new stream that yields received payload attributes
pub fn into_attributes_stream(self) -> PayloadAttributeStream<Engine> {
pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
PayloadAttributeStream { st: self.into_stream() }
}
}

View File

@@ -13,17 +13,17 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// A service task that does not build any payloads.
#[derive(Debug)]
pub struct NoopPayloadBuilderService<Engine: PayloadTypes> {
pub struct NoopPayloadBuilderService<T: PayloadTypes> {
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<PayloadServiceCommand<Engine>>,
command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
}
impl<Engine> NoopPayloadBuilderService<Engine>
impl<T> NoopPayloadBuilderService<T>
where
Engine: PayloadTypes + 'static,
T: PayloadTypes + 'static,
{
/// Creates a new [`NoopPayloadBuilderService`].
pub fn new() -> (Self, PayloadBuilderHandle<Engine>) {
pub fn new() -> (Self, PayloadBuilderHandle<T>) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
(
Self { command_rx: UnboundedReceiverStream::new(command_rx) },
@@ -32,9 +32,9 @@ where
}
}
impl<Engine> Future for NoopPayloadBuilderService<Engine>
impl<T> Future for NoopPayloadBuilderService<T>
where
Engine: PayloadTypes,
T: PayloadTypes,
{
type Output = ();

View File

@@ -31,15 +31,15 @@ type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderErro
/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
#[derive(Debug)]
pub struct PayloadStore<Engine: PayloadTypes> {
inner: PayloadBuilderHandle<Engine>,
pub struct PayloadStore<T: PayloadTypes> {
inner: PayloadBuilderHandle<T>,
}
// === impl PayloadStore ===
impl<Engine> PayloadStore<Engine>
impl<T> PayloadStore<T>
where
Engine: PayloadTypes + 'static,
T: PayloadTypes + 'static,
{
/// Resolves the payload job and returns the best payload that has been built so far.
///
@@ -48,7 +48,7 @@ where
pub async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve(id).await
}
@@ -58,7 +58,7 @@ where
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.best_payload(id).await
}
@@ -68,25 +68,25 @@ where
pub async fn payload_attributes(
&self,
id: PayloadId,
) -> Option<Result<Engine::PayloadBuilderAttributes, PayloadBuilderError>> {
) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
self.inner.payload_attributes(id).await
}
}
impl<Engine> Clone for PayloadStore<Engine>
impl<T> Clone for PayloadStore<T>
where
Engine: PayloadTypes,
T: PayloadTypes,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<Engine> From<PayloadBuilderHandle<Engine>> for PayloadStore<Engine>
impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
where
Engine: PayloadTypes,
T: PayloadTypes,
{
fn from(inner: PayloadBuilderHandle<Engine>) -> Self {
fn from(inner: PayloadBuilderHandle<T>) -> Self {
Self { inner }
}
}
@@ -95,22 +95,22 @@ where
///
/// This is the API used to create new payloads and to get the current state of existing ones.
#[derive(Debug)]
pub struct PayloadBuilderHandle<Engine: PayloadTypes> {
pub struct PayloadBuilderHandle<T: PayloadTypes> {
/// Sender half of the message channel to the [`PayloadBuilderService`].
to_service: mpsc::UnboundedSender<PayloadServiceCommand<Engine>>,
to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
}
// === impl PayloadBuilderHandle ===
impl<Engine> PayloadBuilderHandle<Engine>
impl<T> PayloadBuilderHandle<T>
where
Engine: PayloadTypes + 'static,
T: PayloadTypes + 'static,
{
/// Creates a new payload builder handle for the given channel.
///
/// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
/// building flow See [`PayloadBuilderService::poll`] for implementation details.
pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<Engine>>) -> Self {
pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
Self { to_service }
}
@@ -118,10 +118,7 @@ where
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
async fn resolve(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?;
match rx.await.transpose()? {
@@ -136,7 +133,7 @@ where
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
rx.await.ok()?
@@ -148,7 +145,7 @@ where
async fn payload_attributes(
&self,
id: PayloadId,
) -> Option<Result<Engine::PayloadBuilderAttributes, PayloadBuilderError>> {
) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
rx.await.ok()?
@@ -160,7 +157,7 @@ where
/// and returns the receiver instead
pub fn send_new_payload(
&self,
attr: Engine::PayloadBuilderAttributes,
attr: T::PayloadBuilderAttributes,
) -> oneshot::Receiver<Result<PayloadId, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
@@ -174,23 +171,23 @@ where
/// Note: if there's already payload in progress with same identifier, it will be returned.
pub async fn new_payload(
&self,
attr: Engine::PayloadBuilderAttributes,
attr: T::PayloadBuilderAttributes,
) -> Result<PayloadId, PayloadBuilderError> {
self.send_new_payload(attr).await?
}
/// Sends a message to the service to subscribe to payload events.
/// Returns a receiver that will receive them.
pub async fn subscribe(&self) -> Result<PayloadEvents<Engine>, RecvError> {
pub async fn subscribe(&self) -> Result<PayloadEvents<T>, RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
Ok(PayloadEvents { receiver: rx.await? })
}
}
impl<Engine> Clone for PayloadBuilderHandle<Engine>
impl<T> Clone for PayloadBuilderHandle<T>
where
Engine: PayloadTypes,
T: PayloadTypes,
{
fn clone(&self) -> Self {
Self { to_service: self.to_service.clone() }
@@ -207,38 +204,38 @@ where
/// does know nothing about how to build them, it just drives their jobs to completion.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen, St, Engine>
pub struct PayloadBuilderService<Gen, St, T>
where
Engine: PayloadTypes,
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = Engine::PayloadBuilderAttributes>,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
{
/// The type that knows how to create new payloads.
generator: Gen,
/// All active payload jobs.
payload_jobs: Vec<(Gen::Job, PayloadId)>,
/// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
service_tx: mpsc::UnboundedSender<PayloadServiceCommand<Engine>>,
service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
/// Receiver half of the command channel.
command_rx: UnboundedReceiverStream<PayloadServiceCommand<Engine>>,
command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
/// Metrics for the payload builder service
metrics: PayloadBuilderServiceMetrics,
/// Chain events notification stream
chain_events: St,
/// Payload events handler, used to broadcast and subscribe to payload events.
payload_events: broadcast::Sender<Events<Engine>>,
payload_events: broadcast::Sender<Events<T>>,
}
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
// === impl PayloadBuilderService ===
impl<Gen, St, Engine> PayloadBuilderService<Gen, St, Engine>
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
Engine: PayloadTypes + 'static,
T: PayloadTypes + 'static,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = Engine::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<Engine::BuiltPayload>,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
/// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
/// with it.
@@ -246,7 +243,7 @@ where
/// This also takes a stream of chain events that will be forwarded to the generator to apply
/// additional logic when new state is committed. See also
/// [`PayloadJobGenerator::on_new_state`].
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<Engine>) {
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
@@ -265,7 +262,7 @@ where
}
/// Returns a handle to the service.
pub fn handle(&self) -> PayloadBuilderHandle<Engine> {
pub fn handle(&self) -> PayloadBuilderHandle<T> {
PayloadBuilderHandle::new(self.service_tx.clone())
}
@@ -275,10 +272,7 @@ where
}
/// Returns the best payload for the given identifier that has been built so far.
fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let res = self
.payload_jobs
.iter()
@@ -293,7 +287,7 @@ where
/// Returns the best payload for the given identifier that has been built so far and terminates
/// the job if requested.
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture<Engine::BuiltPayload>> {
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture<T::BuiltPayload>> {
trace!(%id, "resolving payload job");
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
@@ -324,12 +318,12 @@ where
}
}
impl<Gen, St, Engine> PayloadBuilderService<Gen, St, Engine>
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
Engine: PayloadTypes,
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = Engine::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<Engine::BuiltPayload>,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
/// Returns the payload attributes for the given payload.
fn payload_attributes(
@@ -350,14 +344,14 @@ where
}
}
impl<Gen, St, Engine> Future for PayloadBuilderService<Gen, St, Engine>
impl<Gen, St, T> Future for PayloadBuilderService<Gen, St, T>
where
Engine: PayloadTypes + 'static,
T: PayloadTypes + 'static,
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Gen::Job: PayloadJob<PayloadAttributes = Engine::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<Engine::BuiltPayload>,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
type Output = ();
@@ -452,31 +446,28 @@ where
}
/// Message type for the [`PayloadBuilderService`].
pub enum PayloadServiceCommand<Engine: PayloadTypes> {
pub enum PayloadServiceCommand<T: PayloadTypes> {
/// Start building a new payload.
BuildNewPayload(
Engine::PayloadBuilderAttributes,
T::PayloadBuilderAttributes,
oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
),
/// Get the best payload so far
BestPayload(
PayloadId,
oneshot::Sender<Option<Result<Engine::BuiltPayload, PayloadBuilderError>>>,
),
BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
/// Get the payload attributes for the given payload
PayloadAttributes(
PayloadId,
oneshot::Sender<Option<Result<Engine::PayloadBuilderAttributes, PayloadBuilderError>>>,
oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<Engine::BuiltPayload>>>),
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>),
/// Payload service events
Subscribe(oneshot::Sender<broadcast::Receiver<Events<Engine>>>),
Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
impl<Engine> fmt::Debug for PayloadServiceCommand<Engine>
impl<T> fmt::Debug for PayloadServiceCommand<T>
where
Engine: PayloadTypes,
T: PayloadTypes,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {

View File

@@ -15,16 +15,16 @@ use std::{
};
/// Creates a new [`PayloadBuilderService`] for testing purposes.
pub fn test_payload_service<Engine>() -> (
pub fn test_payload_service<T>() -> (
PayloadBuilderService<
TestPayloadJobGenerator,
futures_util::stream::Empty<CanonStateNotification>,
Engine,
T,
>,
PayloadBuilderHandle<Engine>,
PayloadBuilderHandle<T>,
)
where
Engine: PayloadTypes<
T: PayloadTypes<
PayloadBuilderAttributes = EthPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
> + 'static,
@@ -33,9 +33,9 @@ where
}
/// Creates a new [`PayloadBuilderService`] for testing purposes and spawns it in the background.
pub fn spawn_test_payload_service<Engine>() -> PayloadBuilderHandle<Engine>
pub fn spawn_test_payload_service<T>() -> PayloadBuilderHandle<T>
where
Engine: PayloadTypes<
T: PayloadTypes<
PayloadBuilderAttributes = EthPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
> + 'static,