Payload Events (#6549)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Luca Provini
2024-02-13 15:43:40 +01:00
committed by GitHub
parent 7917279b68
commit 933da735f8
7 changed files with 100 additions and 13 deletions

View File

@@ -16,7 +16,7 @@ pub mod payload;
pub use payload::PayloadOrAttributes;
/// The types that are used by the engine.
pub trait EngineTypes: serde::de::DeserializeOwned + Send + Sync {
pub trait EngineTypes: serde::de::DeserializeOwned + Send + Sync + Clone {
/// The RPC payload attributes type the CL node emits via the engine API.
type PayloadAttributes: PayloadAttributes + Unpin;

View File

@@ -0,0 +1,39 @@
use futures_util::Stream;
use reth_node_api::EngineTypes;
use tokio::sync::broadcast;
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
/// Payload builder events.
#[derive(Clone, Debug)]
pub enum Events<Engine: EngineTypes> {
/// The payload attributes as
/// they are received from the CL through the engine api.
Attributes(Engine::PayloadBuilderAttributes),
/// The built payload that has been just built.
/// Triggered by the CL whenever it asks for an execution payload.
/// This event is only thrown if the CL is a validator.
BuiltPayload(Engine::BuiltPayload),
}
/// Represents a receiver for various payload events.
#[derive(Debug)]
pub struct PayloadEvents<Engine: EngineTypes> {
pub receiver: broadcast::Receiver<Events<Engine>>,
}
impl<Engine: EngineTypes + 'static> PayloadEvents<Engine> {
// Convert this receiver into a stream of PayloadEvents.
pub fn into_stream(
self,
) -> impl Stream<Item = Result<Events<Engine>, BroadcastStreamRecvError>> {
BroadcastStream::new(self.receiver)
}
/// Asynchronously receives the next payload event.
pub async fn recv(self) -> Option<Result<Events<Engine>, BroadcastStreamRecvError>> {
let mut event_stream = self.into_stream();
event_stream.next().await
}
}

View File

@@ -102,6 +102,7 @@
pub mod database;
pub mod error;
mod events;
mod metrics;
mod optimism;
mod payload;

View File

@@ -20,7 +20,7 @@ pub struct NoopPayloadBuilderService<Engine: EngineTypes> {
impl<Engine> NoopPayloadBuilderService<Engine>
where
Engine: EngineTypes,
Engine: EngineTypes + 'static,
{
/// Creates a new [NoopPayloadBuilderService].
pub fn new() -> (Self, PayloadBuilderHandle<Engine>) {
@@ -52,6 +52,7 @@ where
PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Subscribe(_) => None,
};
}
}

View File

@@ -4,7 +4,10 @@
//! Once a new payload is created, it is continuously updated.
use crate::{
error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator,
error::PayloadBuilderError,
events::{Events, PayloadEvents},
metrics::PayloadBuilderServiceMetrics,
traits::PayloadJobGenerator,
KeepPayloadJobAlive, PayloadJob,
};
use futures_util::{future::FutureExt, Stream, StreamExt};
@@ -17,7 +20,10 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{
broadcast, mpsc,
oneshot::{self, error::RecvError},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, info, trace, warn};
@@ -33,7 +39,7 @@ pub struct PayloadStore<Engine: EngineTypes> {
impl<Engine> PayloadStore<Engine>
where
Engine: EngineTypes,
Engine: EngineTypes + 'static,
{
/// Resolves the payload job and returns the best payload that has been built so far.
///
@@ -98,7 +104,7 @@ pub struct PayloadBuilderHandle<Engine: EngineTypes> {
impl<Engine> PayloadBuilderHandle<Engine>
where
Engine: EngineTypes,
Engine: EngineTypes + 'static,
{
/// Creates a new payload builder handle for the given channel.
///
@@ -170,6 +176,14 @@ where
) -> Result<PayloadId, PayloadBuilderError> {
self.send_new_payload(attr).await?
}
/// Sends a message to the service to subscribe to payload events.
/// Returns a receiver that will receive them.
pub async fn subscribe(&self) -> Result<PayloadEvents<Engine>, RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
Ok(PayloadEvents { receiver: rx.await? })
}
}
impl<Engine> Clone for PayloadBuilderHandle<Engine>
@@ -209,13 +223,17 @@ where
metrics: PayloadBuilderServiceMetrics,
/// Chain events notification stream
chain_events: St,
/// Payload events handler, used to broadcast and subscribe to payload events.
payload_events: broadcast::Sender<Events<Engine>>,
}
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
// === impl PayloadBuilderService ===
impl<Gen, St, Engine> PayloadBuilderService<Gen, St, Engine>
where
Engine: EngineTypes,
Engine: EngineTypes + 'static,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = Engine::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<Engine::BuiltPayload>,
@@ -227,6 +245,8 @@ where
/// additional logic when new state is committed. See also [PayloadJobGenerator::on_new_state].
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<Engine>) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
let service = Self {
generator,
payload_jobs: Vec::new(),
@@ -234,12 +254,25 @@ where
command_rx: UnboundedReceiverStream::new(command_rx),
metrics: Default::default(),
chain_events,
payload_events,
};
let handle = service.handle();
(service, handle)
}
/// Notifies the service on new attribute event.
pub fn on_new_attributes(
&self,
attributes: &Option<
Result<<Engine as EngineTypes>::PayloadBuilderAttributes, PayloadBuilderError>,
>,
) {
if let Some(Ok(ref attributes)) = attributes {
self.payload_events.send(Events::Attributes(attributes.clone())).ok();
}
}
/// Returns a handle to the service.
pub fn handle(&self) -> PayloadBuilderHandle<Engine> {
PayloadBuilderHandle::new(self.service_tx.clone())
@@ -283,9 +316,13 @@ where
// Since the fees will not be known until the payload future is resolved / awaited, we wrap
// the future in a new future that will update the metrics.
let resolved_metrics = self.metrics.clone();
let payload_events = self.payload_events.clone();
let fut = async move {
let res = fut.await;
if let Ok(ref payload) = res {
payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
resolved_metrics
.set_resolved_revenue(payload.block().number, f64::from(payload.fees()));
}
@@ -324,7 +361,7 @@ where
impl<Gen, St, Engine> Future for PayloadBuilderService<Gen, St, Engine>
where
Engine: EngineTypes,
Engine: EngineTypes + 'static,
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
@@ -402,11 +439,17 @@ where
let _ = tx.send(this.best_payload(id));
}
PayloadServiceCommand::PayloadAttributes(id, tx) => {
let _ = tx.send(this.payload_attributes(id));
let attributes = this.payload_attributes(id);
this.on_new_attributes(&attributes);
let _ = tx.send(attributes);
}
PayloadServiceCommand::Resolve(id, tx) => {
let _ = tx.send(this.resolve(id));
}
PayloadServiceCommand::Subscribe(tx) => {
let new_rx = this.payload_events.subscribe();
let _ = tx.send(new_rx);
}
}
}
@@ -436,6 +479,8 @@ pub enum PayloadServiceCommand<Engine: EngineTypes> {
),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<Engine::BuiltPayload>>>),
/// Payload service events
Subscribe(oneshot::Sender<broadcast::Receiver<Events<Engine>>>),
}
impl<Engine> fmt::Debug for PayloadServiceCommand<Engine>
@@ -454,6 +499,7 @@ where
f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
}
PayloadServiceCommand::Resolve(f0, _f1) => f.debug_tuple("Resolve").field(&f0).finish(),
PayloadServiceCommand::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
}
}
}

View File

@@ -25,9 +25,9 @@ pub fn test_payload_service<Engine>() -> (
)
where
Engine: EngineTypes<
PayloadBuilderAttributes = EthPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
>,
PayloadBuilderAttributes = EthPayloadBuilderAttributes,
BuiltPayload = EthBuiltPayload,
> + 'static,
{
PayloadBuilderService::new(Default::default(), futures_util::stream::empty())
}

View File

@@ -25,7 +25,7 @@ pub trait PayloadJob: Future<Output = Result<(), PayloadBuilderError>> + Send +
+ Sync
+ 'static;
/// Represents the built payload type that is returned to the CL.
type BuiltPayload: BuiltPayload + std::fmt::Debug;
type BuiltPayload: BuiltPayload + Clone + std::fmt::Debug;
/// Returns the best payload that has been built so far.
///