feat: integrate newPayload into ethstats (#20584)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
stevencartavia
2025-12-24 01:56:26 -06:00
committed by GitHub
parent 334d9f2a76
commit ad37490e7d
4 changed files with 143 additions and 6 deletions

View File

@@ -1060,7 +1060,13 @@ where
}
/// Spawns the [`EthStatsService`] service if configured.
pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
where
St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
+ Send
+ Unpin
+ 'static,
{
let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
let network = self.components().network().clone();
@@ -1070,7 +1076,37 @@ where
info!(target: "reth::cli", "Starting EthStats service at {}", url);
let ethstats = EthStatsService::new(url, network, provider, pool).await?;
tokio::spawn(async move { ethstats.run().await });
// If engine events are provided, spawn listener for new payload reporting
let ethstats_for_events = ethstats.clone();
let task_executor = self.task_executor().clone();
task_executor.spawn(Box::pin(async move {
while let Some(event) = engine_events.next().await {
use reth_engine_primitives::ConsensusEngineEvent;
match event {
ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
let block_hash = executed.recovered_block.num_hash().hash;
let block_number = executed.recovered_block.num_hash().number;
if let Err(e) = ethstats_for_events
.report_new_payload(block_hash, block_number, duration)
.await
{
debug!(
target: "ethstats",
"Failed to report new payload: {}", e
);
}
}
_ => {
// Ignore other event types for ethstats reporting
}
}
}
}));
// Spawn main ethstats service
task_executor.spawn(Box::pin(async move { ethstats.run().await }));
Ok(())
}

View File

@@ -375,6 +375,8 @@ impl EngineNodeLauncher {
};
ctx.task_executor().spawn_critical("consensus engine", Box::pin(consensus_engine));
let engine_events_for_ethstats = engine_events.new_listener();
let full_node = FullNode {
evm_config: ctx.components().evm_config().clone(),
pool: ctx.components().pool().clone(),
@@ -395,7 +397,7 @@ impl EngineNodeLauncher {
// Notify on node started
on_node_started.on_event(FullNode::clone(&full_node))?;
ctx.spawn_ethstats().await?;
ctx.spawn_ethstats(engine_events_for_ethstats).await?;
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(

View File

@@ -3,8 +3,8 @@ use crate::{
credentials::EthstatsCredentials,
error::EthStatsError,
events::{
AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PendingMsg,
PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
AuthMsg, BlockMsg, BlockStats, HistoryMsg, LatencyMsg, NodeInfo, NodeStats, PayloadMsg,
PayloadStats, PendingMsg, PendingStats, PingMsg, StatsMsg, TxStats, UncleStats,
},
};
use alloy_consensus::{BlockHeader, Sealable};
@@ -50,7 +50,7 @@ const READ_TIMEOUT: Duration = Duration::from_secs(30);
/// authentication, stats reporting, block notifications, and connection management.
/// It maintains a persistent `WebSocket` connection and automatically reconnects
/// when the connection is lost.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct EthStatsService<Network, Provider, Pool> {
/// Authentication credentials for the `EthStats` server
credentials: EthstatsCredentials,
@@ -347,6 +347,42 @@ where
Ok(())
}
/// Report new payload information to the `EthStats` server
///
/// Sends information about payload processing time and block details
/// to the server for monitoring purposes.
pub async fn report_new_payload(
&self,
block_hash: alloy_primitives::B256,
block_number: u64,
processing_time: Duration,
) -> Result<(), EthStatsError> {
let conn = self.conn.read().await;
let conn = conn.as_ref().ok_or(EthStatsError::NotConnected)?;
let payload_stats = PayloadStats {
number: U256::from(block_number),
hash: block_hash,
processing_time: processing_time.as_millis() as u64,
};
let payload_msg =
PayloadMsg { id: self.credentials.node_id.clone(), payload: payload_stats };
debug!(
target: "ethstats",
"Reporting new payload: block={}, hash={:?}, processing_time={}ms",
block_number,
block_hash,
processing_time.as_millis()
);
let message = payload_msg.generate_new_payload_message();
conn.write_json(&message).await?;
Ok(())
}
/// Convert a block to `EthStats` block statistics format
///
/// Extracts relevant information from a block and formats it according

View File

@@ -281,3 +281,66 @@ impl PingMsg {
.to_string()
}
}
/// Information reported about a new payload processing event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PayloadStats {
/// Block number of the payload
pub number: U256,
/// Hash of the payload block
pub hash: B256,
/// Time taken to validate the payload in milliseconds
#[serde(rename = "processingTime")]
pub processing_time: u64,
}
/// Message containing new payload information to be reported to the ethstats monitoring server.
#[derive(Debug, Serialize, Deserialize)]
pub struct PayloadMsg {
/// The node's unique identifier
pub id: String,
/// The payload information to report
pub payload: PayloadStats,
}
impl PayloadMsg {
/// Generate a new payload message for the ethstats monitoring server.
pub fn generate_new_payload_message(&self) -> String {
serde_json::json!({
"emit": ["new-payload", self]
})
.to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{B256, U256};
#[test]
fn test_payload_msg_generation() {
let payload_stats = PayloadStats {
number: U256::from(12345),
hash: B256::from_slice(&[1u8; 32]),
processing_time: 150,
};
let payload_msg = PayloadMsg { id: "test-node".to_string(), payload: payload_stats };
let message = payload_msg.generate_new_payload_message();
let parsed: serde_json::Value = serde_json::from_str(&message).expect("Valid JSON");
assert_eq!(parsed["emit"][0], "new-payload");
assert_eq!(parsed["emit"][1]["id"], "test-node");
assert_eq!(parsed["emit"][1]["payload"]["number"], "0x3039"); // 12345 in hex
assert_eq!(parsed["emit"][1]["payload"]["processingTime"], 150);
// Verify the structure contains all expected fields
assert!(parsed["emit"][1]["payload"]["hash"].is_string());
assert!(parsed["emit"][1]["payload"]["processingTime"].is_number());
}
}