From 1846f2d73c7793423f5987d36905e1b13100428f Mon Sep 17 00:00:00 2001 From: chirag-bgh <76247491+chirag-bgh@users.noreply.github.com> Date: Sun, 12 Mar 2023 14:46:47 +0530 Subject: [PATCH] feat: Implement is_syncing subscription handler (#1562) Co-authored-by: Matthias Seitz --- crates/rpc/rpc-builder/src/eth.rs | 2 +- crates/rpc/rpc/src/eth/pubsub.rs | 92 ++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 20 deletions(-) diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 45c136e91b..eec235cdd8 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -14,7 +14,7 @@ pub struct EthHandlers { /// Polling based filter handler available on all transports pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) - pub pubsub: Option>, + pub pubsub: Option>, } /// Additional config values for the eth namespace diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 57f66f974f..01064fdb21 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,12 +1,15 @@ //! `eth_` PubSub RPC handler implementation use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; -use reth_interfaces::events::ChainEventSubscriptions; +use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider}; use reth_primitives::{rpc::FilteredParams, TxHash}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::{ - pubsub::{Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult}, + pubsub::{ + Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, + SyncStatusMetadata, + }, Header, }; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -20,21 +23,21 @@ use tokio_stream::{ /// /// This handles `eth_subscribe` RPC calls. #[derive(Clone)] -pub struct EthPubSub { +pub struct EthPubSub { /// All nested fields bundled together. - inner: EthPubSubInner, + inner: EthPubSubInner, /// The type that's used to spawn subscription tasks. subscription_task_spawner: Box, } // === impl EthPubSub === -impl EthPubSub { +impl EthPubSub { /// Creates a new, shareable instance. /// /// Subscription tasks are spawned via [tokio::task::spawn] - pub fn new(client: Client, pool: Pool, chain_events: Events) -> Self { - Self::with_spawner(client, pool, chain_events, Box::::default()) + pub fn new(client: Client, pool: Pool, chain_events: Events, network: Network) -> Self { + Self::with_spawner(client, pool, chain_events, network, Box::::default()) } /// Creates a new, shareable instance. @@ -42,18 +45,20 @@ impl EthPubSub { client: Client, pool: Pool, chain_events: Events, + network: Network, subscription_task_spawner: Box, ) -> Self { - let inner = EthPubSubInner { client, pool, chain_events }; + let inner = EthPubSubInner { client, pool, chain_events, network }; Self { inner, subscription_task_spawner } } } -impl EthPubSubApiServer for EthPubSub +impl EthPubSubApiServer for EthPubSub where Client: BlockProvider + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, Events: ChainEventSubscriptions + Clone + 'static, + Network: SyncStateProvider + Clone + 'static, { fn subscribe( &self, @@ -65,7 +70,7 @@ where let pubsub = self.inner.clone(); self.subscription_task_spawner.spawn(Box::pin(async move { - handle_accepted(pubsub, sink, kind, params).await; + handle_accepted(pubsub, sink, kind, params, Box::::default()).await; })); Ok(()) @@ -73,15 +78,17 @@ where } /// The actual handler for and accepted [`EthPubSub::subscribe`] call. -async fn handle_accepted( - pubsub: EthPubSubInner, +async fn handle_accepted( + pubsub: EthPubSubInner, mut accepted_sink: SubscriptionSink, kind: SubscriptionKind, params: Option, + subscription_task_spawner: Box, ) where - Client: BlockProvider + EvmEnvProvider + 'static, + Client: BlockProvider + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, - Events: ChainEventSubscriptions + 'static, + Events: ChainEventSubscriptions + Clone + 'static, + Network: SyncStateProvider + Clone + 'static, { // if no params are provided, used default filter params let _params = match params { @@ -106,12 +113,35 @@ async fn handle_accepted( accepted_sink.pipe_from_stream(stream).await; } SubscriptionKind::Syncing => { - // TODO subscribe new blocks -> read is_syncing from network + subscription_task_spawner.spawn(Box::pin(async move { + // get new block subscription + let mut new_blocks = + UnboundedReceiverStream::new(pubsub.chain_events.subscribe_new_blocks()); + // get current sync status + let mut initial_sync_status = pubsub.network.is_syncing(); + let current_sub_res = pubsub.sync_status(initial_sync_status).await; + + // send the current status immediately + let _ = accepted_sink.send(¤t_sub_res); + + while (new_blocks.next().await).is_some() { + let current_syncing = pubsub.network.is_syncing(); + // Only send a new response if the sync status has changed + if current_syncing != initial_sync_status { + // Update the sync status on each new block + initial_sync_status = current_syncing; + + // send a new message now that the status changed + let sync_status = pubsub.sync_status(current_syncing).await; + let _ = accepted_sink.send(&sync_status); + } + } + })); } } } -impl std::fmt::Debug for EthPubSub { +impl std::fmt::Debug for EthPubSub { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EthPubSub").finish_non_exhaustive() } @@ -119,18 +149,41 @@ impl std::fmt::Debug for EthPubSub { /// Container type `EthPubSub` #[derive(Clone)] -struct EthPubSubInner { +struct EthPubSubInner { /// The transaction pool. pool: Pool, /// The client that can interact with the chain. client: Client, /// A type that allows to create new event subscriptions, chain_events: Events, + /// The network. + network: Network, } // == impl EthPubSubInner === -impl EthPubSubInner +impl EthPubSubInner +where + Client: BlockProvider + 'static, +{ + /// Returns the current sync status for the `syncing` subscription + async fn sync_status(&self, is_syncing: bool) -> EthSubscriptionResult { + let current_block = + self.client.chain_info().map(|info| info.best_number).unwrap_or_default(); + if is_syncing { + EthSubscriptionResult::SyncState(PubSubSyncStatus::Detailed(SyncStatusMetadata { + syncing: true, + starting_block: 0, + current_block, + highest_block: Some(current_block), + })) + } else { + EthSubscriptionResult::SyncState(PubSubSyncStatus::Simple(false)) + } + } +} + +impl EthPubSubInner where Pool: TransactionPool + 'static, { @@ -140,10 +193,11 @@ where } } -impl EthPubSubInner +impl EthPubSubInner where Client: BlockProvider + EvmEnvProvider + 'static, Events: ChainEventSubscriptions + 'static, + Network: SyncStateProvider + 'static, { /// Returns a stream that yields all new RPC blocks. fn into_new_headers_stream(self) -> impl Stream {