perf: read header range first (#2635)

This commit is contained in:
Matthias Seitz
2023-05-11 19:52:48 +02:00
committed by GitHub
parent 5e09a686db
commit 8c0979144c

View File

@@ -11,16 +11,19 @@ use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{
filter::{Filter, FilterBlockOption, FilteredParams},
SealedBlock,
Receipt, SealedBlock,
};
use reth_provider::{BlockIdProvider, BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{FilterChanges, FilterId, Log};
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant};
use tokio::sync::Mutex;
use tracing::trace;
/// The maximum number of headers we read at once when handling a range filter.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
/// `Eth` filter RPC implementation.
#[derive(Debug, Clone)]
pub struct EthFilter<Client, Pool> {
@@ -48,6 +51,7 @@ impl<Client, Pool> EthFilter<Client, Pool> {
id_provider: Arc::new(EthSubscriptionIdProvider::default()),
max_logs_per_response,
eth_cache,
max_headers_range: MAX_HEADERS_RANGE,
};
Self { inner: Arc::new(inner) }
}
@@ -209,6 +213,8 @@ struct EthFilterInner<Client, Pool> {
max_logs_per_response: usize,
/// The async cache frontend for eth related data
eth_cache: EthStateCache,
/// maximum number of headers to read at once for range filter
max_headers_range: u64,
}
impl<Client, Pool> EthFilterInner<Client, Pool>
@@ -280,12 +286,21 @@ where
Ok(id)
}
/// Returns the block with the given block number if it exists.
async fn block_by_number(&self, num: u64) -> EthResult<Option<SealedBlock>> {
match self.client.block_hash(num)? {
Some(hash) => Ok(self.eth_cache.get_sealed_block(hash).await?),
None => Ok(None),
}
async fn block_and_receipts(
&self,
block_number: u64,
) -> EthResult<Option<(SealedBlock, Vec<Receipt>)>> {
let block_hash = match self.client.block_hash(block_number)? {
Some(hash) => hash,
None => return Ok(None),
};
let block = self.eth_cache.get_sealed_block(block_hash);
let receipts = self.eth_cache.get_receipts(block_hash);
let (block, receipts) = futures::try_join!(block, receipts)?;
Ok(block.zip(receipts))
}
/// Returns all logs in the given _inclusive_ range that match the filter
@@ -312,20 +327,23 @@ where
// loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter
for block_number in from_block..=to_block {
if let Some(block) = self.block_by_number(block_number).await? {
for (from, to) in
BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
{
let headers = self.client.headers_range(from..=to)?;
for header in headers {
// only if filter matches
if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) &&
FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter)
if FilteredParams::matches_address(header.logs_bloom, &address_filter) &&
FilteredParams::matches_topics(header.logs_bloom, &topics_filter)
{
// get receipts for the block
if let Some(receipts) = self.eth_cache.get_receipts(block.hash).await? {
if let Some((block, receipts)) = self.block_and_receipts(header.number).await? {
let block_hash = block.hash;
logs_utils::append_matching_block_logs(
&mut all_logs,
&filter_params,
(block_number, block_hash).into(),
(block.number, block_hash).into(),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts),
false,
);
@@ -401,3 +419,59 @@ impl From<reth_interfaces::Error> for FilterError {
FilterError::EthAPIError(err.into())
}
}
/// An iterator that yields _inclusive_ block ranges of a given step size
#[derive(Debug)]
struct BlockRangeInclusiveIter {
iter: StepBy<RangeInclusive<u64>>,
step: u64,
end: u64,
}
impl BlockRangeInclusiveIter {
fn new(range: RangeInclusive<u64>, step: u64) -> Self {
Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
}
}
impl Iterator for BlockRangeInclusiveIter {
type Item = (u64, u64);
fn next(&mut self) -> Option<Self::Item> {
let start = self.iter.next()?;
let end = (start + self.step).min(self.end);
if start > end {
return None
}
Some((start, end))
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::{thread_rng, Rng};
#[test]
fn test_block_range_iter() {
for _ in 0..100 {
let mut rng = thread_rng();
let start = rng.gen::<u32>() as u64;
let end = start.saturating_add(rng.gen::<u32>() as u64);
let step = rng.gen::<u16>() as u64;
let range = start..=end;
let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
let (from, mut end) = iter.next().unwrap();
assert_eq!(from, start);
assert_eq!(end, (from + step).min(*range.end()));
for (next_from, next_end) in iter {
// ensure range starts with previous end + 1
assert_eq!(next_from, end + 1);
end = next_end;
}
assert_eq!(end, *range.end());
}
}
}