From 198e457a12c56edd4a324a3201ad5f6711a8c0d3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 10 Feb 2026 15:07:31 +0100 Subject: [PATCH] feat(rpc): add `subscribeFinalizedChainNotifications` endpoint (#22011) Co-authored-by: Amp --- .changelog/warm-geese-build.md | 7 +++ crates/rpc/rpc-api/src/reth.rs | 15 +++++- crates/rpc/rpc-builder/src/lib.rs | 9 +++- crates/rpc/rpc/src/reth.rs | 90 ++++++++++++++++++++++++++++++- 4 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 .changelog/warm-geese-build.md diff --git a/.changelog/warm-geese-build.md b/.changelog/warm-geese-build.md new file mode 100644 index 0000000000..98ec2cdce4 --- /dev/null +++ b/.changelog/warm-geese-build.md @@ -0,0 +1,7 @@ +--- +reth-rpc-api: minor +reth-rpc-builder: patch +reth-rpc: minor +--- + +Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received. diff --git a/crates/rpc/rpc-api/src/reth.rs b/crates/rpc/rpc-api/src/reth.rs index a70fa30d89..bfc4fa8db2 100644 --- a/crates/rpc/rpc-api/src/reth.rs +++ b/crates/rpc/rpc-api/src/reth.rs @@ -2,7 +2,7 @@ use alloy_eips::BlockId; use alloy_primitives::{map::AddressMap, U256}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -// Required for the subscription attribute below +// Required for the subscription attributes below use reth_chain_state as _; /// Reth API namespace for reth-specific methods @@ -33,4 +33,17 @@ pub trait RethApi { item = alloy_eips::BlockNumHash )] async fn reth_subscribe_persisted_block(&self) -> jsonrpsee::core::SubscriptionResult; + + /// Subscribe to finalized chain notifications. + /// + /// Buffers committed chain notifications and emits them once a new finalized block is received. + /// Each notification contains all committed chain segments up to the finalized block. + #[subscription( + name = "subscribeFinalizedChainNotifications", + unsubscribe = "unsubscribeFinalizedChainNotifications", + item = Vec + )] + async fn reth_subscribe_finalized_chain_notifications( + &self, + ) -> jsonrpsee::core::SubscriptionResult; } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 6010a9fe83..8cc029e3f9 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -103,7 +103,9 @@ pub use eth::EthHandlers; mod metrics; use crate::middleware::RethRpcMiddleware; pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService}; -use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions}; +use reth_chain_state::{ + CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions, +}; use reth_rpc::eth::sim_bundle::EthSimBundle; // Rpc rate limiter @@ -311,6 +313,7 @@ where N: NodePrimitives, Provider: FullRpcProvider + CanonStateSubscriptions + + ForkChoiceSubscriptions
+ PersistedBlockSubscriptions + AccountReader + ChangeSetReader, @@ -656,7 +659,8 @@ where Transaction = N::SignedTx, > + AccountReader + ChangeSetReader - + CanonStateSubscriptions + + CanonStateSubscriptions + + ForkChoiceSubscriptions
+ PersistedBlockSubscriptions, Network: NetworkInfo + Peers + Clone + 'static, EthApi: EthApiServer< @@ -845,6 +849,7 @@ where N: NodePrimitives, Provider: FullRpcProvider + CanonStateSubscriptions + + ForkChoiceSubscriptions
+ PersistedBlockSubscriptions + AccountReader + ChangeSetReader, diff --git a/crates/rpc/rpc/src/reth.rs b/crates/rpc/rpc/src/reth.rs index ae3f59b2bf..6f4e79d291 100644 --- a/crates/rpc/rpc/src/reth.rs +++ b/crates/rpc/rpc/src/reth.rs @@ -1,12 +1,17 @@ use std::{future::Future, sync::Arc}; +use alloy_consensus::BlockHeader; use alloy_eips::BlockId; use alloy_primitives::{map::AddressMap, U256}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink}; -use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions}; +use reth_chain_state::{ + CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions, + PersistedBlockSubscriptions, +}; use reth_errors::RethResult; +use reth_primitives_traits::{NodePrimitives, SealedHeader}; use reth_rpc_api::RethApiServer; use reth_rpc_eth_types::{EthApiError, EthResult}; use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory}; @@ -92,6 +97,7 @@ where + ChangeSetReader + StateProviderFactory + CanonStateSubscriptions + + ForkChoiceSubscriptions
::BlockHeader> + PersistedBlockSubscriptions + 'static, { @@ -126,6 +132,23 @@ where Ok(()) } + + /// Handler for `reth_subscribeFinalizedChainNotifications` + async fn reth_subscribe_finalized_chain_notifications( + &self, + pending: PendingSubscriptionSink, + ) -> jsonrpsee::core::SubscriptionResult { + let sink = pending.accept().await?; + let canon_stream = self.provider().canonical_state_stream(); + let finalized_stream = self.provider().finalized_block_stream(); + self.inner.task_spawner.spawn(Box::pin(finalized_chain_notifications( + sink, + canon_stream, + finalized_stream, + ))); + + Ok(()) + } } /// Pipes all stream items to the subscription sink. @@ -158,6 +181,71 @@ where } } +/// Buffers committed chain notifications and emits them when a new finalized block is received. +async fn finalized_chain_notifications( + sink: SubscriptionSink, + mut canon_stream: reth_chain_state::CanonStateNotificationStream, + mut finalized_stream: reth_chain_state::ForkChoiceStream>, +) where + N: NodePrimitives, +{ + let mut buffered: Vec> = Vec::new(); + + loop { + tokio::select! { + _ = sink.closed() => { + break + } + maybe_canon = canon_stream.next() => { + let Some(notification) = maybe_canon else { break }; + match ¬ification { + CanonStateNotification::Commit { .. } => { + buffered.push(notification); + } + CanonStateNotification::Reorg { .. } => { + buffered.clear(); + } + } + } + maybe_finalized = finalized_stream.next() => { + let Some(finalized_header) = maybe_finalized else { break }; + let finalized_num = finalized_header.number(); + + let mut committed = Vec::new(); + buffered.retain(|n| { + if *n.committed().range().end() <= finalized_num { + committed.push(n.clone()); + false + } else { + true + } + }); + + if committed.is_empty() { + continue; + } + + committed.sort_by_key(|n| *n.committed().range().start()); + + let msg = match SubscriptionMessage::new( + sink.method_name(), + sink.subscription_id(), + &committed, + ) { + Ok(msg) => msg, + Err(err) => { + tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification"); + break + } + }; + if sink.send(msg).await.is_err() { + break; + } + } + } + } +} + impl std::fmt::Debug for RethApi { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RethApi").finish_non_exhaustive()