mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-10 23:15:34 -05:00
114 lines
3.5 KiB
Rust
114 lines
3.5 KiB
Rust
//! Example of how to subscribe to beacon chain events via SSE.
|
|
//!
|
|
//! See also [ethereum-beacon-API eventstream](https://ethereum.github.io/beacon-APIs/#/Events/eventstream)
|
|
//!
|
|
//! Run with
|
|
//!
|
|
//! ```not_rust
|
|
//! cargo run -p beacon-api-sse -- node
|
|
//! ```
|
|
//!
|
|
//! This launches a regular reth instance and subscribes to the payload attributes event stream.
|
|
//!
|
|
//! **NOTE**: This expects that the CL client is running an HTTP server on `localhost:5052` (the
//! default; override with `--cl.addr` / `--cl.port`) and is configured to emit payload attributes
//! events.
|
|
//!
|
|
//! See lighthouse beacon Node API: <https://lighthouse-book.sigmaprime.io/api-bn.html#beacon-node-api>
|
|
use clap::Parser;
|
|
use futures_util::stream::StreamExt;
|
|
use mev_share_sse::{client::EventStream, EventClient};
|
|
use reth::{
|
|
cli::{
|
|
components::RethNodeComponents,
|
|
ext::{RethCliExt, RethNodeCommandConfig},
|
|
Cli,
|
|
},
|
|
rpc::types::beacon::events::PayloadAttributesEvent,
|
|
tasks::TaskSpawner,
|
|
};
|
|
use std::net::{IpAddr, Ipv4Addr};
|
|
use tracing::{info, warn};
|
|
|
|
/// Entry point: parses the command line into a [`Cli`] with the [`BeaconEventsExt`] extension
/// installed and runs it.
///
/// # Panics
///
/// Panics if running the CLI returns an error.
fn main() {
    let outcome = Cli::<BeaconEventsExt>::parse().run();
    outcome.unwrap();
}
|
|
|
|
/// Marker type handed to the reth [`Cli`] to select which CLI extensions are used.
#[non_exhaustive]
#[derive(Default, Debug)]
struct BeaconEventsExt;
|
|
|
|
impl RethCliExt for BeaconEventsExt {
|
|
/// This tells the reth CLI to install additional CLI arguments
|
|
type Node = BeaconEventsConfig;
|
|
}
|
|
|
|
/// Our custom cli args extension that adds one flag to reth default CLI.
|
|
#[derive(Debug, Clone, clap::Parser)]
|
|
struct BeaconEventsConfig {
|
|
/// Beacon Node http server address
|
|
#[arg(long = "cl.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
|
|
pub cl_addr: IpAddr,
|
|
/// Beacon Node http server port to listen on
|
|
#[arg(long = "cl.port", default_value_t = 5052)]
|
|
pub cl_port: u16,
|
|
}
|
|
|
|
impl BeaconEventsConfig {
|
|
/// Returns the http url of the beacon node
|
|
pub fn http_base_url(&self) -> String {
|
|
format!("http://{}:{}", self.cl_addr, self.cl_port)
|
|
}
|
|
|
|
/// Returns the URL to the events endpoint
|
|
pub fn events_url(&self) -> String {
|
|
format!("{}/eth/v1/events", self.http_base_url())
|
|
}
|
|
|
|
/// Service that subscribes to beacon chain payload attributes events
|
|
async fn run(self) {
|
|
let client = EventClient::default();
|
|
|
|
let mut subscription = self.new_payload_attributes_subscription(&client).await;
|
|
|
|
while let Some(event) = subscription.next().await {
|
|
info!("Received payload attributes: {:?}", event);
|
|
}
|
|
}
|
|
|
|
// It can take a bit until the CL endpoint is live so we retry a few times
|
|
async fn new_payload_attributes_subscription(
|
|
&self,
|
|
client: &EventClient,
|
|
) -> EventStream<PayloadAttributesEvent> {
|
|
let payloads_url = format!("{}?topics=payload_attributes", self.events_url());
|
|
loop {
|
|
match client.subscribe(&payloads_url).await {
|
|
Ok(subscription) => return subscription,
|
|
Err(err) => {
|
|
warn!("Failed to subscribe to payload attributes events: {:?}\nRetrying in 5 seconds...", err);
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl RethNodeCommandConfig for BeaconEventsConfig {
|
|
fn on_node_started<Reth: RethNodeComponents>(&mut self, components: &Reth) -> eyre::Result<()> {
|
|
components.task_executor().spawn(Box::pin(self.clone().run()));
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// The extension must parse when no extra arguments are given, i.e. every flag it declares
    /// has a usable default.
    #[test]
    fn parse_config() {
        assert!(BeaconEventsConfig::try_parse_from(["reth"]).is_ok());
    }
}
|