From f148cb31990bf69ebfac353f21f0468b86254b82 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 11 Jul 2025 11:21:08 +0200 Subject: [PATCH] feat(rpc): specialise contiguous receipt queries for logs (#16441) Co-authored-by: Matthias Seitz --- Cargo.lock | 1 + crates/rpc/rpc/Cargo.toml | 1 + crates/rpc/rpc/src/eth/filter.rs | 884 +++++++++++++++++- .../storage/provider/src/test_utils/mock.rs | 18 +- 4 files changed, 853 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 430f63f076..ecc7ddd190 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9839,6 +9839,7 @@ dependencies = [ "reth-chain-state", "reth-chainspec", "reth-consensus", + "reth-db-api", "reth-engine-primitives", "reth-errors", "reth-ethereum-primitives", diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 389502a2c7..4e6ca6ae24 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -96,6 +96,7 @@ reth-evm-ethereum.workspace = true reth-testing-utils.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } +reth-db-api.workspace = true alloy-consensus.workspace = true rand.workspace = true diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 59d07a06f8..4eecdee649 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,7 +1,7 @@ //! `eth_` `Filter` RPC handler implementation use alloy_consensus::BlockHeader; -use alloy_primitives::TxHash; +use alloy_primitives::{Sealable, TxHash}; use alloy_rpc_types_eth::{ BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log, PendingTransactionFilterKind, @@ -10,7 +10,7 @@ use async_trait::async_trait; use futures::future::TryFutureExt; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_errors::ProviderError; -use reth_primitives_traits::NodePrimitives; +use reth_primitives_traits::{NodePrimitives, SealedHeader}; use reth_rpc_eth_api::{ EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert, RpcNodeCore, RpcNodeCoreExt, RpcTransaction, @@ -22,15 +22,15 @@ use reth_rpc_eth_types::{ use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult}; use reth_storage_api::{ BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock, - ProviderReceipt, TransactionsProvider, + ProviderReceipt, ReceiptProvider, TransactionsProvider, }; use reth_tasks::TaskSpawner; use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool}; use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, fmt, future::Future, - iter::StepBy, + iter::{Peekable, StepBy}, ops::RangeInclusive, sync::Arc, time::{Duration, Instant}, @@ -39,7 +39,7 @@ use tokio::{ sync::{mpsc::Receiver, oneshot, Mutex}, time::MissedTickBehavior, }; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; impl EngineEthFilter for EthFilter where @@ -56,6 +56,18 @@ where } } +/// Threshold for deciding between cached and range mode processing +const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250; + +/// Threshold for bloom filter matches that triggers reduced caching +const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20; + +/// Threshold for bloom filter matches that triggers moderately reduced caching +const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10; + +/// Minimum block count to apply bloom filter match adjustments +const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100; + /// The maximum number of headers we read at once when handling a range filter. const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb @@ -562,63 +574,93 @@ where /// Returns an error if: /// - underlying database error async fn get_logs_in_block_range_inner( - &self, + self: Arc, filter: &Filter, from_block: u64, to_block: u64, limits: QueryLimits, ) -> Result, EthFilterError> { let mut all_logs = Vec::new(); + let mut matching_headers = Vec::new(); - // loop over the range of new blocks and check logs if the filter matches the log's bloom - // filter + // get current chain tip to determine processing mode + let chain_tip = self.provider().best_block_number()?; + + // first collect all headers that match the bloom filter for cached mode decision for (from, to) in BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range) { let headers = self.provider().headers_range(from..=to)?; - for (idx, header) in headers - .iter() - .enumerate() - .filter(|(_, header)| filter.matches_bloom(header.logs_bloom())) - { - // these are consecutive headers, so we can use the parent hash of the next - // block to get the current header's hash - let block_hash = match headers.get(idx + 1) { - Some(child) => child.parent_hash(), - None => self - .provider() - .block_hash(header.number())? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?, + + let mut headers_iter = headers.into_iter().peekable(); + + while let Some(header) = headers_iter.next() { + if !filter.matches_bloom(header.logs_bloom()) { + continue + } + + let current_number = header.number(); + + let block_hash = match headers_iter.peek() { + Some(next_header) if next_header.number() == current_number + 1 => { + // Headers are consecutive, use the more efficient parent_hash + next_header.parent_hash() + } + _ => { + // Headers not consecutive or last header, calculate hash + header.hash_slow() + } }; - let num_hash = BlockNumHash::new(header.number(), block_hash); - if let Some((receipts, maybe_block)) = - self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await? - { - append_matching_block_logs( - &mut all_logs, - maybe_block - .map(ProviderOrBlock::Block) - .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())), - filter, - num_hash, - &receipts, - false, - header.timestamp(), - )?; + matching_headers.push(SealedHeader::new(header, block_hash)); + } + } - // size check but only if range is multiple blocks, so we always return all - // logs of a single block - let is_multi_block_range = from_block != to_block; - if let Some(max_logs_per_response) = limits.max_logs_per_response { - if is_multi_block_range && all_logs.len() > max_logs_per_response { - return Err(EthFilterError::QueryExceedsMaxResults { - max_logs: max_logs_per_response, - from_block, - to_block: num_hash.number.saturating_sub(1), - }); - } - } + // initialize the appropriate range mode based on collected headers + let mut range_mode = RangeMode::new( + self.clone(), + matching_headers, + from_block, + to_block, + self.max_headers_range, + chain_tip, + ); + + // iterate through the range mode to get receipts and blocks + while let Some(ReceiptBlockResult { receipts, recovered_block, header }) = + range_mode.next().await? + { + let num_hash = header.num_hash(); + append_matching_block_logs( + &mut all_logs, + recovered_block + .map(ProviderOrBlock::Block) + .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())), + filter, + num_hash, + &receipts, + false, + header.timestamp(), + )?; + + // size check but only if range is multiple blocks, so we always return all + // logs of a single block + let is_multi_block_range = from_block != to_block; + if let Some(max_logs_per_response) = limits.max_logs_per_response { + if is_multi_block_range && all_logs.len() > max_logs_per_response { + debug!( + target: "rpc::eth::filter", + logs_found = all_logs.len(), + max_logs_per_response, + from_block, + to_block = num_hash.number.saturating_sub(1), + "Query exceeded max logs per response limit" + ); + return Err(EthFilterError::QueryExceedsMaxResults { + max_logs: max_logs_per_response, + from_block, + to_block: num_hash.number.saturating_sub(1), + }); } } } @@ -841,11 +883,218 @@ impl From for EthFilterError { } } +/// Helper type for the common pattern of returning receipts, block and the original header that is +/// a match for the filter. +struct ReceiptBlockResult

+where + P: ReceiptProvider + BlockReader, +{ + /// We always need the entire receipts for the matching block. + receipts: Arc>>, + /// Block can be optional and we can fetch it lazily when needed. + recovered_block: Option>>>, + /// The header of the block. + header: SealedHeader<

::Header>, +} + +/// Represents different modes for processing block ranges when filtering logs +enum RangeMode< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, +> { + /// Use cache-based processing for recent blocks + Cached(CachedMode), + /// Use range-based processing for older blocks + Range(RangeBlockMode), +} + +impl< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, + > RangeMode +{ + /// Creates a new `RangeMode`. + fn new( + filter_inner: Arc>, + sealed_headers: Vec::Header>>, + from_block: u64, + to_block: u64, + max_headers_range: u64, + chain_tip: u64, + ) -> Self { + let block_count = to_block - from_block + 1; + let distance_from_tip = chain_tip.saturating_sub(to_block); + + // Determine if we should use cached mode based on range characteristics + let use_cached_mode = + Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip); + + if use_cached_mode && !sealed_headers.is_empty() { + Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() }) + } else { + Self::Range(RangeBlockMode { + filter_inner, + iter: sealed_headers.into_iter().peekable(), + next: VecDeque::new(), + max_range: max_headers_range as usize, + }) + } + } + + /// Determines whether to use cached mode based on bloom filter matches and range size + const fn should_use_cached_mode( + headers: &[SealedHeader<::Header>], + block_count: u64, + distance_from_tip: u64, + ) -> bool { + // Headers are already filtered by bloom, so count equals length + let bloom_matches = headers.len(); + + // Calculate adjusted threshold based on bloom matches + let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches); + + block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold + } + + /// Calculates the adjusted cache threshold based on bloom filter matches + const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 { + // Only apply adjustments for larger ranges + if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS { + return CACHED_MODE_BLOCK_THRESHOLD; + } + + match bloom_matches { + n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2, + n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4, + _ => CACHED_MODE_BLOCK_THRESHOLD, + } + } + + /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple. + async fn next(&mut self) -> Result>, EthFilterError> { + match self { + Self::Cached(cached) => cached.next().await, + Self::Range(range) => range.next().await, + } + } +} + +/// Mode for processing blocks using cache optimization for recent blocks +struct CachedMode< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, +> { + filter_inner: Arc>, + headers_iter: std::vec::IntoIter::Header>>, +} + +impl< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, + > CachedMode +{ + async fn next(&mut self) -> Result>, EthFilterError> { + for header in self.headers_iter.by_ref() { + // Use get_receipts_and_maybe_block which has automatic fallback to provider + if let Some((receipts, maybe_block)) = + self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await? + { + return Ok(Some(ReceiptBlockResult { + receipts, + recovered_block: maybe_block, + header, + })); + } + } + + Ok(None) // No more headers + } +} + +/// Mode for processing blocks using range queries for older blocks +struct RangeBlockMode< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, +> { + filter_inner: Arc>, + iter: Peekable::Header>>>, + next: VecDeque>, + max_range: usize, +} + +impl< + Eth: RpcNodeCoreExt + EthApiTypes + 'static, + > RangeBlockMode +{ + async fn next(&mut self) -> Result>, EthFilterError> { + if let Some(result) = self.next.pop_front() { + return Ok(Some(result)); + } + + let Some(next_header) = self.iter.next() else { + return Ok(None); + }; + + let mut range_headers = Vec::with_capacity(self.max_range); + range_headers.push(next_header); + + // Collect consecutive blocks up to max_range size + while range_headers.len() < self.max_range { + let Some(peeked) = self.iter.peek() else { break }; + let Some(last_header) = range_headers.last() else { break }; + + let expected_next = last_header.header().number() + 1; + if peeked.header().number() != expected_next { + break; // Non-consecutive block, stop here + } + + let Some(next_header) = self.iter.next() else { break }; + range_headers.push(next_header); + } + + // Process each header individually to avoid queuing for all receipts + for header in range_headers { + // First check if already cached to avoid unnecessary provider calls + let (maybe_block, maybe_receipts) = self + .filter_inner + .eth_cache() + .maybe_cached_block_and_receipts(header.hash()) + .await?; + + let receipts = match maybe_receipts { + Some(receipts) => receipts, + None => { + // Not cached - fetch directly from provider without queuing + match self.filter_inner.provider().receipts_by_block(header.hash().into())? { + Some(receipts) => Arc::new(receipts), + None => continue, // No receipts found + } + } + }; + + if !receipts.is_empty() { + self.next.push_back(ReceiptBlockResult { + receipts, + recovered_block: maybe_block, + header, + }); + } + } + + Ok(self.next.pop_front()) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::{eth::EthApi, EthApiBuilder}; + use alloy_primitives::FixedBytes; use rand::Rng; + use reth_chainspec::ChainSpecProvider; + use reth_ethereum_primitives::TxType; + use reth_evm_ethereum::EthEvmConfig; + use reth_network_api::noop::NoopNetwork; + use reth_provider::test_utils::MockEthProvider; + use reth_tasks::TokioTaskExecutor; use reth_testing_utils::generators; + use reth_transaction_pool::test_utils::{testing_pool, TestPool}; + use std::{collections::VecDeque, sync::Arc}; #[test] fn test_block_range_iter() { @@ -868,4 +1117,541 @@ mod tests { assert_eq!(end, *range.end()); } + + // Helper function to create a test EthApi instance + fn build_test_eth_api( + provider: MockEthProvider, + ) -> EthApi { + EthApiBuilder::new( + provider.clone(), + testing_pool(), + NoopNetwork::default(), + EthEvmConfig::new(provider.chain_spec()), + ) + .build() + } + + #[tokio::test] + async fn test_range_block_mode_empty_range() { + let provider = MockEthProvider::default(); + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![]; + let max_range = 100; + + let mut range_mode = RangeBlockMode { + filter_inner, + iter: headers.into_iter().peekable(), + next: VecDeque::new(), + max_range, + }; + + let result = range_mode.next().await; + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + } + + #[tokio::test] + async fn test_range_block_mode_queued_results_priority() { + let provider = MockEthProvider::default(); + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![ + SealedHeader::new( + alloy_consensus::Header { number: 100, ..Default::default() }, + FixedBytes::random(), + ), + SealedHeader::new( + alloy_consensus::Header { number: 101, ..Default::default() }, + FixedBytes::random(), + ), + ]; + + // create specific mock results to test ordering + let expected_block_hash_1 = FixedBytes::from([1u8; 32]); + let expected_block_hash_2 = FixedBytes::from([2u8; 32]); + + // create mock receipts to test receipt handling + let mock_receipt_1 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Legacy, + cumulative_gas_used: 100_000, + logs: vec![], + success: true, + }; + let mock_receipt_2 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Eip1559, + cumulative_gas_used: 200_000, + logs: vec![], + success: true, + }; + let mock_receipt_3 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Eip2930, + cumulative_gas_used: 150_000, + logs: vec![], + success: false, // Different success status + }; + + let mock_result_1 = ReceiptBlockResult { + receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]), + recovered_block: None, + header: SealedHeader::new( + alloy_consensus::Header { number: 42, ..Default::default() }, + expected_block_hash_1, + ), + }; + + let mock_result_2 = ReceiptBlockResult { + receipts: Arc::new(vec![mock_receipt_3.clone()]), + recovered_block: None, + header: SealedHeader::new( + alloy_consensus::Header { number: 43, ..Default::default() }, + expected_block_hash_2, + ), + }; + + let mut range_mode = RangeBlockMode { + filter_inner, + iter: headers.into_iter().peekable(), + next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results + max_range: 100, + }; + + // first call should return the first queued result (FIFO order) + let result1 = range_mode.next().await; + assert!(result1.is_ok()); + let receipt_result1 = result1.unwrap().unwrap(); + assert_eq!(receipt_result1.header.hash(), expected_block_hash_1); + assert_eq!(receipt_result1.header.number, 42); + + // verify receipts + assert_eq!(receipt_result1.receipts.len(), 2); + assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type); + assert_eq!( + receipt_result1.receipts[0].cumulative_gas_used, + mock_receipt_1.cumulative_gas_used + ); + assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success); + assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type); + assert_eq!( + receipt_result1.receipts[1].cumulative_gas_used, + mock_receipt_2.cumulative_gas_used + ); + assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success); + + // second call should return the second queued result + let result2 = range_mode.next().await; + assert!(result2.is_ok()); + let receipt_result2 = result2.unwrap().unwrap(); + assert_eq!(receipt_result2.header.hash(), expected_block_hash_2); + assert_eq!(receipt_result2.header.number, 43); + + // verify receipts + assert_eq!(receipt_result2.receipts.len(), 1); + assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type); + assert_eq!( + receipt_result2.receipts[0].cumulative_gas_used, + mock_receipt_3.cumulative_gas_used + ); + assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success); + + // queue should now be empty + assert!(range_mode.next.is_empty()); + + let result3 = range_mode.next().await; + assert!(result3.is_ok()); + } + + #[tokio::test] + async fn test_range_block_mode_single_block_no_receipts() { + let provider = MockEthProvider::default(); + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![SealedHeader::new( + alloy_consensus::Header { number: 100, ..Default::default() }, + FixedBytes::random(), + )]; + + let mut range_mode = RangeBlockMode { + filter_inner, + iter: headers.into_iter().peekable(), + next: VecDeque::new(), + max_range: 100, + }; + + let result = range_mode.next().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_range_block_mode_provider_receipts() { + let provider = MockEthProvider::default(); + + let header_1 = alloy_consensus::Header { number: 100, ..Default::default() }; + let header_2 = alloy_consensus::Header { number: 101, ..Default::default() }; + let header_3 = alloy_consensus::Header { number: 102, ..Default::default() }; + + let block_hash_1 = FixedBytes::random(); + let block_hash_2 = FixedBytes::random(); + let block_hash_3 = FixedBytes::random(); + + provider.add_header(block_hash_1, header_1.clone()); + provider.add_header(block_hash_2, header_2.clone()); + provider.add_header(block_hash_3, header_3.clone()); + + // create mock receipts to test provider fetching with mock logs + let mock_log = alloy_primitives::Log { + address: alloy_primitives::Address::ZERO, + data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()), + }; + + let receipt_100_1 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Legacy, + cumulative_gas_used: 21_000, + logs: vec![mock_log.clone()], + success: true, + }; + let receipt_100_2 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Eip1559, + cumulative_gas_used: 42_000, + logs: vec![mock_log.clone()], + success: true, + }; + let receipt_101_1 = reth_ethereum_primitives::Receipt { + tx_type: TxType::Eip2930, + cumulative_gas_used: 30_000, + logs: vec![mock_log.clone()], + success: false, + }; + + provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]); + provider.add_receipts(101, vec![receipt_101_1.clone()]); + + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![ + SealedHeader::new(header_1, block_hash_1), + SealedHeader::new(header_2, block_hash_2), + SealedHeader::new(header_3, block_hash_3), + ]; + + let mut range_mode = RangeBlockMode { + filter_inner, + iter: headers.into_iter().peekable(), + next: VecDeque::new(), + max_range: 3, // include the 3 blocks in the first queried results + }; + + // first call should fetch receipts from provider and return first block with receipts + let result = range_mode.next().await; + assert!(result.is_ok()); + let receipt_result = result.unwrap().unwrap(); + + assert_eq!(receipt_result.header.hash(), block_hash_1); + assert_eq!(receipt_result.header.number, 100); + assert_eq!(receipt_result.receipts.len(), 2); + + // verify receipts + assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type); + assert_eq!( + receipt_result.receipts[0].cumulative_gas_used, + receipt_100_1.cumulative_gas_used + ); + assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success); + + assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type); + assert_eq!( + receipt_result.receipts[1].cumulative_gas_used, + receipt_100_2.cumulative_gas_used + ); + assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success); + + // second call should return the second block with receipts + let result2 = range_mode.next().await; + assert!(result2.is_ok()); + let receipt_result2 = result2.unwrap().unwrap(); + + assert_eq!(receipt_result2.header.hash(), block_hash_2); + assert_eq!(receipt_result2.header.number, 101); + assert_eq!(receipt_result2.receipts.len(), 1); + + // verify receipts + assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type); + assert_eq!( + receipt_result2.receipts[0].cumulative_gas_used, + receipt_101_1.cumulative_gas_used + ); + assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success); + + // third call should return None since no more blocks with receipts + let result3 = range_mode.next().await; + assert!(result3.is_ok()); + assert!(result3.unwrap().is_none()); + } + + #[tokio::test] + async fn test_range_block_mode_iterator_exhaustion() { + let provider = MockEthProvider::default(); + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![ + SealedHeader::new( + alloy_consensus::Header { number: 100, ..Default::default() }, + FixedBytes::random(), + ), + SealedHeader::new( + alloy_consensus::Header { number: 101, ..Default::default() }, + FixedBytes::random(), + ), + ]; + + let mut range_mode = RangeBlockMode { + filter_inner, + iter: headers.into_iter().peekable(), + next: VecDeque::new(), + max_range: 1, + }; + + let result1 = range_mode.next().await; + assert!(result1.is_ok()); + + assert!(range_mode.iter.peek().is_some()); + + let result2 = range_mode.next().await; + assert!(result2.is_ok()); + + // now iterator should be exhausted + assert!(range_mode.iter.peek().is_none()); + + // further calls should return None + let result3 = range_mode.next().await; + assert!(result3.is_ok()); + assert!(result3.unwrap().is_none()); + } + + #[tokio::test] + async fn test_cached_mode_with_mock_receipts() { + // create test data + let test_hash = FixedBytes::from([42u8; 32]); + let test_block_number = 100u64; + let test_header = SealedHeader::new( + alloy_consensus::Header { + number: test_block_number, + gas_used: 50_000, + ..Default::default() + }, + test_hash, + ); + + // add a mock receipt to the provider with a mock log + let mock_log = alloy_primitives::Log { + address: alloy_primitives::Address::ZERO, + data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()), + }; + + let mock_receipt = reth_ethereum_primitives::Receipt { + tx_type: TxType::Legacy, + cumulative_gas_used: 21_000, + logs: vec![mock_log], + success: true, + }; + + let provider = MockEthProvider::default(); + provider.add_header(test_hash, test_header.header().clone()); + provider.add_receipts(test_block_number, vec![mock_receipt.clone()]); + + let eth_api = build_test_eth_api(provider); + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers = vec![test_header.clone()]; + + let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() }; + + // should find the receipt from provider fallback (cache will be empty) + let result = cached_mode.next().await.expect("next should succeed"); + let receipt_block_result = result.expect("should have receipt result"); + assert_eq!(receipt_block_result.header.hash(), test_hash); + assert_eq!(receipt_block_result.header.number, test_block_number); + assert_eq!(receipt_block_result.receipts.len(), 1); + assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type); + assert_eq!( + receipt_block_result.receipts[0].cumulative_gas_used, + mock_receipt.cumulative_gas_used + ); + assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success); + + // iterator should be exhausted + let result2 = cached_mode.next().await; + assert!(result2.is_ok()); + assert!(result2.unwrap().is_none()); + } + + #[tokio::test] + async fn test_cached_mode_empty_headers() { + let provider = MockEthProvider::default(); + let eth_api = build_test_eth_api(provider); + + let eth_filter = super::EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + let filter_inner = eth_filter.inner; + + let headers: Vec> = vec![]; + + let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() }; + + // should immediately return None for empty headers + let result = cached_mode.next().await.expect("next should succeed"); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_non_consecutive_headers_after_bloom_filter() { + let provider = MockEthProvider::default(); + + // Create 4 headers where only blocks 100 and 102 will match bloom filter + let mut expected_hashes = vec![]; + let mut prev_hash = alloy_primitives::B256::default(); + + // Create a transaction for blocks that will have receipts + use alloy_consensus::TxLegacy; + use reth_ethereum_primitives::{TransactionSigned, TxType}; + + let tx_inner = TxLegacy { + chain_id: Some(1), + nonce: 0, + gas_price: 21_000, + gas_limit: 21_000, + to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO), + value: alloy_primitives::U256::ZERO, + input: alloy_primitives::Bytes::new(), + }; + let signature = alloy_primitives::Signature::test_signature(); + let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature); + + for i in 100u64..=103 { + let header = alloy_consensus::Header { + number: i, + parent_hash: prev_hash, + // Set bloom to match filter only for blocks 100 and 102 + logs_bloom: if i == 100 || i == 102 { + alloy_primitives::Bloom::from([1u8; 256]) + } else { + alloy_primitives::Bloom::default() + }, + ..Default::default() + }; + + let hash = header.hash_slow(); + expected_hashes.push(hash); + prev_hash = hash; + + // Add transaction to blocks that will have receipts (100 and 102) + let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] }; + + let block = reth_ethereum_primitives::Block { + header, + body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() }, + }; + provider.add_block(hash, block); + } + + // Add receipts with logs only to blocks that match bloom + let mock_log = alloy_primitives::Log { + address: alloy_primitives::Address::ZERO, + data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()), + }; + + let receipt = reth_ethereum_primitives::Receipt { + tx_type: TxType::Legacy, + cumulative_gas_used: 21_000, + logs: vec![mock_log], + success: true, + }; + + provider.add_receipts(100, vec![receipt.clone()]); + provider.add_receipts(101, vec![]); + provider.add_receipts(102, vec![receipt.clone()]); + provider.add_receipts(103, vec![]); + + // Add block body indices for each block so receipts can be fetched + use reth_db_api::models::StoredBlockBodyIndices; + provider + .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 }); + provider + .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 }); + provider + .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 }); + provider + .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 }); + + let eth_api = build_test_eth_api(provider); + let eth_filter = EthFilter::new( + eth_api, + EthFilterConfig::default(), + Box::new(TokioTaskExecutor::default()), + ); + + // Use default filter which will match any non-empty bloom + let filter = Filter::default(); + + // Get logs in the range - this will trigger the bloom filtering + let logs = eth_filter + .inner + .clone() + .get_logs_in_block_range(filter, 100, 103, QueryLimits::default()) + .await + .expect("should succeed"); + + // We should get logs from blocks 100 and 102 only (bloom filtered) + assert_eq!(logs.len(), 2); + + assert_eq!(logs[0].block_number, Some(100)); + assert_eq!(logs[1].block_number, Some(102)); + + // Each block hash should be the hash of its own header, not derived from any other header + assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100 + assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102 + } } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 889712259c..68f8c38e59 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -64,6 +64,8 @@ pub struct MockEthProvider< pub chain_spec: Arc, /// Local state roots pub state_roots: Arc>>, + /// Local block body indices store + pub block_body_indices: Arc>>, tx: TxMock, prune_modes: Arc, } @@ -80,6 +82,7 @@ where accounts: self.accounts.clone(), chain_spec: self.chain_spec.clone(), state_roots: self.state_roots.clone(), + block_body_indices: self.block_body_indices.clone(), tx: self.tx.clone(), prune_modes: self.prune_modes.clone(), } @@ -96,6 +99,7 @@ impl MockEthProvider { accounts: Default::default(), chain_spec: Arc::new(reth_chainspec::ChainSpecBuilder::mainnet().build()), state_roots: Default::default(), + block_body_indices: Default::default(), tx: Default::default(), prune_modes: Default::default(), } @@ -156,6 +160,15 @@ impl MockEthProvider MockEthProvider StatePr impl BlockBodyIndicesProvider for MockEthProvider { - fn block_body_indices(&self, _num: u64) -> ProviderResult> { - Ok(None) + fn block_body_indices(&self, num: u64) -> ProviderResult> { + Ok(self.block_body_indices.lock().get(&num).copied()) } fn block_body_indices_range( &self,