feat: make SubscriptionIdcompatible with client proc macro (#1378)

This commit is contained in:
Aurélien
2023-02-15 20:33:42 +01:00
committed by GitHub
parent 9842f15ee0
commit cd9b988367
5 changed files with 51 additions and 21 deletions

1
Cargo.lock generated
View File

@@ -4689,6 +4689,7 @@ name = "reth-rpc-types"
version = "0.1.0"
dependencies = [
"bytes",
"jsonrpsee",
"reth-network-api",
"reth-primitives",
"reth-rlp",

View File

@@ -1,34 +1,34 @@
use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc, types::SubscriptionId};
use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc};
use reth_primitives::rpc::Filter;
use reth_rpc_types::{FilterChanges, Log};
use reth_rpc_types::{FilterChanges, FilterId, Log};
/// Rpc Interface for poll-based ethereum filter API.
#[cfg_attr(not(feature = "client"), rpc(server))]
#[cfg_attr(feature = "client", rpc(server))] // TODO(mattsse) make it work with SubscriptionId lifetime
#[cfg_attr(feature = "client", rpc(server, client))]
pub trait EthFilterApi {
/// Creates anew filter and returns its id.
#[method(name = "eth_newFilter")]
async fn new_filter(&self, filter: Filter) -> Result<SubscriptionId<'static>>;
async fn new_filter(&self, filter: Filter) -> Result<FilterId>;
/// Creates a new block filter and returns its id.
#[method(name = "eth_newBlockFilter")]
async fn new_block_filter(&self) -> Result<SubscriptionId<'static>>;
async fn new_block_filter(&self) -> Result<FilterId>;
/// Creates a pending transaction filter and returns its id.
#[method(name = "eth_newPendingTransactionFilter")]
async fn new_pending_transaction_filter(&self) -> Result<SubscriptionId<'static>>;
async fn new_pending_transaction_filter(&self) -> Result<FilterId>;
/// Returns all filter changes since last poll.
#[method(name = "eth_getFilterChanges")]
async fn filter_changes(&self, id: SubscriptionId<'_>) -> Result<FilterChanges>;
async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges>;
/// Returns all logs matching given filter (in a range 'from' - 'to').
#[method(name = "eth_getFilterLogs")]
async fn filter_logs(&self, id: SubscriptionId<'_>) -> Result<Vec<Log>>;
async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>>;
/// Uninstalls filter.
#[method(name = "eth_uninstallFilter")]
async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> Result<bool>;
async fn uninstall_filter(&self, id: FilterId) -> Result<bool>;
/// Returns logs matching given filter object.
#[method(name = "eth_getLogs")]

View File

@@ -21,3 +21,4 @@ thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bytes = "1.2"
jsonrpsee = { version = "0.16" }

View File

@@ -1,4 +1,5 @@
use crate::Log;
use jsonrpsee::types::SubscriptionId;
use reth_primitives::H256;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -58,3 +59,32 @@ impl<'de> Deserialize<'de> for FilterChanges {
Ok(changes)
}
}
/// Owned equivalent of [SubscriptionId]
#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum FilterId {
/// Numeric id
Num(u64),
/// String id
Str(String),
}
impl From<FilterId> for SubscriptionId<'_> {
fn from(value: FilterId) -> Self {
match value {
FilterId::Num(n) => SubscriptionId::Num(n),
FilterId::Str(s) => SubscriptionId::Str(s.into()),
}
}
}
impl From<SubscriptionId<'_>> for FilterId {
fn from(value: SubscriptionId<'_>) -> Self {
match value {
SubscriptionId::Num(n) => FilterId::Num(n),
SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
}
}
}

View File

@@ -3,12 +3,11 @@ use async_trait::async_trait;
use jsonrpsee::{
core::RpcResult,
server::{IdProvider, RandomIntegerIdProvider},
types::SubscriptionId,
};
use reth_primitives::rpc::Filter;
use reth_provider::BlockProvider;
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{FilterChanges, Log};
use reth_rpc_types::{FilterChanges, FilterId, Log};
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, sync::Arc, time::Instant};
use tokio::sync::Mutex;
@@ -45,29 +44,28 @@ where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
{
async fn new_filter(&self, filter: Filter) -> RpcResult<SubscriptionId<'static>> {
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
self.inner.install_filter(FilterKind::Log(filter)).await
}
async fn new_block_filter(&self) -> RpcResult<SubscriptionId<'static>> {
async fn new_block_filter(&self) -> RpcResult<FilterId> {
self.inner.install_filter(FilterKind::Block).await
}
async fn new_pending_transaction_filter(&self) -> RpcResult<SubscriptionId<'static>> {
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId> {
self.inner.install_filter(FilterKind::PendingTransaction).await
}
async fn filter_changes(&self, _id: SubscriptionId<'_>) -> RpcResult<FilterChanges> {
async fn filter_changes(&self, _id: FilterId) -> RpcResult<FilterChanges> {
todo!()
}
async fn filter_logs(&self, _id: SubscriptionId<'_>) -> RpcResult<Vec<Log>> {
async fn filter_logs(&self, _id: FilterId) -> RpcResult<Vec<Log>> {
todo!()
}
async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> RpcResult<bool> {
async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
let mut filters = self.inner.active_filters.inner.lock().await;
let id = id.into_owned();
if filters.remove(&id).is_some() {
trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
Ok(true)
@@ -99,9 +97,9 @@ where
Pool: TransactionPool + 'static,
{
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind) -> RpcResult<SubscriptionId<'static>> {
async fn install_filter(&self, kind: FilterKind) -> RpcResult<FilterId> {
let last_poll_block_number = self.client.chain_info().to_rpc_result()?.best_number;
let id = self.id_provider.next_id();
let id = FilterId::from(self.id_provider.next_id());
let mut filters = self.active_filters.inner.lock().await;
filters.insert(
id.clone(),
@@ -114,7 +112,7 @@ where
/// All active filters
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters {
inner: Arc<Mutex<HashMap<SubscriptionId<'static>, ActiveFilter>>>,
inner: Arc<Mutex<HashMap<FilterId, ActiveFilter>>>,
}
/// An installed filter