diff --git a/crates/rpc/rpc-api/src/eth_filter.rs b/crates/rpc/rpc-api/src/eth_filter.rs index be0330e535..bab30675d5 100644 --- a/crates/rpc/rpc-api/src/eth_filter.rs +++ b/crates/rpc/rpc-api/src/eth_filter.rs @@ -1,34 +1,34 @@ -use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; -use reth_primitives::{rpc::Filter, U256}; -use reth_rpc_types::{FilterChanges, Index, Log}; +use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc, types::SubscriptionId}; +use reth_primitives::rpc::Filter; +use reth_rpc_types::{FilterChanges, Log}; /// Rpc Interface for poll-based ethereum filter API. #[cfg_attr(not(feature = "client"), rpc(server))] -#[cfg_attr(feature = "client", rpc(server, client))] +#[cfg_attr(feature = "client", rpc(server))] // TODO(mattsse) make it work with SubscriptionId lifetime pub trait EthFilterApi { /// Creates anew filter and returns its id. #[method(name = "eth_newFilter")] - fn new_filter(&self, filter: Filter) -> Result; + async fn new_filter(&self, filter: Filter) -> Result>; /// Creates a new block filter and returns its id. #[method(name = "eth_newBlockFilter")] - fn new_block_filter(&self) -> Result; + async fn new_block_filter(&self) -> Result>; /// Creates a pending transaction filter and returns its id. #[method(name = "eth_newPendingTransactionFilter")] - fn new_pending_transaction_filter(&self) -> Result; + async fn new_pending_transaction_filter(&self) -> Result>; /// Returns all filter changes since last poll. #[method(name = "eth_getFilterChanges")] - async fn filter_changes(&self, index: Index) -> Result; + async fn filter_changes(&self, id: SubscriptionId<'_>) -> Result; /// Returns all logs matching given filter (in a range 'from' - 'to'). #[method(name = "eth_getFilterLogs")] - async fn filter_logs(&self, index: Index) -> Result>; + async fn filter_logs(&self, id: SubscriptionId<'_>) -> Result>; /// Uninstalls filter. #[method(name = "eth_uninstallFilter")] - fn uninstall_filter(&self, index: Index) -> Result; + async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> Result; /// Returns logs matching given filter object. #[method(name = "eth_getLogs")] diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 83310b527a..f00d497829 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,11 +1,18 @@ +use crate::result::{internal_rpc_err, ToRpcResult}; use async_trait::async_trait; -use jsonrpsee::core::RpcResult; -use reth_primitives::{rpc::Filter, U256}; +use jsonrpsee::{ + core::RpcResult, + server::{IdProvider, RandomIntegerIdProvider}, + types::SubscriptionId, +}; +use reth_primitives::rpc::Filter; use reth_provider::BlockProvider; use reth_rpc_api::EthFilterApiServer; -use reth_rpc_types::{FilterChanges, Index, Log}; +use reth_rpc_types::{FilterChanges, Log}; use reth_transaction_pool::TransactionPool; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc, time::Instant}; +use tokio::sync::Mutex; +use tracing::trace; /// `Eth` filter RPC implementation. #[derive(Debug, Clone)] @@ -16,10 +23,20 @@ pub struct EthFilter { impl EthFilter { /// Creates a new, shareable instance. - pub fn new(client: Arc, pool: Pool) -> Self { - let inner = EthFilterInner { client, pool }; + pub fn new(client: Client, pool: Pool) -> Self { + let inner = EthFilterInner { + client, + active_filters: Default::default(), + pool, + id_provider: Arc::new(RandomIntegerIdProvider), + }; Self { inner: Arc::new(inner) } } + + /// Returns all currently active filters + pub fn active_filters(&self) -> &ActiveFilters { + &self.inner.active_filters + } } #[async_trait] @@ -28,28 +45,35 @@ where Client: BlockProvider + 'static, Pool: TransactionPool + 'static, { - fn new_filter(&self, _filter: Filter) -> RpcResult { + async fn new_filter(&self, filter: Filter) -> RpcResult> { + self.inner.install_filter(FilterKind::Log(filter)).await + } + + async fn new_block_filter(&self) -> RpcResult> { + self.inner.install_filter(FilterKind::Block).await + } + + async fn new_pending_transaction_filter(&self) -> RpcResult> { + self.inner.install_filter(FilterKind::PendingTransaction).await + } + + async fn filter_changes(&self, _id: SubscriptionId<'_>) -> RpcResult { todo!() } - fn new_block_filter(&self) -> RpcResult { + async fn filter_logs(&self, _id: SubscriptionId<'_>) -> RpcResult> { todo!() } - fn new_pending_transaction_filter(&self) -> RpcResult { - todo!() - } - - async fn filter_changes(&self, _index: Index) -> RpcResult { - todo!() - } - - async fn filter_logs(&self, _index: Index) -> RpcResult> { - todo!() - } - - fn uninstall_filter(&self, _index: Index) -> RpcResult { - todo!() + async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> RpcResult { + let mut filters = self.inner.active_filters.inner.lock().await; + let id = id.into_owned(); + if filters.remove(&id).is_some() { + trace!(target: "rpc::eth::filter", ?id, "uninstalled filter"); + Ok(true) + } else { + Err(internal_rpc_err(format!("Filter id {id:?} does not exist."))) + } } async fn logs(&self, _filter: Filter) -> RpcResult> { @@ -63,6 +87,51 @@ struct EthFilterInner { /// The transaction pool. pool: Pool, /// The client that can interact with the chain. - client: Arc, - // TODO needs spawn access + client: Client, + /// All currently installed filters. + active_filters: ActiveFilters, + id_provider: Arc, +} + +impl EthFilterInner +where + Client: BlockProvider + 'static, + Pool: TransactionPool + 'static, +{ + /// Installs a new filter and returns the new identifier. + async fn install_filter(&self, kind: FilterKind) -> RpcResult> { + let last_poll_block_number = self.client.chain_info().to_rpc_result()?.best_number; + let id = self.id_provider.next_id(); + let mut filters = self.active_filters.inner.lock().await; + filters.insert( + id.clone(), + ActiveFilter { last_poll_block_number, last_poll_timestamp: Instant::now(), kind }, + ); + Ok(id) + } +} + +/// All active filters +#[derive(Debug, Clone, Default)] +pub struct ActiveFilters { + inner: Arc, ActiveFilter>>>, +} + +/// An installed filter +#[derive(Debug)] +struct ActiveFilter { + /// At which block the filter was polled last. + last_poll_block_number: u64, + /// Last time this filter was polled. + last_poll_timestamp: Instant, + /// What kind of filter it is. + kind: FilterKind, +} + +#[derive(Clone, Debug)] +#[allow(clippy::large_enum_variant)] // stored on heap +enum FilterKind { + Log(Filter), + Block, + PendingTransaction, }