From 42843d5d71c29c70581460d2ac40ec0c2c10f651 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 16 May 2023 19:35:48 +0200 Subject: [PATCH] perf: spawn eth filter tasks (#2696) --- crates/rpc/rpc-builder/src/auth.rs | 8 +- crates/rpc/rpc-builder/src/lib.rs | 6 +- crates/rpc/rpc/src/eth/filter.rs | 246 ++++++++++++++++++----------- 3 files changed, 163 insertions(+), 97 deletions(-) diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index b270f3d447..ac9bf3df78 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -61,7 +61,13 @@ where gas_oracle, Box::new(executor.clone()), ); - let eth_filter = EthFilter::new(client, pool, eth_cache.clone(), DEFAULT_MAX_LOGS_IN_RESPONSE); + let eth_filter = EthFilter::new( + client, + pool, + eth_cache.clone(), + DEFAULT_MAX_LOGS_IN_RESPONSE, + Box::new(executor.clone()), + ); launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 84db25c072..1af7f2e227 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -845,19 +845,21 @@ where }), ); + let executor = Box::new(self.executor.clone()); let api = EthApi::with_spawner( self.client.clone(), self.pool.clone(), self.network.clone(), cache.clone(), gas_oracle, - Box::new(self.executor.clone()), + executor.clone(), ); let filter = EthFilter::new( self.client.clone(), self.pool.clone(), cache.clone(), self.config.eth.max_logs_per_response, + executor.clone(), ); let pubsub = EthPubSub::with_spawner( @@ -865,7 +867,7 @@ where self.pool.clone(), self.events.clone(), self.network.clone(), - Box::new(self.executor.clone()), + executor, ); let eth = EthHandlers { api, cache, filter, pubsub }; diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 916685ab39..710578fb44 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -4,7 +4,7 @@ use crate::{ error::{EthApiError, EthResult}, logs_utils, }, - result::{internal_rpc_err, rpc_error_with_code, ToRpcResult}, + result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use async_trait::async_trait; @@ -13,16 +13,19 @@ use reth_primitives::{Receipt, SealedBlock}; use reth_provider::{BlockIdProvider, BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; +use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; -use tokio::sync::Mutex; +use std::{ + collections::HashMap, future::Future, iter::StepBy, ops::RangeInclusive, sync::Arc, + time::Instant, +}; +use tokio::sync::{oneshot, Mutex}; use tracing::trace; /// The maximum number of headers we read at once when handling a range filter. const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb /// `Eth` filter RPC implementation. -#[derive(Debug, Clone)] pub struct EthFilter { /// All nested fields bundled together. inner: Arc>, @@ -40,6 +43,7 @@ impl EthFilter { pool: Pool, eth_cache: EthStateCache, max_logs_per_response: usize, + task_spawner: Box, ) -> Self { let inner = EthFilterInner { client, @@ -49,6 +53,7 @@ impl EthFilter { max_logs_per_response, eth_cache, max_headers_range: MAX_HEADERS_RANGE, + task_spawner, }; Self { inner: Arc::new(inner) } } @@ -59,6 +64,117 @@ impl EthFilter { } } +impl EthFilter +where + Client: BlockProvider + BlockIdProvider + EvmEnvProvider + 'static, + Pool: TransactionPool + 'static, +{ + /// Executes the given filter on a new task. + /// + /// All the filter handles are implemented asynchronously. However, filtering is still a bit CPU + /// intensive. + async fn spawn_filter_task(&self, c: C) -> Result + where + C: FnOnce(Self) -> F, + F: Future> + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let this = self.clone(); + let f = c(this); + self.inner.task_spawner.spawn(Box::pin(async move { + let res = f.await; + let _ = tx.send(res); + })); + rx.await.map_err(|_| FilterError::InternalError)? + } + + /// Returns all the filter changes for the given id, if any + pub async fn filter_changes(&self, id: FilterId) -> Result { + let info = self.inner.client.chain_info()?; + let best_number = info.best_number; + + let (start_block, kind) = { + let mut filters = self.inner.active_filters.inner.lock().await; + let filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?; + + // update filter + // we fetch all changes from [filter.block..best_block], so we advance the filter's + // block to `best_block +1` + let mut block = best_number + 1; + std::mem::swap(&mut filter.block, &mut block); + filter.last_poll_timestamp = Instant::now(); + + (block, filter.kind.clone()) + }; + + match kind { + FilterKind::PendingTransaction => { + Err(EthApiError::Unsupported("pending transaction filter not supported").into()) + } + FilterKind::Block => { + let mut block_hashes = Vec::new(); + for block_num in start_block..best_number { + let block_hash = self + .inner + .client + .block_hash(block_num)? + .ok_or(EthApiError::UnknownBlockNumber)?; + block_hashes.push(block_hash); + } + Ok(FilterChanges::Hashes(block_hashes)) + } + FilterKind::Log(filter) => { + let (from_block_number, to_block_number) = match filter.block_option { + FilterBlockOption::Range { from_block, to_block } => { + let from = from_block + .map(|num| self.inner.client.convert_block_number(num)) + .transpose()? + .flatten(); + let to = to_block + .map(|num| self.inner.client.convert_block_number(num)) + .transpose()? + .flatten(); + logs_utils::get_filter_block_range(from, to, start_block, info) + } + FilterBlockOption::AtBlockHash(_) => { + // blockHash is equivalent to fromBlock = toBlock = the block number with + // hash blockHash + (start_block, best_number) + } + }; + + let logs = self + .inner + .get_logs_in_block_range(&filter, from_block_number, to_block_number) + .await?; + Ok(FilterChanges::Logs(logs)) + } + } + } + + /// Returns an array of all logs matching filter with given id. + /// + /// Returns an error if no matching log filter exists. + /// + /// Handler for `eth_getFilterLogs` + pub async fn filter_logs(&self, id: FilterId) -> Result, FilterError> { + let filter = { + let filters = self.inner.active_filters.inner.lock().await; + if let FilterKind::Log(ref filter) = + filters.get(&id).ok_or_else(|| FilterError::FilterNotFound(id.clone()))?.kind + { + *filter.clone() + } else { + // Not a log filter + return Err(FilterError::FilterNotFound(id)) + } + }; + + self.inner.logs_for_filter(filter).await + } +} + #[async_trait] impl EthFilterApiServer for EthFilter where @@ -86,69 +202,7 @@ where /// Handler for `eth_getFilterChanges` async fn filter_changes(&self, id: FilterId) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_getFilterChanges"); - let info = self.inner.client.chain_info().to_rpc_result()?; - let best_number = info.best_number; - - let (start_block, kind) = { - let mut filters = self.inner.active_filters.inner.lock().await; - let filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?; - - // update filter - // we fetch all changes from [filter.block..best_block], so we advance the filter's - // block to `best_block +1` - let mut block = best_number + 1; - std::mem::swap(&mut filter.block, &mut block); - filter.last_poll_timestamp = Instant::now(); - - (block, filter.kind.clone()) - }; - - match kind { - FilterKind::PendingTransaction => { - return Err(internal_rpc_err("method not implemented")) - } - FilterKind::Block => { - let mut block_hashes = Vec::new(); - for block_num in start_block..best_number { - let block_hash = self - .inner - .client - .block_hash(block_num) - .to_rpc_result()? - .ok_or(EthApiError::UnknownBlockNumber)?; - block_hashes.push(block_hash); - } - Ok(FilterChanges::Hashes(block_hashes)) - } - FilterKind::Log(filter) => { - let (from_block_number, to_block_number) = match filter.block_option { - FilterBlockOption::Range { from_block, to_block } => { - let from = from_block - .map(|num| self.inner.client.convert_block_number(num)) - .transpose() - .to_rpc_result()? - .flatten(); - let to = to_block - .map(|num| self.inner.client.convert_block_number(num)) - .transpose() - .to_rpc_result()? - .flatten(); - logs_utils::get_filter_block_range(from, to, start_block, info) - } - FilterBlockOption::AtBlockHash(_) => { - // blockHash is equivalent to fromBlock = toBlock = the block number with - // hash blockHash - (start_block, best_number) - } - }; - - let logs = self - .inner - .get_logs_in_block_range(&filter, from_block_number, to_block_number) - .await?; - Ok(FilterChanges::Logs(logs)) - } - } + Ok(self.spawn_filter_task(|this| async move { this.filter_changes(id).await }).await?) } /// Returns an array of all logs matching filter with given id. @@ -158,19 +212,7 @@ where /// Handler for `eth_getFilterLogs` async fn filter_logs(&self, id: FilterId) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getFilterLogs"); - let filter = { - let filters = self.inner.active_filters.inner.lock().await; - if let FilterKind::Log(ref filter) = - filters.get(&id).ok_or_else(|| FilterError::FilterNotFound(id.clone()))?.kind - { - *filter.clone() - } else { - // Not a log filter - return Err(FilterError::FilterNotFound(id).into()) - } - }; - - self.inner.logs_for_filter(filter).await + Ok(self.spawn_filter_task(|this| async move { this.filter_logs(id).await }).await?) } /// Handler for `eth_uninstallFilter` @@ -190,7 +232,21 @@ where /// Handler for `eth_getLogs` async fn logs(&self, filter: Filter) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getLogs"); - self.inner.logs_for_filter(filter).await + Ok(self + .spawn_filter_task(|this| async move { this.inner.logs_for_filter(filter).await }) + .await?) + } +} + +impl std::fmt::Debug for EthFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EthFilter").finish_non_exhaustive() + } +} + +impl Clone for EthFilter { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } } } @@ -212,6 +268,8 @@ struct EthFilterInner { eth_cache: EthStateCache, /// maximum number of headers to read at once for range filter max_headers_range: u64, + /// The type that can spawn tasks. + task_spawner: Box, } impl EthFilterInner @@ -220,16 +278,14 @@ where Pool: TransactionPool + 'static, { /// Returns logs matching given filter object. - async fn logs_for_filter(&self, filter: Filter) -> RpcResult> { + async fn logs_for_filter(&self, filter: Filter) -> Result, FilterError> { match filter.block_option { FilterBlockOption::AtBlockHash(block_hash) => { let mut all_logs = Vec::new(); // all matching logs in the block, if it exists - if let Some(block) = self.eth_cache.get_block(block_hash).await.to_rpc_result()? { + if let Some(block) = self.eth_cache.get_block(block_hash).await? { // get receipts for the block - if let Some(receipts) = - self.eth_cache.get_receipts(block_hash).await.to_rpc_result()? - { + if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { let filter = FilteredParams::new(Some(filter)); logs_utils::append_matching_block_logs( &mut all_logs, @@ -244,25 +300,21 @@ where } FilterBlockOption::Range { from_block, to_block } => { // compute the range - let info = self.client.chain_info().to_rpc_result()?; + let info = self.client.chain_info()?; // we start at the most recent block if unset in filter let start_block = info.best_number; let from = from_block .map(|num| self.client.convert_block_number(num)) - .transpose() - .to_rpc_result()? + .transpose()? .flatten(); let to = to_block .map(|num| self.client.convert_block_number(num)) - .transpose() - .to_rpc_result()? + .transpose()? .flatten(); let (from_block_number, to_block_number) = logs_utils::get_filter_block_range(from, to, start_block, info); - Ok(self - .get_logs_in_block_range(&filter, from_block_number, to_block_number) - .await?) + self.get_logs_in_block_range(&filter, from_block_number, to_block_number).await } } } @@ -393,6 +445,9 @@ pub enum FilterError { QueryExceedsMaxResults(usize), #[error(transparent)] EthAPIError(#[from] EthApiError), + /// Error thrown when a spawned task failed to deliver a response. + #[error("internal filter error")] + InternalError, } // convert the error @@ -403,6 +458,9 @@ impl From for jsonrpsee::types::error::ErrorObject<'static> { jsonrpsee::types::error::INVALID_PARAMS_CODE, "filter not found", ), + err @ FilterError::InternalError => { + rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string()) + } FilterError::EthAPIError(err) => err.into(), err @ FilterError::QueryExceedsMaxResults(_) => { rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())