From 1468563f2f9e312fbe7fe9603a3f480f3b6f9bfa Mon Sep 17 00:00:00 2001 From: Nil Medvedev Date: Mon, 16 Oct 2023 14:08:41 +0100 Subject: [PATCH] Feat/pending transactions filter (#5014) Co-authored-by: Matthias Seitz --- crates/rpc/rpc/src/eth/filter.rs | 41 +++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 5eb4053f9a..8facee2ead 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -7,17 +7,18 @@ use crate::{ result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; +use alloy_primitives::B256; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_interfaces::RethError; -use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock}; +use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock, TxHash}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; -use tokio::sync::Mutex; +use tokio::sync::{mpsc::Receiver, Mutex}; use tracing::trace; /// The maximum number of headers we read at once when handling a range filter. @@ -94,8 +95,9 @@ where }; match kind { - FilterKind::PendingTransaction => { - Err(EthApiError::Unsupported("pending transaction filter not supported").into()) + FilterKind::PendingTransaction(receiver) => { + let pending_txs = receiver.drain().await; + Ok(FilterChanges::Hashes(pending_txs)) } FilterKind::Block => { // Note: we need to fetch the block hashes from inclusive range @@ -182,7 +184,11 @@ where /// Handler for `eth_newPendingTransactionFilter` async fn new_pending_transaction_filter(&self) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter"); - self.inner.install_filter(FilterKind::PendingTransaction).await + let receiver = self.inner.pool.pending_transactions_listener(); + + let pending_txs_receiver = PendingTransactionsReceiver::new(receiver); + + self.inner.install_filter(FilterKind::PendingTransaction(pending_txs_receiver)).await } /// Handler for `eth_getFilterChanges` @@ -419,11 +425,34 @@ struct ActiveFilter { kind: FilterKind, } +/// A receiver for pending transactions that returns all new transactions since the last poll. +#[derive(Debug, Clone)] +struct PendingTransactionsReceiver { + txs_receiver: Arc>>, +} + +impl PendingTransactionsReceiver { + fn new(receiver: Receiver) -> Self { + PendingTransactionsReceiver { txs_receiver: Arc::new(Mutex::new(receiver)) } + } + + /// Returns all new pending transactions received since the last poll. + async fn drain(&self) -> Vec { + let mut pending_txs = Vec::new(); + let mut prepared_stream = self.txs_receiver.lock().await; + + while let Ok(tx_hash) = prepared_stream.try_recv() { + pending_txs.push(tx_hash); + } + pending_txs + } +} + #[derive(Clone, Debug)] enum FilterKind { Log(Box), Block, - PendingTransaction, + PendingTransaction(PendingTransactionsReceiver), } /// Errors that can occur in the handler implementation