From ad37490e7dd151d5ac3767099bb4f485e86f603c Mon Sep 17 00:00:00 2001 From: stevencartavia <112043913+stevencartavia@users.noreply.github.com> Date: Wed, 24 Dec 2025 01:56:26 -0600 Subject: [PATCH] feat: integrate newPayload into ethstats (#20584) Co-authored-by: Matthias Seitz --- crates/node/builder/src/launch/common.rs | 40 ++++++++++++++- crates/node/builder/src/launch/engine.rs | 4 +- crates/node/ethstats/src/ethstats.rs | 42 ++++++++++++++-- crates/node/ethstats/src/events.rs | 63 ++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 6 deletions(-) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 7f0e944c92..0cc7c8c2eb 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -1060,7 +1060,13 @@ where } /// Spawns the [`EthStatsService`] service if configured. - pub async fn spawn_ethstats(&self) -> eyre::Result<()> { + pub async fn spawn_ethstats(&self, mut engine_events: St) -> eyre::Result<()> + where + St: Stream>> + + Send + + Unpin + + 'static, + { let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) }; let network = self.components().network().clone(); @@ -1070,7 +1076,37 @@ where info!(target: "reth::cli", "Starting EthStats service at {}", url); let ethstats = EthStatsService::new(url, network, provider, pool).await?; - tokio::spawn(async move { ethstats.run().await }); + + // If engine events are provided, spawn listener for new payload reporting + let ethstats_for_events = ethstats.clone(); + let task_executor = self.task_executor().clone(); + task_executor.spawn(Box::pin(async move { + while let Some(event) = engine_events.next().await { + use reth_engine_primitives::ConsensusEngineEvent; + match event { + ConsensusEngineEvent::ForkBlockAdded(executed, duration) | + ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => { + let block_hash = executed.recovered_block.num_hash().hash; + let block_number = executed.recovered_block.num_hash().number; + if let Err(e) = ethstats_for_events + .report_new_payload(block_hash, block_number, duration) + .await + { + debug!( + target: "ethstats", + "Failed to report new payload: {}", e + ); + } + } + _ => { + // Ignore other event types for ethstats reporting + } + } + } + })); + + // Spawn main ethstats service + task_executor.spawn(Box::pin(async move { ethstats.run().await })); Ok(()) } diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 134f131b8e..e9dd5344c4 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -375,6 +375,8 @@ impl EngineNodeLauncher { }; ctx.task_executor().spawn_critical("consensus engine", Box::pin(consensus_engine)); + let engine_events_for_ethstats = engine_events.new_listener(); + let full_node = FullNode { evm_config: ctx.components().evm_config().clone(), pool: ctx.components().pool().clone(), @@ -395,7 +397,7 @@ impl EngineNodeLauncher { // Notify on node started on_node_started.on_event(FullNode::clone(&full_node))?; - ctx.spawn_ethstats().await?; + ctx.spawn_ethstats(engine_events_for_ethstats).await?; let handle = NodeHandle { node_exit_future: NodeExitFuture::new( diff --git a/crates/node/ethstats/src/ethstats.rs b/crates/node/ethstats/src/ethstats.rs index df87060d77..1d8648ef47 100644 --- a/crates/node/ethstats/src/ethstats.rs +++ b/crates/node/ethstats/src/ethstats.rs @@ -3,8 +3,8 @@ use crate::{ credentials::EthstatsCredentials, error::EthStatsError, events::{ - AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PendingMsg, - PendingStats, PingMsg, StatsMsg, TxStats, UncleStats, + AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg, + PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats, }, }; use alloy_consensus::{BlockHeader, Sealable}; @@ -50,7 +50,7 @@ const READ_TIMEOUT: Duration = Duration::from_secs(30); /// authentication, stats reporting, block notifications, and connection management. /// It maintains a persistent `WebSocket` connection and automatically reconnects /// when the connection is lost. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthStatsService { /// Authentication credentials for the `EthStats` server credentials: EthstatsCredentials, @@ -347,6 +347,42 @@ where Ok(()) } + /// Report new payload information to the `EthStats` server + /// + /// Sends information about payload processing time and block details + /// to the server for monitoring purposes. + pub async fn report_new_payload( + &self, + block_hash: alloy_primitives::B256, + block_number: u64, + processing_time: Duration, + ) -> Result<(), EthStatsError> { + let conn = self.conn.read().await; + let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?; + + let payload_stats = PayloadStats { + number: U256::from(block_number), + hash: block_hash, + processing_time: processing_time.as_millis() as u64, + }; + + let payload_msg = + PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats }; + + debug!( + target: "ethstats", + "Reporting new payload: block={}, hash={:?}, processing_time={}ms", + block_number, + block_hash, + processing_time.as_millis() + ); + + let message = payload_msg.generate_new_payload_message(); + conn.write_json(&message).await?; + + Ok(()) + } + /// Convert a block to `EthStats` block statistics format /// /// Extracts relevant information from a block and formats it according diff --git a/crates/node/ethstats/src/events.rs b/crates/node/ethstats/src/events.rs index 08d0c90feb..b6ac064944 100644 --- a/crates/node/ethstats/src/events.rs +++ b/crates/node/ethstats/src/events.rs @@ -281,3 +281,66 @@ impl PingMsg { .to_string() } } + +/// Information reported about a new payload processing event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PayloadStats { + /// Block number of the payload + pub number: U256, + + /// Hash of the payload block + pub hash: B256, + + /// Time taken to validate the payload in milliseconds + #[serde(rename = "processingTime")] + pub processing_time: u64, +} + +/// Message containing new payload information to be reported to the ethstats monitoring server. +#[derive(Debug, Serialize, Deserialize)] +pub struct PayloadMsg { + /// The node's unique identifier + pub id: String, + + /// The payload information to report + pub payload: PayloadStats, +} + +impl PayloadMsg { + /// Generate a new payload message for the ethstats monitoring server. + pub fn generate_new_payload_message(&self) -> String { + serde_json::json!({ + "emit": ["new-payload", self] + }) + .to_string() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{B256, U256}; + + #[test] + fn test_payload_msg_generation() { + let payload_stats = PayloadStats { + number: U256::from(12345), + hash: B256::from_slice(&[1u8; 32]), + processing_time: 150, + }; + + let payload_msg = PayloadMsg { id: "test-node".to_string(), payload: payload_stats }; + + let message = payload_msg.generate_new_payload_message(); + let parsed: serde_json::Value = serde_json::from_str(&message).expect("Valid JSON"); + + assert_eq!(parsed["emit"][0], "new-payload"); + assert_eq!(parsed["emit"][1]["id"], "test-node"); + assert_eq!(parsed["emit"][1]["payload"]["number"], "0x3039"); // 12345 in hex + assert_eq!(parsed["emit"][1]["payload"]["processingTime"], 150); + + // Verify the structure contains all expected fields + assert!(parsed["emit"][1]["payload"]["hash"].is_string()); + assert!(parsed["emit"][1]["payload"]["processingTime"].is_number()); + } +}