diff --git a/crates/rpc/rpc-types/src/eth/pubsub.rs b/crates/rpc/rpc-types/src/eth/pubsub.rs index 63f152b662..f027037c17 100644 --- a/crates/rpc/rpc-types/src/eth/pubsub.rs +++ b/crates/rpc/rpc-types/src/eth/pubsub.rs @@ -1,6 +1,10 @@ //! Ethereum types for pub-sub -use crate::{eth::Filter, Log, RichHeader}; +use crate::{ + eth::{Filter, Transaction}, + Log, RichHeader, +}; + use reth_primitives::H256; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; @@ -14,6 +18,8 @@ pub enum SubscriptionResult { Log(Box), /// Transaction hash TransactionHash(H256), + /// Full Transaction + FullTransaction(Box), /// SyncStatus SyncState(PubSubSyncStatus), } @@ -49,6 +55,7 @@ impl Serialize for SubscriptionResult { SubscriptionResult::Header(ref header) => header.serialize(serializer), SubscriptionResult::Log(ref log) => log.serialize(serializer), SubscriptionResult::TransactionHash(ref hash) => hash.serialize(serializer), + SubscriptionResult::FullTransaction(ref tx) => tx.serialize(serializer), SubscriptionResult::SyncState(ref sync) => sync.serialize(serializer), } } @@ -76,10 +83,10 @@ pub enum SubscriptionKind { Logs, /// New Pending Transactions subscription. /// - /// Returns the hash for all transactions that are added to the pending state and are signed - /// with a key that is available in the node. When a transaction that was previously part of - /// the canonical chain isn't part of the new canonical chain after a reorganization its again - /// emitted. + /// Returns the hash or full tx for all transactions that are added to the pending state and + /// are signed with a key that is available in the node. When a transaction that was + /// previously part of the canonical chain isn't part of the new canonical chain after a + /// reorganization its again emitted. NewPendingTransactions, /// Node syncing status subscription. /// @@ -97,6 +104,8 @@ pub enum Params { None, /// Log parameters. Logs(Box), + /// New pending transaction parameters. + NewPendingTransactions(bool), } impl Serialize for Params { @@ -107,6 +116,7 @@ impl Serialize for Params { match self { Params::None => (&[] as &[serde_json::Value]).serialize(serializer), Params::Logs(logs) => logs.serialize(serializer), + Params::NewPendingTransactions(full) => full.serialize(serializer), } } } diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index f474426037..aa5dd7363f 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -3,21 +3,22 @@ use crate::eth::logs_utils; use futures::StreamExt; use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink}; use reth_network_api::NetworkInfo; -use reth_primitives::TxHash; +use reth_primitives::{IntoRecoveredTransaction, TxHash}; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::FilteredParams; use std::sync::Arc; +use crate::result::invalid_params_rpc_err; use reth_rpc_types::{ pubsub::{ Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, SyncStatusMetadata, }, - Header, Log, + Header, Log, Transaction, }; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{NewTransactionEvent, TransactionPool}; use serde::Serialize; use tokio_stream::{ wrappers::{BroadcastStream, ReceiverStream}, @@ -121,8 +122,34 @@ where pipe_from_stream(accepted_sink, stream).await } SubscriptionKind::NewPendingTransactions => { - let stream = - pubsub.pending_transaction_stream().map(EthSubscriptionResult::TransactionHash); + if let Some(params) = params { + match params { + Params::NewPendingTransactions(true) => { + // full transaction objects requested + let stream = pubsub.full_pending_transaction_stream().map(|tx| { + EthSubscriptionResult::FullTransaction(Box::new( + Transaction::from_recovered( + tx.transaction.to_recovered_transaction(), + ), + )) + }); + return pipe_from_stream(accepted_sink, stream).await + } + Params::NewPendingTransactions(false) | Params::None => { + // only hashes requested + } + Params::Logs(_) => { + return Err(invalid_params_rpc_err( + "Invalid params for newPendingTransactions", + ) + .into()) + } + } + } + + let stream = pubsub + .pending_transaction_hashes_stream() + .map(EthSubscriptionResult::TransactionHash); pipe_from_stream(accepted_sink, stream).await } SubscriptionKind::Syncing => { @@ -241,9 +268,16 @@ where Pool: TransactionPool + 'static, { /// Returns a stream that yields all transactions emitted by the txpool. - fn pending_transaction_stream(&self) -> impl Stream { + fn pending_transaction_hashes_stream(&self) -> impl Stream { ReceiverStream::new(self.pool.pending_transactions_listener()) } + + /// Returns a stream that yields all transactions emitted by the txpool. + fn full_pending_transaction_stream( + &self, + ) -> impl Stream::Transaction>> { + self.pool.new_pending_pool_transactions_listener() + } } impl EthPubSubInner