diff --git a/Cargo.lock b/Cargo.lock index b65534f2cf..18e7d51622 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4689,6 +4689,7 @@ name = "reth-rpc-types" version = "0.1.0" dependencies = [ "bytes", + "jsonrpsee", "reth-network-api", "reth-primitives", "reth-rlp", diff --git a/crates/rpc/rpc-api/src/eth_filter.rs b/crates/rpc/rpc-api/src/eth_filter.rs index bab30675d5..addd8ab9e4 100644 --- a/crates/rpc/rpc-api/src/eth_filter.rs +++ b/crates/rpc/rpc-api/src/eth_filter.rs @@ -1,34 +1,34 @@ -use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc, types::SubscriptionId}; +use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; use reth_primitives::rpc::Filter; -use reth_rpc_types::{FilterChanges, Log}; +use reth_rpc_types::{FilterChanges, FilterId, Log}; /// Rpc Interface for poll-based ethereum filter API. #[cfg_attr(not(feature = "client"), rpc(server))] -#[cfg_attr(feature = "client", rpc(server))] // TODO(mattsse) make it work with SubscriptionId lifetime +#[cfg_attr(feature = "client", rpc(server, client))] pub trait EthFilterApi { /// Creates anew filter and returns its id. #[method(name = "eth_newFilter")] - async fn new_filter(&self, filter: Filter) -> Result>; + async fn new_filter(&self, filter: Filter) -> Result; /// Creates a new block filter and returns its id. #[method(name = "eth_newBlockFilter")] - async fn new_block_filter(&self) -> Result>; + async fn new_block_filter(&self) -> Result; /// Creates a pending transaction filter and returns its id. #[method(name = "eth_newPendingTransactionFilter")] - async fn new_pending_transaction_filter(&self) -> Result>; + async fn new_pending_transaction_filter(&self) -> Result; /// Returns all filter changes since last poll. #[method(name = "eth_getFilterChanges")] - async fn filter_changes(&self, id: SubscriptionId<'_>) -> Result; + async fn filter_changes(&self, id: FilterId) -> Result; /// Returns all logs matching given filter (in a range 'from' - 'to'). #[method(name = "eth_getFilterLogs")] - async fn filter_logs(&self, id: SubscriptionId<'_>) -> Result>; + async fn filter_logs(&self, id: FilterId) -> Result>; /// Uninstalls filter. #[method(name = "eth_uninstallFilter")] - async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> Result; + async fn uninstall_filter(&self, id: FilterId) -> Result; /// Returns logs matching given filter object. #[method(name = "eth_getLogs")] diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index ee140dba79..cd8393495a 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -21,3 +21,4 @@ thiserror = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" bytes = "1.2" +jsonrpsee = { version = "0.16" } diff --git a/crates/rpc/rpc-types/src/eth/filter.rs b/crates/rpc/rpc-types/src/eth/filter.rs index ec9447d90a..0b90276343 100644 --- a/crates/rpc/rpc-types/src/eth/filter.rs +++ b/crates/rpc/rpc-types/src/eth/filter.rs @@ -1,4 +1,5 @@ use crate::Log; +use jsonrpsee::types::SubscriptionId; use reth_primitives::H256; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -58,3 +59,32 @@ impl<'de> Deserialize<'de> for FilterChanges { Ok(changes) } } + +/// Owned equivalent of [SubscriptionId] +#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(untagged)] +pub enum FilterId { + /// Numeric id + Num(u64), + /// String id + Str(String), +} + +impl From for SubscriptionId<'_> { + fn from(value: FilterId) -> Self { + match value { + FilterId::Num(n) => SubscriptionId::Num(n), + FilterId::Str(s) => SubscriptionId::Str(s.into()), + } + } +} + +impl From> for FilterId { + fn from(value: SubscriptionId<'_>) -> Self { + match value { + SubscriptionId::Num(n) => FilterId::Num(n), + SubscriptionId::Str(s) => FilterId::Str(s.into_owned()), + } + } +} diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index f00d497829..1a10b785c1 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -3,12 +3,11 @@ use async_trait::async_trait; use jsonrpsee::{ core::RpcResult, server::{IdProvider, RandomIntegerIdProvider}, - types::SubscriptionId, }; use reth_primitives::rpc::Filter; use reth_provider::BlockProvider; use reth_rpc_api::EthFilterApiServer; -use reth_rpc_types::{FilterChanges, Log}; +use reth_rpc_types::{FilterChanges, FilterId, Log}; use reth_transaction_pool::TransactionPool; use std::{collections::HashMap, sync::Arc, time::Instant}; use tokio::sync::Mutex; @@ -45,29 +44,28 @@ where Client: BlockProvider + 'static, Pool: TransactionPool + 'static, { - async fn new_filter(&self, filter: Filter) -> RpcResult> { + async fn new_filter(&self, filter: Filter) -> RpcResult { self.inner.install_filter(FilterKind::Log(filter)).await } - async fn new_block_filter(&self) -> RpcResult> { + async fn new_block_filter(&self) -> RpcResult { self.inner.install_filter(FilterKind::Block).await } - async fn new_pending_transaction_filter(&self) -> RpcResult> { + async fn new_pending_transaction_filter(&self) -> RpcResult { self.inner.install_filter(FilterKind::PendingTransaction).await } - async fn filter_changes(&self, _id: SubscriptionId<'_>) -> RpcResult { + async fn filter_changes(&self, _id: FilterId) -> RpcResult { todo!() } - async fn filter_logs(&self, _id: SubscriptionId<'_>) -> RpcResult> { + async fn filter_logs(&self, _id: FilterId) -> RpcResult> { todo!() } - async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> RpcResult { + async fn uninstall_filter(&self, id: FilterId) -> RpcResult { let mut filters = self.inner.active_filters.inner.lock().await; - let id = id.into_owned(); if filters.remove(&id).is_some() { trace!(target: "rpc::eth::filter", ?id, "uninstalled filter"); Ok(true) @@ -99,9 +97,9 @@ where Pool: TransactionPool + 'static, { /// Installs a new filter and returns the new identifier. - async fn install_filter(&self, kind: FilterKind) -> RpcResult> { + async fn install_filter(&self, kind: FilterKind) -> RpcResult { let last_poll_block_number = self.client.chain_info().to_rpc_result()?.best_number; - let id = self.id_provider.next_id(); + let id = FilterId::from(self.id_provider.next_id()); let mut filters = self.active_filters.inner.lock().await; filters.insert( id.clone(), @@ -114,7 +112,7 @@ where /// All active filters #[derive(Debug, Clone, Default)] pub struct ActiveFilters { - inner: Arc, ActiveFilter>>>, + inner: Arc>>, } /// An installed filter