feat(rpc): add subscribeFinalizedChainNotifications endpoint (#22011)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Matthias Seitz
2026-02-10 15:07:31 +01:00
committed by GitHub
parent c727c61101
commit 198e457a12
4 changed files with 117 additions and 4 deletions

View File

@@ -0,0 +1,7 @@
---
reth-rpc-api: minor
reth-rpc-builder: patch
reth-rpc: minor
---
Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.

View File

@@ -2,7 +2,7 @@ use alloy_eips::BlockId;
use alloy_primitives::{map::AddressMap, U256};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
// Required for the subscription attribute below
// Required for the subscription attributes below
use reth_chain_state as _;
/// Reth API namespace for reth-specific methods
@@ -33,4 +33,17 @@ pub trait RethApi {
item = alloy_eips::BlockNumHash
)]
async fn reth_subscribe_persisted_block(&self) -> jsonrpsee::core::SubscriptionResult;
/// Subscribe to finalized chain notifications.
///
/// Buffers committed chain notifications and emits them once a new finalized block is received.
/// Each notification contains all committed chain segments up to the finalized block.
#[subscription(
name = "subscribeFinalizedChainNotifications",
unsubscribe = "unsubscribeFinalizedChainNotifications",
item = Vec<reth_chain_state::CanonStateNotification>
)]
async fn reth_subscribe_finalized_chain_notifications(
&self,
) -> jsonrpsee::core::SubscriptionResult;
}

View File

@@ -103,7 +103,9 @@ pub use eth::EthHandlers;
mod metrics;
use crate::middleware::RethRpcMiddleware;
pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_chain_state::{
CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
};
use reth_rpc::eth::sim_bundle::EthSimBundle;
// Rpc rate limiter
@@ -311,6 +313,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,
@@ -656,7 +659,8 @@ where
Transaction = N::SignedTx,
> + AccountReader
+ ChangeSetReader
+ CanonStateSubscriptions
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions,
Network: NetworkInfo + Peers + Clone + 'static,
EthApi: EthApiServer<
@@ -845,6 +849,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block>
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,

View File

@@ -1,12 +1,17 @@
use std::{future::Future, sync::Arc};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockId;
use alloy_primitives::{map::AddressMap, U256};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_chain_state::{
CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
PersistedBlockSubscriptions,
};
use reth_errors::RethResult;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
use reth_rpc_api::RethApiServer;
use reth_rpc_eth_types::{EthApiError, EthResult};
use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
@@ -92,6 +97,7 @@ where
+ ChangeSetReader
+ StateProviderFactory
+ CanonStateSubscriptions
+ ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
+ PersistedBlockSubscriptions
+ 'static,
{
@@ -126,6 +132,23 @@ where
Ok(())
}
/// Handler for `reth_subscribeFinalizedChainNotifications`
async fn reth_subscribe_finalized_chain_notifications(
&self,
pending: PendingSubscriptionSink,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let canon_stream = self.provider().canonical_state_stream();
let finalized_stream = self.provider().finalized_block_stream();
self.inner.task_spawner.spawn(Box::pin(finalized_chain_notifications(
sink,
canon_stream,
finalized_stream,
)));
Ok(())
}
}
/// Pipes all stream items to the subscription sink.
@@ -158,6 +181,71 @@ where
}
}
/// Buffers committed chain notifications and emits them when a new finalized block is received.
async fn finalized_chain_notifications<N>(
sink: SubscriptionSink,
mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
) where
N: NodePrimitives,
{
let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
loop {
tokio::select! {
_ = sink.closed() => {
break
}
maybe_canon = canon_stream.next() => {
let Some(notification) = maybe_canon else { break };
match &notification {
CanonStateNotification::Commit { .. } => {
buffered.push(notification);
}
CanonStateNotification::Reorg { .. } => {
buffered.clear();
}
}
}
maybe_finalized = finalized_stream.next() => {
let Some(finalized_header) = maybe_finalized else { break };
let finalized_num = finalized_header.number();
let mut committed = Vec::new();
buffered.retain(|n| {
if *n.committed().range().end() <= finalized_num {
committed.push(n.clone());
false
} else {
true
}
});
if committed.is_empty() {
continue;
}
committed.sort_by_key(|n| *n.committed().range().start());
let msg = match SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&committed,
) {
Ok(msg) => msg,
Err(err) => {
tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
break
}
};
if sink.send(msg).await.is_err() {
break;
}
}
}
}
}
impl<Provider> std::fmt::Debug for RethApi<Provider> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RethApi").finish_non_exhaustive()