From 78d06fd76940fdef8ca30103de4f76a1bff2e11c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 15 Feb 2023 06:37:26 +0100 Subject: [PATCH] feat(rpc): work on subscriptions (#1290) --- Cargo.lock | 1 + crates/rpc/rpc-api/src/eth_pubsub.rs | 6 +- crates/rpc/rpc-types/src/eth/pubsub.rs | 22 +++++- crates/rpc/rpc/Cargo.toml | 1 + crates/rpc/rpc/src/eth/pubsub.rs | 103 ++++++++++++++++++++----- 5 files changed, 110 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf4157ef2b..b65534f2cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4624,6 +4624,7 @@ dependencies = [ "reth-rpc-api", "reth-rpc-engine-api", "reth-rpc-types", + "reth-tasks", "reth-transaction-pool", "secp256k1 0.26.0", "serde", diff --git a/crates/rpc/rpc-api/src/eth_pubsub.rs b/crates/rpc/rpc-api/src/eth_pubsub.rs index 7777c2579b..d27c068ec1 100644 --- a/crates/rpc/rpc-api/src/eth_pubsub.rs +++ b/crates/rpc/rpc-api/src/eth_pubsub.rs @@ -1,14 +1,14 @@ use jsonrpsee::proc_macros::rpc; -use reth_rpc_types::pubsub::{Kind, Params}; +use reth_rpc_types::pubsub::{Params, SubscriptionKind}; /// Ethereum pub-sub rpc interface. #[rpc(server)] pub trait EthPubSubApi { - /// Create an ethereum subscription. + /// Create an ethereum subscription for the given params #[subscription( name = "eth_subscribe", unsubscribe = "eth_unsubscribe", item = reth_rpc_types::pubsub::SubscriptionResult )] - fn subscribe(&self, kind: Kind, params: Option); + fn subscribe(&self, kind: SubscriptionKind, params: Option); } diff --git a/crates/rpc/rpc-types/src/eth/pubsub.rs b/crates/rpc/rpc-types/src/eth/pubsub.rs index 29605bf0ae..7fbd1e46bd 100644 --- a/crates/rpc/rpc-types/src/eth/pubsub.rs +++ b/crates/rpc/rpc-types/src/eth/pubsub.rs @@ -58,14 +58,34 @@ impl Serialize for SubscriptionResult { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)] #[serde(deny_unknown_fields)] #[serde(rename_all = "camelCase")] -pub enum Kind { +pub enum SubscriptionKind { /// New block headers subscription. + /// + /// Fires a notification each time a new header is appended to the chain, including chain + /// reorganizations. In case of a chain reorganization the subscription will emit all new + /// headers for the new chain. Therefore the subscription can emit multiple headers on the same + /// height. NewHeads, /// Logs subscription. + /// + /// Returns logs that are included in new imported blocks and match the given filter criteria. + /// In case of a chain reorganization previous sent logs that are on the old chain will be + /// resent with the removed property set to true. Logs from transactions that ended up in the + /// new chain are emitted. Therefore, a subscription can emit logs for the same transaction + /// multiple times. Logs, /// New Pending Transactions subscription. + /// + /// Returns the hash for all transactions that are added to the pending state and are signed + /// with a key that is available in the node. When a transaction that was previously part of + /// the canonical chain isn't part of the new canonical chain after a reogranization its again + /// emitted. NewPendingTransactions, /// Node syncing status subscription. + /// + /// Indicates when the node starts or stops synchronizing. The result can either be a boolean + /// indicating that the synchronization has started (true), finished (false) or an object with + /// various progress indicators. Syncing, } diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index f9bd449a34..3558a774b6 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -19,6 +19,7 @@ reth-provider = { path = "../../storage/provider", features = ["test-utils"] } reth-transaction-pool = { path = "../../transaction-pool", features=["test-utils"]} reth-network-api = { path = "../../net/network-api" } reth-rpc-engine-api = { path = "../rpc-engine-api" } +reth-tasks = { path = "../../tasks" } # rpc jsonrpsee = { version = "0.16" } diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 393b0003de..c4e5cce3eb 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,63 +1,128 @@ //! `eth_` PubSub RPC handler implementation use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; +use reth_primitives::{rpc::FilteredParams, TxHash}; use reth_provider::BlockProvider; use reth_rpc_api::EthPubSubApiServer; -use reth_rpc_types::pubsub::{Kind, Params}; +use reth_rpc_types::pubsub::{ + Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, +}; +use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; -use std::sync::Arc; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; /// `Eth` pubsub RPC implementation. /// /// This handles -#[derive(Debug, Clone)] pub struct EthPubSub { /// All nested fields bundled together. - inner: Arc>, + inner: EthPubSubInner, + /// The type that's used to spawn subscription tasks. + subscription_task_spawner: Box, } // === impl EthPubSub === impl EthPubSub { /// Creates a new, shareable instance. - pub fn new(client: Arc, pool: Pool) -> Self { + /// + /// Subscription tasks are spawned via [tokio::task::spawn] + pub fn new(client: Client, pool: Pool) -> Self { + Self::with_spawner(client, pool, Box::::default()) + } + + /// Creates a new, shareable instance. + pub fn with_spawner( + client: Client, + pool: Pool, + subscription_task_spawner: Box, + ) -> Self { let inner = EthPubSubInner { client, pool }; - Self { inner: Arc::new(inner) } + Self { inner, subscription_task_spawner } } } impl EthPubSubApiServer for EthPubSub where - Client: BlockProvider + 'static, + Client: BlockProvider + Clone + 'static, Pool: TransactionPool + 'static, { fn subscribe( &self, mut sink: SubscriptionSink, - _kind: Kind, - _params: Option, + kind: SubscriptionKind, + params: Option, ) -> SubscriptionResult { sink.accept()?; - todo!() + + let pubsub = self.inner.clone(); + self.subscription_task_spawner.spawn(Box::pin(async move { + handle_accepted(pubsub, sink, kind, params).await; + })); + + Ok(()) } } /// The actual handler for and accepted [`EthPubSub::subscribe`] call. async fn handle_accepted( - _pool: Pool, - _client: Arc, - _accepted_sink: SubscriptionSink, - _kind: Kind, - _params: Option, -) { + pubsub: EthPubSubInner, + mut accepted_sink: SubscriptionSink, + kind: SubscriptionKind, + params: Option, +) where + Client: BlockProvider + 'static, + Pool: TransactionPool + 'static, +{ + // if no params are provided, used default filter params + let _params = match params { + Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)), + _ => FilteredParams::default(), + }; + + match kind { + SubscriptionKind::NewHeads => { + // TODO subscribe new blocks -> convert + } + SubscriptionKind::Logs => { + // TODO subscribe new blocks -> fetch logs via bloom + } + SubscriptionKind::NewPendingTransactions => { + let stream = pubsub + .into_pending_transaction_stream() + .map(EthSubscriptionResult::TransactionHash); + accepted_sink.pipe_from_stream(stream).await; + } + SubscriptionKind::Syncing => { + // TODO subscribe new blocks -> read is_syncing from network + } + } +} + +impl std::fmt::Debug for EthPubSub { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EthPubSub").finish_non_exhaustive() + } } /// Container type `EthPubSub` -#[derive(Debug)] +#[derive(Clone)] struct EthPubSubInner { /// The transaction pool. pool: Pool, /// The client that can interact with the chain. - client: Arc, - // TODO needs spawn access + client: Client, +} + +// == impl EthPubSubInner === + +impl EthPubSubInner +where + Client: BlockProvider + 'static, + Pool: TransactionPool + 'static, +{ + /// Returns a stream that yields all transactions emitted by the txpool. + fn into_pending_transaction_stream(self) -> impl Stream { + ReceiverStream::new(self.pool.pending_transactions_listener()) + } }