feat(rpc): specialise contiguous receipt queries for logs (#16441)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Federico Gimenez
2025-07-11 11:21:08 +02:00
committed by GitHub
parent 06a7d05649
commit f148cb3199
4 changed files with 853 additions and 51 deletions

Cargo.lock

@@ -9839,6 +9839,7 @@ dependencies = [
"reth-chain-state",
"reth-chainspec",
"reth-consensus",
"reth-db-api",
"reth-engine-primitives",
"reth-errors",
"reth-ethereum-primitives",


@@ -96,6 +96,7 @@ reth-evm-ethereum.workspace = true
reth-testing-utils.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-db-api.workspace = true
alloy-consensus.workspace = true
rand.workspace = true


@@ -1,7 +1,7 @@
//! `eth_` `Filter` RPC handler implementation
use alloy_consensus::BlockHeader;
use alloy_primitives::TxHash;
use alloy_primitives::{Sealable, TxHash};
use alloy_rpc_types_eth::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
PendingTransactionFilterKind,
@@ -10,7 +10,7 @@ use async_trait::async_trait;
use futures::future::TryFutureExt;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_errors::ProviderError;
use reth_primitives_traits::NodePrimitives;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
use reth_rpc_eth_api::{
EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
RpcNodeCore, RpcNodeCoreExt, RpcTransaction,
@@ -22,15 +22,15 @@ use reth_rpc_eth_types::{
use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
use reth_storage_api::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
ProviderReceipt, TransactionsProvider,
ProviderReceipt, ReceiptProvider, TransactionsProvider,
};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
fmt,
future::Future,
iter::StepBy,
iter::{Peekable, StepBy},
ops::RangeInclusive,
sync::Arc,
time::{Duration, Instant},
@@ -39,7 +39,7 @@ use tokio::{
sync::{mpsc::Receiver, oneshot, Mutex},
time::MissedTickBehavior,
};
use tracing::{error, trace};
use tracing::{debug, error, trace};
impl<Eth> EngineEthFilter for EthFilter<Eth>
where
@@ -56,6 +56,18 @@ where
}
}
/// Threshold for deciding between cached and range mode processing
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
/// Threshold for bloom filter matches that triggers reduced caching
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
/// Threshold for bloom filter matches that triggers moderately reduced caching
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
/// Minimum block count to apply bloom filter match adjustments
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
/// The maximum number of headers we read at once when handling a range filter.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
@@ -562,63 +574,93 @@ where
/// Returns an error if:
/// - the underlying database returns an error
async fn get_logs_in_block_range_inner(
&self,
self: Arc<Self>,
filter: &Filter,
from_block: u64,
to_block: u64,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
let mut all_logs = Vec::new();
let mut matching_headers = Vec::new();
// loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter
// get current chain tip to determine processing mode
let chain_tip = self.provider().best_block_number()?;
// first collect all headers that match the bloom filter for cached mode decision
for (from, to) in
BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
{
let headers = self.provider().headers_range(from..=to)?;
for (idx, header) in headers
.iter()
.enumerate()
.filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
{
// these are consecutive headers, so we can use the parent hash of the next
// block to get the current header's hash
let block_hash = match headers.get(idx + 1) {
Some(child) => child.parent_hash(),
None => self
.provider()
.block_hash(header.number())?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
let mut headers_iter = headers.into_iter().peekable();
while let Some(header) = headers_iter.next() {
if !filter.matches_bloom(header.logs_bloom()) {
continue
}
let current_number = header.number();
let block_hash = match headers_iter.peek() {
Some(next_header) if next_header.number() == current_number + 1 => {
// Headers are consecutive, use the more efficient parent_hash
next_header.parent_hash()
}
_ => {
// Headers not consecutive or last header, calculate hash
header.hash_slow()
}
};
let num_hash = BlockNumHash::new(header.number(), block_hash);
if let Some((receipts, maybe_block)) =
self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
{
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
filter,
num_hash,
&receipts,
false,
header.timestamp(),
)?;
matching_headers.push(SealedHeader::new(header, block_hash));
}
}
// size check but only if range is multiple blocks, so we always return all
// logs of a single block
let is_multi_block_range = from_block != to_block;
if let Some(max_logs_per_response) = limits.max_logs_per_response {
if is_multi_block_range && all_logs.len() > max_logs_per_response {
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
}
// initialize the appropriate range mode based on collected headers
let mut range_mode = RangeMode::new(
self.clone(),
matching_headers,
from_block,
to_block,
self.max_headers_range,
chain_tip,
);
// iterate through the range mode to get receipts and blocks
while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
range_mode.next().await?
{
let num_hash = header.num_hash();
append_matching_block_logs(
&mut all_logs,
recovered_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
filter,
num_hash,
&receipts,
false,
header.timestamp(),
)?;
// size check but only if range is multiple blocks, so we always return all
// logs of a single block
let is_multi_block_range = from_block != to_block;
if let Some(max_logs_per_response) = limits.max_logs_per_response {
if is_multi_block_range && all_logs.len() > max_logs_per_response {
debug!(
target: "rpc::eth::filter",
logs_found = all_logs.len(),
max_logs_per_response,
from_block,
to_block = num_hash.number.saturating_sub(1),
"Query exceeded max logs per response limit"
);
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
}
}
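Aside: the hash shortcut in the header loop above relies on a simple invariant — a consecutive child's parent_hash is exactly the current header's hash, so hash_slow() is only needed for the last header of a run or across gaps. A minimal standalone illustration of that selection (a sketch for this note only, not part of the diff):

use alloy_consensus::Header;
use alloy_primitives::Sealable;

fn main() {
    // Build a tiny chain of linked headers, as a provider would return them.
    let h100 = Header { number: 100, ..Default::default() };
    let h101 = Header { number: 101, parent_hash: h100.hash_slow(), ..Default::default() };
    let h102 = Header { number: 102, parent_hash: h101.hash_slow(), ..Default::default() };

    let headers = vec![h100, h101, h102];
    let mut iter = headers.into_iter().peekable();
    while let Some(header) = iter.next() {
        // Same selection as in the loop above: take the child's parent_hash when
        // the next header is consecutive, otherwise fall back to hash_slow().
        let block_hash = match iter.peek() {
            Some(next) if next.number == header.number + 1 => next.parent_hash,
            _ => header.hash_slow(),
        };
        // Both paths yield the same hash; the peek path just avoids re-hashing.
        assert_eq!(block_hash, header.hash_slow());
    }
}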
@@ -841,11 +883,218 @@ impl From<ProviderError> for EthFilterError {
}
}
/// Helper type for the common pattern of returning the receipts, the block, and the original
/// header that matched the filter.
struct ReceiptBlockResult<P>
where
P: ReceiptProvider + BlockReader,
{
/// The receipts of the matching block; these are always needed in full.
receipts: Arc<Vec<ProviderReceipt<P>>>,
/// The block is optional and can be fetched lazily when needed.
recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
/// The header of the block.
header: SealedHeader<<P as HeaderProvider>::Header>,
}
/// Represents different modes for processing block ranges when filtering logs
enum RangeMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
/// Use cache-based processing for recent blocks
Cached(CachedMode<Eth>),
/// Use range-based processing for older blocks
Range(RangeBlockMode<Eth>),
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> RangeMode<Eth>
{
/// Creates a new `RangeMode`.
fn new(
filter_inner: Arc<EthFilterInner<Eth>>,
sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
from_block: u64,
to_block: u64,
max_headers_range: u64,
chain_tip: u64,
) -> Self {
let block_count = to_block - from_block + 1;
let distance_from_tip = chain_tip.saturating_sub(to_block);
// Determine if we should use cached mode based on range characteristics
let use_cached_mode =
Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
if use_cached_mode && !sealed_headers.is_empty() {
Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
} else {
Self::Range(RangeBlockMode {
filter_inner,
iter: sealed_headers.into_iter().peekable(),
next: VecDeque::new(),
max_range: max_headers_range as usize,
})
}
}
/// Determines whether to use cached mode based on bloom filter matches and range size
const fn should_use_cached_mode(
headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
block_count: u64,
distance_from_tip: u64,
) -> bool {
// Headers are already filtered by bloom, so count equals length
let bloom_matches = headers.len();
// Calculate adjusted threshold based on bloom matches
let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
}
/// Calculates the adjusted cache threshold based on bloom filter matches
const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
// Only apply adjustments for larger ranges
if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
return CACHED_MODE_BLOCK_THRESHOLD;
}
match bloom_matches {
n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
_ => CACHED_MODE_BLOCK_THRESHOLD,
}
}
/// Gets the next `ReceiptBlockResult` (receipts, optional recovered block, and matching header), if any.
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
match self {
Self::Cached(cached) => cached.next().await,
Self::Range(range) => range.next().await,
}
}
}
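To make the cached-vs-range decision above concrete, here is a small standalone sketch with the constants and threshold logic copied from this diff plus a few worked inputs (illustrative only, not part of the diff):

const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

// Mirrors `calculate_adjusted_threshold` above: small ranges keep the full
// threshold, larger ranges with many bloom matches get a reduced one.
const fn adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
    if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
        return CACHED_MODE_BLOCK_THRESHOLD;
    }
    match bloom_matches {
        n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
        n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
        _ => CACHED_MODE_BLOCK_THRESHOLD,
    }
}

fn main() {
    // 80-block range: below BLOOM_ADJUSTMENT_MIN_BLOCKS, threshold stays at 250.
    assert_eq!(adjusted_threshold(80, 30), 250);
    // 200-block range with 25 bloom matches: heavy matching halves the threshold.
    assert_eq!(adjusted_threshold(200, 25), 125);
    // 200-block range with 15 matches: moderately reduced to 187.
    assert_eq!(adjusted_threshold(200, 15), 187);

    // Cached mode is used only if both the block count and the distance from
    // the chain tip are within the adjusted threshold.
    let (block_count, distance_from_tip) = (200u64, 50u64);
    let use_cached = block_count <= adjusted_threshold(200, 15)
        && distance_from_tip <= adjusted_threshold(200, 15);
    assert!(!use_cached); // 200 > 187, so the range mode is chosen
}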
/// Mode for processing blocks using cache optimization for recent blocks
struct CachedMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> CachedMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
for header in self.headers_iter.by_ref() {
// Use get_receipts_and_maybe_block which has automatic fallback to provider
if let Some((receipts, maybe_block)) =
self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
{
return Ok(Some(ReceiptBlockResult {
receipts,
recovered_block: maybe_block,
header,
}));
}
}
Ok(None) // No more headers
}
}
/// Mode for processing blocks using range queries for older blocks
struct RangeBlockMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
max_range: usize,
}
impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> RangeBlockMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
if let Some(result) = self.next.pop_front() {
return Ok(Some(result));
}
let Some(next_header) = self.iter.next() else {
return Ok(None);
};
let mut range_headers = Vec::with_capacity(self.max_range);
range_headers.push(next_header);
// Collect consecutive blocks up to max_range size
while range_headers.len() < self.max_range {
let Some(peeked) = self.iter.peek() else { break };
let Some(last_header) = range_headers.last() else { break };
let expected_next = last_header.header().number() + 1;
if peeked.header().number() != expected_next {
break; // Non-consecutive block, stop here
}
let Some(next_header) = self.iter.next() else { break };
range_headers.push(next_header);
}
// Process each header individually to avoid queuing for all receipts
for header in range_headers {
// First check if already cached to avoid unnecessary provider calls
let (maybe_block, maybe_receipts) = self
.filter_inner
.eth_cache()
.maybe_cached_block_and_receipts(header.hash())
.await?;
let receipts = match maybe_receipts {
Some(receipts) => receipts,
None => {
// Not cached - fetch directly from provider without queuing
match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
Some(receipts) => Arc::new(receipts),
None => continue, // No receipts found
}
}
};
if !receipts.is_empty() {
self.next.push_back(ReceiptBlockResult {
receipts,
recovered_block: maybe_block,
header,
});
}
}
Ok(self.next.pop_front())
}
}
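The grouping performed by RangeBlockMode::next amounts to "take the next run of consecutive block numbers, capped at max_range, then fetch their receipts". The same pattern in isolation over plain numbers (an illustrative sketch, not part of the diff):

use std::iter::Peekable;

/// Pull up to `max_range` consecutive numbers starting from the next item,
/// stopping early at the first gap (mirrors the header batching above).
fn next_consecutive_run<I: Iterator<Item = u64>>(
    iter: &mut Peekable<I>,
    max_range: usize,
) -> Vec<u64> {
    let mut run = Vec::with_capacity(max_range);
    let Some(first) = iter.next() else { return run };
    run.push(first);
    while run.len() < max_range {
        match iter.peek() {
            Some(&n) if n == run.last().copied().unwrap() + 1 => run.push(iter.next().unwrap()),
            _ => break, // gap or exhausted iterator: stop the run here
        }
    }
    run
}

fn main() {
    // Blocks 100, 102, 103 and 107 matched the bloom filter; 101 and 104..=106 did not.
    let mut blocks = vec![100u64, 102, 103, 107].into_iter().peekable();
    assert_eq!(next_consecutive_run(&mut blocks, 1_000), vec![100]);
    assert_eq!(next_consecutive_run(&mut blocks, 1_000), vec![102, 103]);
    assert_eq!(next_consecutive_run(&mut blocks, 1_000), vec![107]);
    assert!(next_consecutive_run(&mut blocks, 1_000).is_empty());
}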
#[cfg(test)]
mod tests {
use super::*;
use crate::{eth::EthApi, EthApiBuilder};
use alloy_primitives::FixedBytes;
use rand::Rng;
use reth_chainspec::ChainSpecProvider;
use reth_ethereum_primitives::TxType;
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::noop::NoopNetwork;
use reth_provider::test_utils::MockEthProvider;
use reth_tasks::TokioTaskExecutor;
use reth_testing_utils::generators;
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use std::{collections::VecDeque, sync::Arc};
#[test]
fn test_block_range_iter() {
@@ -868,4 +1117,541 @@ mod tests {
assert_eq!(end, *range.end());
}
// Helper function to create a test EthApi instance
fn build_test_eth_api(
provider: MockEthProvider,
) -> EthApi<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig> {
EthApiBuilder::new(
provider.clone(),
testing_pool(),
NoopNetwork::default(),
EthEvmConfig::new(provider.chain_spec()),
)
.build()
}
#[tokio::test]
async fn test_range_block_mode_empty_range() {
let provider = MockEthProvider::default();
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![];
let max_range = 100;
let mut range_mode = RangeBlockMode {
filter_inner,
iter: headers.into_iter().peekable(),
next: VecDeque::new(),
max_range,
};
let result = range_mode.next().await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}
#[tokio::test]
async fn test_range_block_mode_queued_results_priority() {
let provider = MockEthProvider::default();
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![
SealedHeader::new(
alloy_consensus::Header { number: 100, ..Default::default() },
FixedBytes::random(),
),
SealedHeader::new(
alloy_consensus::Header { number: 101, ..Default::default() },
FixedBytes::random(),
),
];
// create specific mock results to test ordering
let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
// create mock receipts to test receipt handling
let mock_receipt_1 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 100_000,
logs: vec![],
success: true,
};
let mock_receipt_2 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 200_000,
logs: vec![],
success: true,
};
let mock_receipt_3 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 150_000,
logs: vec![],
success: false, // Different success status
};
let mock_result_1 = ReceiptBlockResult {
receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
recovered_block: None,
header: SealedHeader::new(
alloy_consensus::Header { number: 42, ..Default::default() },
expected_block_hash_1,
),
};
let mock_result_2 = ReceiptBlockResult {
receipts: Arc::new(vec![mock_receipt_3.clone()]),
recovered_block: None,
header: SealedHeader::new(
alloy_consensus::Header { number: 43, ..Default::default() },
expected_block_hash_2,
),
};
let mut range_mode = RangeBlockMode {
filter_inner,
iter: headers.into_iter().peekable(),
next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
max_range: 100,
};
// first call should return the first queued result (FIFO order)
let result1 = range_mode.next().await;
assert!(result1.is_ok());
let receipt_result1 = result1.unwrap().unwrap();
assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
assert_eq!(receipt_result1.header.number, 42);
// verify receipts
assert_eq!(receipt_result1.receipts.len(), 2);
assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
assert_eq!(
receipt_result1.receipts[0].cumulative_gas_used,
mock_receipt_1.cumulative_gas_used
);
assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
assert_eq!(
receipt_result1.receipts[1].cumulative_gas_used,
mock_receipt_2.cumulative_gas_used
);
assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
// second call should return the second queued result
let result2 = range_mode.next().await;
assert!(result2.is_ok());
let receipt_result2 = result2.unwrap().unwrap();
assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
assert_eq!(receipt_result2.header.number, 43);
// verify receipts
assert_eq!(receipt_result2.receipts.len(), 1);
assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
assert_eq!(
receipt_result2.receipts[0].cumulative_gas_used,
mock_receipt_3.cumulative_gas_used
);
assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
// queue should now be empty
assert!(range_mode.next.is_empty());
let result3 = range_mode.next().await;
assert!(result3.is_ok());
}
#[tokio::test]
async fn test_range_block_mode_single_block_no_receipts() {
let provider = MockEthProvider::default();
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![SealedHeader::new(
alloy_consensus::Header { number: 100, ..Default::default() },
FixedBytes::random(),
)];
let mut range_mode = RangeBlockMode {
filter_inner,
iter: headers.into_iter().peekable(),
next: VecDeque::new(),
max_range: 100,
};
let result = range_mode.next().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_range_block_mode_provider_receipts() {
let provider = MockEthProvider::default();
let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
let block_hash_1 = FixedBytes::random();
let block_hash_2 = FixedBytes::random();
let block_hash_3 = FixedBytes::random();
provider.add_header(block_hash_1, header_1.clone());
provider.add_header(block_hash_2, header_2.clone());
provider.add_header(block_hash_3, header_3.clone());
// create mock receipts to test provider fetching with mock logs
let mock_log = alloy_primitives::Log {
address: alloy_primitives::Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
};
let receipt_100_1 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
logs: vec![mock_log.clone()],
success: true,
};
let receipt_100_2 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 42_000,
logs: vec![mock_log.clone()],
success: true,
};
let receipt_101_1 = reth_ethereum_primitives::Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 30_000,
logs: vec![mock_log.clone()],
success: false,
};
provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
provider.add_receipts(101, vec![receipt_101_1.clone()]);
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![
SealedHeader::new(header_1, block_hash_1),
SealedHeader::new(header_2, block_hash_2),
SealedHeader::new(header_3, block_hash_3),
];
let mut range_mode = RangeBlockMode {
filter_inner,
iter: headers.into_iter().peekable(),
next: VecDeque::new(),
max_range: 3, // include the 3 blocks in the first queried results
};
// first call should fetch receipts from provider and return first block with receipts
let result = range_mode.next().await;
assert!(result.is_ok());
let receipt_result = result.unwrap().unwrap();
assert_eq!(receipt_result.header.hash(), block_hash_1);
assert_eq!(receipt_result.header.number, 100);
assert_eq!(receipt_result.receipts.len(), 2);
// verify receipts
assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
assert_eq!(
receipt_result.receipts[0].cumulative_gas_used,
receipt_100_1.cumulative_gas_used
);
assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
assert_eq!(
receipt_result.receipts[1].cumulative_gas_used,
receipt_100_2.cumulative_gas_used
);
assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
// second call should return the second block with receipts
let result2 = range_mode.next().await;
assert!(result2.is_ok());
let receipt_result2 = result2.unwrap().unwrap();
assert_eq!(receipt_result2.header.hash(), block_hash_2);
assert_eq!(receipt_result2.header.number, 101);
assert_eq!(receipt_result2.receipts.len(), 1);
// verify receipts
assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
assert_eq!(
receipt_result2.receipts[0].cumulative_gas_used,
receipt_101_1.cumulative_gas_used
);
assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
// third call should return None since no more blocks with receipts
let result3 = range_mode.next().await;
assert!(result3.is_ok());
assert!(result3.unwrap().is_none());
}
#[tokio::test]
async fn test_range_block_mode_iterator_exhaustion() {
let provider = MockEthProvider::default();
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![
SealedHeader::new(
alloy_consensus::Header { number: 100, ..Default::default() },
FixedBytes::random(),
),
SealedHeader::new(
alloy_consensus::Header { number: 101, ..Default::default() },
FixedBytes::random(),
),
];
let mut range_mode = RangeBlockMode {
filter_inner,
iter: headers.into_iter().peekable(),
next: VecDeque::new(),
max_range: 1,
};
let result1 = range_mode.next().await;
assert!(result1.is_ok());
assert!(range_mode.iter.peek().is_some());
let result2 = range_mode.next().await;
assert!(result2.is_ok());
// now iterator should be exhausted
assert!(range_mode.iter.peek().is_none());
// further calls should return None
let result3 = range_mode.next().await;
assert!(result3.is_ok());
assert!(result3.unwrap().is_none());
}
#[tokio::test]
async fn test_cached_mode_with_mock_receipts() {
// create test data
let test_hash = FixedBytes::from([42u8; 32]);
let test_block_number = 100u64;
let test_header = SealedHeader::new(
alloy_consensus::Header {
number: test_block_number,
gas_used: 50_000,
..Default::default()
},
test_hash,
);
// add a mock receipt to the provider with a mock log
let mock_log = alloy_primitives::Log {
address: alloy_primitives::Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
};
let mock_receipt = reth_ethereum_primitives::Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
logs: vec![mock_log],
success: true,
};
let provider = MockEthProvider::default();
provider.add_header(test_hash, test_header.header().clone());
provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers = vec![test_header.clone()];
let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
// should find the receipt from provider fallback (cache will be empty)
let result = cached_mode.next().await.expect("next should succeed");
let receipt_block_result = result.expect("should have receipt result");
assert_eq!(receipt_block_result.header.hash(), test_hash);
assert_eq!(receipt_block_result.header.number, test_block_number);
assert_eq!(receipt_block_result.receipts.len(), 1);
assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
assert_eq!(
receipt_block_result.receipts[0].cumulative_gas_used,
mock_receipt.cumulative_gas_used
);
assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
// iterator should be exhausted
let result2 = cached_mode.next().await;
assert!(result2.is_ok());
assert!(result2.unwrap().is_none());
}
#[tokio::test]
async fn test_cached_mode_empty_headers() {
let provider = MockEthProvider::default();
let eth_api = build_test_eth_api(provider);
let eth_filter = super::EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
let filter_inner = eth_filter.inner;
let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
// should immediately return None for empty headers
let result = cached_mode.next().await.expect("next should succeed");
assert!(result.is_none());
}
#[tokio::test]
async fn test_non_consecutive_headers_after_bloom_filter() {
let provider = MockEthProvider::default();
// Create 4 headers where only blocks 100 and 102 will match bloom filter
let mut expected_hashes = vec![];
let mut prev_hash = alloy_primitives::B256::default();
// Create a transaction for blocks that will have receipts
use alloy_consensus::TxLegacy;
use reth_ethereum_primitives::{TransactionSigned, TxType};
let tx_inner = TxLegacy {
chain_id: Some(1),
nonce: 0,
gas_price: 21_000,
gas_limit: 21_000,
to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
value: alloy_primitives::U256::ZERO,
input: alloy_primitives::Bytes::new(),
};
let signature = alloy_primitives::Signature::test_signature();
let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
for i in 100u64..=103 {
let header = alloy_consensus::Header {
number: i,
parent_hash: prev_hash,
// Set bloom to match filter only for blocks 100 and 102
logs_bloom: if i == 100 || i == 102 {
alloy_primitives::Bloom::from([1u8; 256])
} else {
alloy_primitives::Bloom::default()
},
..Default::default()
};
let hash = header.hash_slow();
expected_hashes.push(hash);
prev_hash = hash;
// Add transaction to blocks that will have receipts (100 and 102)
let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
let block = reth_ethereum_primitives::Block {
header,
body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
};
provider.add_block(hash, block);
}
// Add receipts with logs only to blocks that match bloom
let mock_log = alloy_primitives::Log {
address: alloy_primitives::Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
};
let receipt = reth_ethereum_primitives::Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
logs: vec![mock_log],
success: true,
};
provider.add_receipts(100, vec![receipt.clone()]);
provider.add_receipts(101, vec![]);
provider.add_receipts(102, vec![receipt.clone()]);
provider.add_receipts(103, vec![]);
// Add block body indices for each block so receipts can be fetched
use reth_db_api::models::StoredBlockBodyIndices;
provider
.add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
provider
.add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
provider
.add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
provider
.add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
let eth_api = build_test_eth_api(provider);
let eth_filter = EthFilter::new(
eth_api,
EthFilterConfig::default(),
Box::new(TokioTaskExecutor::default()),
);
// Use default filter which will match any non-empty bloom
let filter = Filter::default();
// Get logs in the range - this will trigger the bloom filtering
let logs = eth_filter
.inner
.clone()
.get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
.await
.expect("should succeed");
// We should get logs from blocks 100 and 102 only (bloom filtered)
assert_eq!(logs.len(), 2);
assert_eq!(logs[0].block_number, Some(100));
assert_eq!(logs[1].block_number, Some(102));
// Each block hash should be the hash of its own header, not derived from any other header
assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
}
}


@@ -64,6 +64,8 @@ pub struct MockEthProvider<
pub chain_spec: Arc<ChainSpec>,
/// Local state roots
pub state_roots: Arc<Mutex<Vec<B256>>>,
/// Local block body indices store
pub block_body_indices: Arc<Mutex<HashMap<BlockNumber, StoredBlockBodyIndices>>>,
tx: TxMock,
prune_modes: Arc<PruneModes>,
}
@@ -80,6 +82,7 @@ where
accounts: self.accounts.clone(),
chain_spec: self.chain_spec.clone(),
state_roots: self.state_roots.clone(),
block_body_indices: self.block_body_indices.clone(),
tx: self.tx.clone(),
prune_modes: self.prune_modes.clone(),
}
@@ -96,6 +99,7 @@ impl<T: NodePrimitives> MockEthProvider<T, reth_chainspec::ChainSpec> {
accounts: Default::default(),
chain_spec: Arc::new(reth_chainspec::ChainSpecBuilder::mainnet().build()),
state_roots: Default::default(),
block_body_indices: Default::default(),
tx: Default::default(),
prune_modes: Default::default(),
}
@@ -156,6 +160,15 @@ impl<ChainSpec> MockEthProvider<reth_ethereum_primitives::EthPrimitives, ChainSp
}
}
/// Add block body indices to local store
pub fn add_block_body_indices(
&self,
block_number: BlockNumber,
indices: StoredBlockBodyIndices,
) {
self.block_body_indices.lock().insert(block_number, indices);
}
/// Add state root to local state root store
pub fn add_state_root(&self, state_root: B256) {
self.state_roots.lock().push(state_root);
@@ -173,6 +186,7 @@ impl<ChainSpec> MockEthProvider<reth_ethereum_primitives::EthPrimitives, ChainSp
accounts: self.accounts,
chain_spec: Arc::new(chain_spec),
state_roots: self.state_roots,
block_body_indices: self.block_body_indices,
tx: self.tx,
prune_modes: self.prune_modes,
}
@@ -954,8 +968,8 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + Send + Sync + 'static> StatePr
impl<T: NodePrimitives, ChainSpec: Send + Sync> BlockBodyIndicesProvider
for MockEthProvider<T, ChainSpec>
{
fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
Ok(None)
fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
Ok(self.block_body_indices.lock().get(&num).copied())
}
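A minimal usage sketch of the new mock-provider plumbing, as exercised by the filter test above (helper names and struct fields are taken from this diff; the reth_storage_api import path for the BlockBodyIndicesProvider trait is an assumption):

use reth_db_api::models::StoredBlockBodyIndices;
use reth_provider::test_utils::MockEthProvider;
use reth_storage_api::BlockBodyIndicesProvider; // assumed trait path

#[test]
fn mock_block_body_indices_round_trip() {
    let provider = MockEthProvider::default();
    // Block 100 starts at tx 0 and contains a single transaction.
    provider.add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });

    let indices = provider
        .block_body_indices(100)
        .expect("provider call succeeds")
        .expect("indices were stored");
    assert_eq!(indices.first_tx_num, 0);
    assert_eq!(indices.tx_count, 1);

    // Blocks without stored indices still return Ok(None), as before this change.
    assert!(provider.block_body_indices(999).unwrap().is_none());
}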
fn block_body_indices_range(
&self,