From 8c0979144ca1d03176dfb0ecd44eb60cc054fe93 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 11 May 2023 19:52:48 +0200 Subject: [PATCH] perf: read header range first (#2635) --- crates/rpc/rpc/src/eth/filter.rs | 104 ++++++++++++++++++++++++++----- 1 file changed, 89 insertions(+), 15 deletions(-) diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 383795ad13..c9c1cc3a41 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -11,16 +11,19 @@ use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_primitives::{ filter::{Filter, FilterBlockOption, FilteredParams}, - SealedBlock, + Receipt, SealedBlock, }; use reth_provider::{BlockIdProvider, BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{FilterChanges, FilterId, Log}; use reth_transaction_pool::TransactionPool; -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; use tokio::sync::Mutex; use tracing::trace; +/// The maximum number of headers we read at once when handling a range filter. +const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb + /// `Eth` filter RPC implementation. #[derive(Debug, Clone)] pub struct EthFilter { @@ -48,6 +51,7 @@ impl EthFilter { id_provider: Arc::new(EthSubscriptionIdProvider::default()), max_logs_per_response, eth_cache, + max_headers_range: MAX_HEADERS_RANGE, }; Self { inner: Arc::new(inner) } } @@ -209,6 +213,8 @@ struct EthFilterInner { max_logs_per_response: usize, /// The async cache frontend for eth related data eth_cache: EthStateCache, + /// maximum number of headers to read at once for range filter + max_headers_range: u64, } impl EthFilterInner @@ -280,12 +286,21 @@ where Ok(id) } - /// Returns the block with the given block number if it exists. - async fn block_by_number(&self, num: u64) -> EthResult> { - match self.client.block_hash(num)? { - Some(hash) => Ok(self.eth_cache.get_sealed_block(hash).await?), - None => Ok(None), - } + async fn block_and_receipts( + &self, + block_number: u64, + ) -> EthResult)>> { + let block_hash = match self.client.block_hash(block_number)? { + Some(hash) => hash, + None => return Ok(None), + }; + + let block = self.eth_cache.get_sealed_block(block_hash); + let receipts = self.eth_cache.get_receipts(block_hash); + + let (block, receipts) = futures::try_join!(block, receipts)?; + + Ok(block.zip(receipts)) } /// Returns all logs in the given _inclusive_ range that match the filter @@ -312,20 +327,23 @@ where // loop over the range of new blocks and check logs if the filter matches the log's bloom // filter - for block_number in from_block..=to_block { - if let Some(block) = self.block_by_number(block_number).await? { + for (from, to) in + BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range) + { + let headers = self.client.headers_range(from..=to)?; + + for header in headers { // only if filter matches - if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && - FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) + if FilteredParams::matches_address(header.logs_bloom, &address_filter) && + FilteredParams::matches_topics(header.logs_bloom, &topics_filter) { - // get receipts for the block - if let Some(receipts) = self.eth_cache.get_receipts(block.hash).await? { + if let Some((block, receipts)) = self.block_and_receipts(header.number).await? { let block_hash = block.hash; logs_utils::append_matching_block_logs( &mut all_logs, &filter_params, - (block_number, block_hash).into(), + (block.number, block_hash).into(), block.body.into_iter().map(|tx| tx.hash()).zip(receipts), false, ); @@ -401,3 +419,59 @@ impl From for FilterError { FilterError::EthAPIError(err.into()) } } + +/// An iterator that yields _inclusive_ block ranges of a given step size +#[derive(Debug)] +struct BlockRangeInclusiveIter { + iter: StepBy>, + step: u64, + end: u64, +} + +impl BlockRangeInclusiveIter { + fn new(range: RangeInclusive, step: u64) -> Self { + Self { end: *range.end(), iter: range.step_by(step as usize + 1), step } + } +} + +impl Iterator for BlockRangeInclusiveIter { + type Item = (u64, u64); + + fn next(&mut self) -> Option { + let start = self.iter.next()?; + let end = (start + self.step).min(self.end); + if start > end { + return None + } + Some((start, end)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{thread_rng, Rng}; + + #[test] + fn test_block_range_iter() { + for _ in 0..100 { + let mut rng = thread_rng(); + let start = rng.gen::() as u64; + let end = start.saturating_add(rng.gen::() as u64); + let step = rng.gen::() as u64; + let range = start..=end; + let mut iter = BlockRangeInclusiveIter::new(range.clone(), step); + let (from, mut end) = iter.next().unwrap(); + assert_eq!(from, start); + assert_eq!(end, (from + step).min(*range.end())); + + for (next_from, next_end) in iter { + // ensure range starts with previous end + 1 + assert_eq!(next_from, end + 1); + end = next_end; + } + + assert_eq!(end, *range.end()); + } + } +}