perf(rpc): utilize caching layer in log range (#2553)

This commit is contained in:
Matthias Seitz
2023-05-04 15:26:27 +02:00
committed by GitHub
parent dfd35fabb4
commit 231d1f96bb
2 changed files with 49 additions and 16 deletions

View File

@@ -2,7 +2,7 @@
use futures::{future::Either, StreamExt};
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Block, Receipt, TransactionSigned, H256};
use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv};
@@ -149,6 +149,13 @@ impl EthStateCache {
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
/// Requests the [Block] for the block hash, sealed with the given block hash.
///
/// Returns `None` if the block does not exist.
pub(crate) async fn get_sealed_block(&self, block_hash: H256) -> Result<Option<SealedBlock>> {
Ok(self.get_block(block_hash).await?.map(|block| block.seal(block_hash)))
}
/// Requests the transactions of the [Block]
///
/// Returns `None` if the block does not exist.

View File

@@ -1,12 +1,18 @@
use super::cache::EthStateCache;
use crate::{
eth::{error::EthApiError, logs_utils},
eth::{
error::{EthApiError, EthResult},
logs_utils,
},
result::{internal_rpc_err, rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::filter::{Filter, FilterBlockOption, FilteredParams};
use reth_primitives::{
filter::{Filter, FilterBlockOption, FilteredParams},
SealedBlock,
};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{FilterChanges, FilterId, Log};
@@ -118,9 +124,11 @@ where
}
};
self.inner
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number)
.map(FilterChanges::Logs)
.await?;
Ok(FilterChanges::Logs(logs))
}
}
}
@@ -221,7 +229,9 @@ where
let start_block = info.best_number;
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from_block, to_block, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number)
Ok(self
.get_logs_in_block_range(&filter, from_block_number, to_block_number)
.await?)
}
}
}
@@ -242,17 +252,27 @@ where
Ok(id)
}
/// Returns the block with the given block number if it exists.
async fn block_by_number(&self, num: u64) -> EthResult<Option<SealedBlock>> {
match self.client.block_hash(num)? {
Some(hash) => Ok(self.eth_cache.get_sealed_block(hash).await?),
None => Ok(None),
}
}
/// Returns all logs in the given _inclusive_ range that match the filter
///
/// Returns an error if:
/// - underlying database error
/// - amount of matches exceeds configured limit
fn get_logs_in_block_range(
async fn get_logs_in_block_range(
&self,
filter: &Filter,
from_block: u64,
to_block: u64,
) -> RpcResult<Vec<Log>> {
) -> Result<Vec<Log>, FilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone()));
@@ -265,16 +285,14 @@ where
// loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter
for block_number in from_block..=to_block {
if let Some(block) = self.client.block_by_number(block_number).to_rpc_result()? {
if let Some(block) = self.block_by_number(block_number).await? {
// only if filter matches
if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) &&
FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter)
{
// get receipts for the block
if let Some(receipts) =
self.client.receipts_by_block(block.number.into()).to_rpc_result()?
{
let block_hash = block.hash_slow();
if let Some(receipts) = self.eth_cache.get_receipts(block.hash).await? {
let block_hash = block.hash;
logs_utils::append_matching_block_logs(
&mut all_logs,
@@ -288,8 +306,7 @@ where
if all_logs.len() > self.max_logs_in_response {
return Err(FilterError::QueryExceedsMaxResults(
self.max_logs_in_response,
)
.into())
))
}
}
}
@@ -325,12 +342,14 @@ enum FilterKind {
}
/// Errors that can occur in the handler implementation
#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, thiserror::Error)]
pub enum FilterError {
#[error("filter not found")]
FilterNotFound(FilterId),
#[error("Query exceeds max results {0}")]
QueryExceedsMaxResults(usize),
#[error(transparent)]
EthAPIError(#[from] EthApiError),
}
// convert the error
@@ -341,9 +360,16 @@ impl From<FilterError> for jsonrpsee::core::Error {
jsonrpsee::types::error::INVALID_PARAMS_CODE,
"filter not found",
),
FilterError::EthAPIError(err) => err.into(),
err @ FilterError::QueryExceedsMaxResults(_) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
}
}
}
impl From<reth_interfaces::Error> for FilterError {
fn from(err: reth_interfaces::Error) -> Self {
FilterError::EthAPIError(err.into())
}
}