From 269b878f5cdfd8e6444675fd1b25a8ceae02ecbb Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 7 Aug 2023 19:07:55 +0200 Subject: [PATCH] perf: no longer spawn filter tasks (#4096) --- crates/rpc/rpc/src/eth/filter.rs | 36 ++++++-------------------------- 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 280bc0817f..a7c5ce53f3 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -15,11 +15,8 @@ use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::{ - collections::HashMap, future::Future, iter::StepBy, ops::RangeInclusive, sync::Arc, - time::Instant, -}; -use tokio::sync::{oneshot, Mutex}; +use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; +use tokio::sync::Mutex; use tracing::trace; /// The maximum number of headers we read at once when handling a range filter. @@ -69,26 +66,6 @@ where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, { - /// Executes the given filter on a new task. - /// - /// All the filter handles are implemented asynchronously. However, filtering is still a bit CPU - /// intensive. - async fn spawn_filter_task(&self, c: C) -> Result - where - C: FnOnce(Self) -> F, - F: Future> + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let this = self.clone(); - let f = c(this); - self.inner.task_spawner.spawn(Box::pin(async move { - let res = f.await; - let _ = tx.send(res); - })); - rx.await.map_err(|_| FilterError::InternalError)? - } - /// Returns all the filter changes for the given id, if any pub async fn filter_changes(&self, id: FilterId) -> Result { let info = self.inner.provider.chain_info()?; @@ -202,7 +179,7 @@ where /// Handler for `eth_getFilterChanges` async fn filter_changes(&self, id: FilterId) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_getFilterChanges"); - Ok(self.spawn_filter_task(|this| async move { this.filter_changes(id).await }).await?) + Ok(EthFilter::filter_changes(self, id).await?) } /// Returns an array of all logs matching filter with given id. @@ -212,7 +189,7 @@ where /// Handler for `eth_getFilterLogs` async fn filter_logs(&self, id: FilterId) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getFilterLogs"); - Ok(self.spawn_filter_task(|this| async move { this.filter_logs(id).await }).await?) + Ok(EthFilter::filter_logs(self, id).await?) } /// Handler for `eth_uninstallFilter` @@ -232,9 +209,7 @@ where /// Handler for `eth_getLogs` async fn logs(&self, filter: Filter) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getLogs"); - Ok(self - .spawn_filter_task(|this| async move { this.inner.logs_for_filter(filter).await }) - .await?) + Ok(EthFilter::logs(self, filter).await?) } } @@ -269,6 +244,7 @@ struct EthFilterInner { /// maximum number of headers to read at once for range filter max_headers_range: u64, /// The type that can spawn tasks. + #[allow(unused)] task_spawner: Box, }