Feat/pending transactions filter (#5014)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Nil Medvedev
2023-10-16 14:08:41 +01:00
committed by GitHub
parent 9ffc1433d9
commit 1468563f2f

View File

@@ -7,17 +7,18 @@ use crate::{
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use alloy_primitives::B256;
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_interfaces::RethError;
use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock};
use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant};
use tokio::sync::Mutex;
use tokio::sync::{mpsc::Receiver, Mutex};
use tracing::trace;
/// The maximum number of headers we read at once when handling a range filter.
@@ -94,8 +95,9 @@ where
};
match kind {
FilterKind::PendingTransaction => {
Err(EthApiError::Unsupported("pending transaction filter not supported").into())
FilterKind::PendingTransaction(receiver) => {
let pending_txs = receiver.drain().await;
Ok(FilterChanges::Hashes(pending_txs))
}
FilterKind::Block => {
// Note: we need to fetch the block hashes from inclusive range
@@ -182,7 +184,11 @@ where
/// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
self.inner.install_filter(FilterKind::PendingTransaction).await
let receiver = self.inner.pool.pending_transactions_listener();
let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
self.inner.install_filter(FilterKind::PendingTransaction(pending_txs_receiver)).await
}
/// Handler for `eth_getFilterChanges`
@@ -419,11 +425,34 @@ struct ActiveFilter {
kind: FilterKind,
}
/// A receiver for pending transactions that returns all new transactions since the last poll.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
impl PendingTransactionsReceiver {
fn new(receiver: Receiver<TxHash>) -> Self {
PendingTransactionsReceiver { txs_receiver: Arc::new(Mutex::new(receiver)) }
}
/// Returns all new pending transactions received since the last poll.
async fn drain(&self) -> Vec<B256> {
let mut pending_txs = Vec::new();
let mut prepared_stream = self.txs_receiver.lock().await;
while let Ok(tx_hash) = prepared_stream.try_recv() {
pending_txs.push(tx_hash);
}
pending_txs
}
}
#[derive(Clone, Debug)]
enum FilterKind {
Log(Box<Filter>),
Block,
PendingTransaction,
PendingTransaction(PendingTransactionsReceiver),
}
/// Errors that can occur in the handler implementation