feat: add full pending txs to stream (#3649)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
fomotrader
2023-07-10 07:34:12 -04:00
committed by GitHub
parent c8d0e7e9b3
commit 3910babb91
2 changed files with 55 additions and 11 deletions

View File

@@ -1,6 +1,10 @@
//! Ethereum types for pub-sub
use crate::{eth::Filter, Log, RichHeader};
use crate::{
eth::{Filter, Transaction},
Log, RichHeader,
};
use reth_primitives::H256;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
@@ -14,6 +18,8 @@ pub enum SubscriptionResult {
Log(Box<Log>),
/// Transaction hash
TransactionHash(H256),
/// Full Transaction
FullTransaction(Box<Transaction>),
/// SyncStatus
SyncState(PubSubSyncStatus),
}
@@ -49,6 +55,7 @@ impl Serialize for SubscriptionResult {
SubscriptionResult::Header(ref header) => header.serialize(serializer),
SubscriptionResult::Log(ref log) => log.serialize(serializer),
SubscriptionResult::TransactionHash(ref hash) => hash.serialize(serializer),
SubscriptionResult::FullTransaction(ref tx) => tx.serialize(serializer),
SubscriptionResult::SyncState(ref sync) => sync.serialize(serializer),
}
}
@@ -76,10 +83,10 @@ pub enum SubscriptionKind {
Logs,
/// New Pending Transactions subscription.
///
/// Returns the hash for all transactions that are added to the pending state and are signed
/// with a key that is available in the node. When a transaction that was previously part of
/// the canonical chain isn't part of the new canonical chain after a reorganization its again
/// emitted.
/// Returns the hash or full tx for all transactions that are added to the pending state and
/// are signed with a key that is available in the node. When a transaction that was
/// previously part of the canonical chain isn't part of the new canonical chain after a
/// reorganization its again emitted.
NewPendingTransactions,
/// Node syncing status subscription.
///
@@ -97,6 +104,8 @@ pub enum Params {
None,
/// Log parameters.
Logs(Box<Filter>),
/// New pending transaction parameters.
NewPendingTransactions(bool),
}
impl Serialize for Params {
@@ -107,6 +116,7 @@ impl Serialize for Params {
match self {
Params::None => (&[] as &[serde_json::Value]).serialize(serializer),
Params::Logs(logs) => logs.serialize(serializer),
Params::NewPendingTransactions(full) => full.serialize(serializer),
}
}
}

View File

@@ -3,21 +3,22 @@ use crate::eth::logs_utils;
use futures::StreamExt;
use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink};
use reth_network_api::NetworkInfo;
use reth_primitives::TxHash;
use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::FilteredParams;
use std::sync::Arc;
use crate::result::invalid_params_rpc_err;
use reth_rpc_types::{
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata,
},
Header, Log,
Header, Log, Transaction,
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
use serde::Serialize;
use tokio_stream::{
wrappers::{BroadcastStream, ReceiverStream},
@@ -121,8 +122,34 @@ where
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::NewPendingTransactions => {
let stream =
pubsub.pending_transaction_stream().map(EthSubscriptionResult::TransactionHash);
if let Some(params) = params {
match params {
Params::NewPendingTransactions(true) => {
// full transaction objects requested
let stream = pubsub.full_pending_transaction_stream().map(|tx| {
EthSubscriptionResult::FullTransaction(Box::new(
Transaction::from_recovered(
tx.transaction.to_recovered_transaction(),
),
))
});
return pipe_from_stream(accepted_sink, stream).await
}
Params::NewPendingTransactions(false) | Params::None => {
// only hashes requested
}
Params::Logs(_) => {
return Err(invalid_params_rpc_err(
"Invalid params for newPendingTransactions",
)
.into())
}
}
}
let stream = pubsub
.pending_transaction_hashes_stream()
.map(EthSubscriptionResult::TransactionHash);
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::Syncing => {
@@ -241,9 +268,16 @@ where
Pool: TransactionPool + 'static,
{
/// Returns a stream that yields all transactions emitted by the txpool.
fn pending_transaction_stream(&self) -> impl Stream<Item = TxHash> {
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
}
/// Returns a stream that yields all transactions emitted by the txpool.
fn full_pending_transaction_stream(
&self,
) -> impl Stream<Item = NewTransactionEvent<<Pool as TransactionPool>::Transaction>> {
self.pool.new_pending_pool_transactions_listener()
}
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>