feat(rpc): add persisted block subscription (#20877)

Co-authored-by: cakevm <cakevm@proton.me>
This commit is contained in:
Matthias Seitz
2026-01-09 11:37:46 +01:00
committed by GitHub
parent 8a9b5d90f4
commit 33bcd60348
3 changed files with 48 additions and 26 deletions

View File

@@ -24,4 +24,14 @@ pub trait RethApi {
item = reth_chain_state::CanonStateNotification
)]
async fn reth_subscribe_chain_notifications(&self) -> jsonrpsee::core::SubscriptionResult;
/// Subscribe to persisted block notifications.
///
/// Emits a notification with the block number and hash when a new block is persisted to disk.
#[subscription(
name = "subscribePersistedBlock",
unsubscribe = "unsubscribePersistedBlock",
item = alloy_eips::BlockNumHash
)]
async fn reth_subscribe_persisted_block(&self) -> jsonrpsee::core::SubscriptionResult;
}

View File

@@ -103,7 +103,7 @@ pub use eth::EthHandlers;
mod metrics;
use crate::middleware::RethRpcMiddleware;
pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
use reth_chain_state::CanonStateSubscriptions;
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_rpc::eth::sim_bundle::EthSimBundle;
// Rpc rate limiter
@@ -311,6 +311,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
+ CanonStateSubscriptions<Primitives = N>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,
Pool: TransactionPool + Clone + 'static,
@@ -655,7 +656,8 @@ where
Transaction = N::SignedTx,
> + AccountReader
+ ChangeSetReader
+ CanonStateSubscriptions,
+ CanonStateSubscriptions
+ PersistedBlockSubscriptions,
Network: NetworkInfo + Peers + Clone + 'static,
EthApi: EthApiServer<
RpcTxReq<EthApi::NetworkTypes>,
@@ -843,6 +845,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block>
+ CanonStateSubscriptions<Primitives = N>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,
Pool: TransactionPool + Clone + 'static,

View File

@@ -3,17 +3,15 @@ use std::{collections::HashMap, future::Future, sync::Arc};
use alloy_eips::BlockId;
use alloy_primitives::{Address, U256};
use async_trait::async_trait;
use futures::StreamExt;
use futures::{Stream, StreamExt};
use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_chain_state::{CanonStateNotificationStream, CanonStateSubscriptions};
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_errors::RethResult;
use reth_primitives_traits::NodePrimitives;
use reth_rpc_api::RethApiServer;
use reth_rpc_eth_types::{EthApiError, EthResult};
use reth_rpc_server_types::result::internal_rpc_err;
use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::Serialize;
use tokio::sync::oneshot;
/// `reth` API implementation.
@@ -97,6 +95,7 @@ where
+ ChangeSetReader
+ StateProviderFactory
+ CanonStateSubscriptions
+ PersistedBlockSubscriptions
+ 'static,
{
/// Handler for `reth_getBalanceChangesInBlock`
@@ -114,38 +113,48 @@ where
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let stream = self.provider().canonical_state_stream();
self.inner.task_spawner.spawn(Box::pin(async move {
let _ = pipe_from_stream(sink, stream).await;
}));
self.inner.task_spawner.spawn(Box::pin(pipe_from_stream(sink, stream)));
Ok(())
}
/// Handler for `reth_subscribePersistedBlock`
async fn reth_subscribe_persisted_block(
&self,
pending: PendingSubscriptionSink,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let stream = self.provider().persisted_block_stream();
self.inner.task_spawner.spawn(Box::pin(pipe_from_stream(sink, stream)));
Ok(())
}
}
/// Pipes all stream items to the subscription sink.
async fn pipe_from_stream<N: NodePrimitives>(
sink: SubscriptionSink,
mut stream: CanonStateNotificationStream<N>,
) -> Result<(), ErrorObject<'static>> {
async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
loop {
tokio::select! {
_ = sink.closed() => {
// connection dropped
break Ok(())
break
}
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => {
// stream ended
break Ok(())
},
let Some(item) = maybe_item else {
break
};
let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
Ok(msg) => msg,
Err(err) => {
tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
break
}
};
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(|e| internal_rpc_err(e.to_string()))?;
if sink.send(msg).await.is_err() {
break Ok(());
break;
}
}
}