From 231d1f96bba946059a2ad1f7ffed4ee3c7ff3494 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 4 May 2023 15:26:27 +0200 Subject: [PATCH] perf(rpc): utilize caching layer in log range (#2553) --- crates/rpc/rpc/src/eth/cache.rs | 9 ++++- crates/rpc/rpc/src/eth/filter.rs | 56 +++++++++++++++++++++++--------- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache.rs index 15b41218c1..8b4b9c4375 100644 --- a/crates/rpc/rpc/src/eth/cache.rs +++ b/crates/rpc/rpc/src/eth/cache.rs @@ -2,7 +2,7 @@ use futures::{future::Either, StreamExt}; use reth_interfaces::{provider::ProviderError, Result}; -use reth_primitives::{Block, Receipt, TransactionSigned, H256}; +use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256}; use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use revm::primitives::{BlockEnv, CfgEnv}; @@ -149,6 +149,13 @@ impl EthStateCache { rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? } + /// Requests the [Block] for the block hash, sealed with the given block hash. + /// + /// Returns `None` if the block does not exist. + pub(crate) async fn get_sealed_block(&self, block_hash: H256) -> Result> { + Ok(self.get_block(block_hash).await?.map(|block| block.seal(block_hash))) + } + /// Requests the transactions of the [Block] /// /// Returns `None` if the block does not exist. diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 2a6470aeab..aafa5587fd 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,12 +1,18 @@ use super::cache::EthStateCache; use crate::{ - eth::{error::EthApiError, logs_utils}, + eth::{ + error::{EthApiError, EthResult}, + logs_utils, + }, result::{internal_rpc_err, rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; -use reth_primitives::filter::{Filter, FilterBlockOption, FilteredParams}; +use reth_primitives::{ + filter::{Filter, FilterBlockOption, FilteredParams}, + SealedBlock, +}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{FilterChanges, FilterId, Log}; @@ -118,9 +124,11 @@ where } }; - self.inner + let logs = self + .inner .get_logs_in_block_range(&filter, from_block_number, to_block_number) - .map(FilterChanges::Logs) + .await?; + Ok(FilterChanges::Logs(logs)) } } } @@ -221,7 +229,9 @@ where let start_block = info.best_number; let (from_block_number, to_block_number) = logs_utils::get_filter_block_range(from_block, to_block, start_block, info); - self.get_logs_in_block_range(&filter, from_block_number, to_block_number) + Ok(self + .get_logs_in_block_range(&filter, from_block_number, to_block_number) + .await?) } } } @@ -242,17 +252,27 @@ where Ok(id) } + /// Returns the block with the given block number if it exists. + async fn block_by_number(&self, num: u64) -> EthResult> { + match self.client.block_hash(num)? { + Some(hash) => Ok(self.eth_cache.get_sealed_block(hash).await?), + None => Ok(None), + } + } + /// Returns all logs in the given _inclusive_ range that match the filter /// /// Returns an error if: /// - underlying database error /// - amount of matches exceeds configured limit - fn get_logs_in_block_range( + async fn get_logs_in_block_range( &self, filter: &Filter, from_block: u64, to_block: u64, - ) -> RpcResult> { + ) -> Result, FilterError> { + trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range"); + let mut all_logs = Vec::new(); let filter_params = FilteredParams::new(Some(filter.clone())); @@ -265,16 +285,14 @@ where // loop over the range of new blocks and check logs if the filter matches the log's bloom // filter for block_number in from_block..=to_block { - if let Some(block) = self.client.block_by_number(block_number).to_rpc_result()? { + if let Some(block) = self.block_by_number(block_number).await? { // only if filter matches if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) { // get receipts for the block - if let Some(receipts) = - self.client.receipts_by_block(block.number.into()).to_rpc_result()? - { - let block_hash = block.hash_slow(); + if let Some(receipts) = self.eth_cache.get_receipts(block.hash).await? { + let block_hash = block.hash; logs_utils::append_matching_block_logs( &mut all_logs, @@ -288,8 +306,7 @@ where if all_logs.len() > self.max_logs_in_response { return Err(FilterError::QueryExceedsMaxResults( self.max_logs_in_response, - ) - .into()) + )) } } } @@ -325,12 +342,14 @@ enum FilterKind { } /// Errors that can occur in the handler implementation -#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum FilterError { #[error("filter not found")] FilterNotFound(FilterId), #[error("Query exceeds max results {0}")] QueryExceedsMaxResults(usize), + #[error(transparent)] + EthAPIError(#[from] EthApiError), } // convert the error @@ -341,9 +360,16 @@ impl From for jsonrpsee::core::Error { jsonrpsee::types::error::INVALID_PARAMS_CODE, "filter not found", ), + FilterError::EthAPIError(err) => err.into(), err @ FilterError::QueryExceedsMaxResults(_) => { rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) } } } } + +impl From for FilterError { + fn from(err: reth_interfaces::Error) -> Self { + FilterError::EthAPIError(err.into()) + } +}