diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index e6403d11a8..94aed3e807 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,8 +1,8 @@ use super::cache::EthStateCache; use crate::{ eth::{ - error::{EthApiError, EthResult}, - logs_utils, + error::EthApiError, + logs_utils::{self, append_matching_block_logs}, }, result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, @@ -11,11 +11,11 @@ use core::fmt; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; -use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash}; +use reth_primitives::{IntoRecoveredTransaction, TxHash}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{ - Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, + BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, PendingTransactionFilterKind, }; use reth_tasks::TaskSpawner; @@ -351,17 +351,18 @@ where FilterBlockOption::AtBlockHash(block_hash) => { let mut all_logs = Vec::new(); // all matching logs in the block, if it exists - if let Some((block, receipts)) = - self.eth_cache.get_block_and_receipts(block_hash).await? - { - let filter = FilteredParams::new(Some(filter)); - logs_utils::append_matching_block_logs( - &mut all_logs, - &filter, - (block_hash, block.number).into(), - block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()), - false, - ); + if let Some(block_number) = self.provider.block_number_for_id(block_hash.into())? { + if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { + let filter = FilteredParams::new(Some(filter)); + logs_utils::append_matching_block_logs( + &mut all_logs, + &self.provider, + &filter, + (block_hash, block_number).into(), + &receipts, + false, + )?; + } } Ok(all_logs) } @@ -402,19 +403,6 @@ where Ok(id) } - /// Fetches both receipts and block for the given block number. - async fn block_and_receipts_by_number( - &self, - hash_or_number: BlockHashOrNumber, - ) -> EthResult>)>> { - let block_hash = match self.provider.convert_block_hash(hash_or_number)? { - Some(hash) => hash, - None => return Ok(None), - }; - - Ok(self.eth_cache.get_block_and_receipts(block_hash).await?) - } - /// Returns all logs in the given _inclusive_ range that match the filter /// /// Returns an error if: @@ -447,29 +435,29 @@ where let headers = self.provider.headers_range(from..=to)?; for (idx, header) in headers.iter().enumerate() { - // these are consecutive headers, so we can use the parent hash of the next block to - // get the current header's hash - let num_hash: BlockHashOrNumber = headers - .get(idx + 1) - .map(|h| h.parent_hash.into()) - .unwrap_or_else(|| header.number.into()); - // only if filter matches if FilteredParams::matches_address(header.logs_bloom, &address_filter) && FilteredParams::matches_topics(header.logs_bloom, &topics_filter) { - if let Some((block, receipts)) = - self.block_and_receipts_by_number(num_hash).await? - { - let block_hash = block.hash; + // these are consecutive headers, so we can use the parent hash of the next + // block to get the current header's hash + let block_hash = match headers.get(idx + 1) { + Some(parent) => parent.parent_hash, + None => self + .provider + .block_hash(header.number)? + .ok_or(ProviderError::BlockNotFound(header.number.into()))?, + }; - logs_utils::append_matching_block_logs( + if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { + append_matching_block_logs( &mut all_logs, + &self.provider, &filter_params, - (block.number, block_hash).into(), - block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()), + BlockNumHash::new(header.number, block_hash), + &receipts, false, - ); + )?; // size check but only if range is multiple blocks, so we always return all // logs of a single block diff --git a/crates/rpc/rpc/src/eth/logs_utils.rs b/crates/rpc/rpc/src/eth/logs_utils.rs index fc2edb0451..0bc3795643 100644 --- a/crates/rpc/rpc/src/eth/logs_utils.rs +++ b/crates/rpc/rpc/src/eth/logs_utils.rs @@ -1,46 +1,37 @@ -use reth_primitives::{BlockNumHash, ChainInfo, Receipt, TxHash, U256}; +use super::filter::FilterError; +use alloy_primitives::TxHash; +use reth_primitives::{BlockNumHash, ChainInfo, Receipt, U256}; +use reth_provider::{BlockReader, ProviderError}; use reth_rpc_types::{FilteredParams, Log}; use reth_rpc_types_compat::log::from_primitive_log; -/// Returns all matching logs of a block's receipts grouped with the hash of their transaction. -pub(crate) fn matching_block_logs<'a, I>( +/// Returns all matching of a block's receipts when the transaction hashes are known. +pub(crate) fn matching_block_logs_with_tx_hashes<'a, I>( filter: &FilteredParams, - block: BlockNumHash, - tx_and_receipts: I, + block_num_hash: BlockNumHash, + tx_hashes_and_receipts: I, removed: bool, ) -> Vec where I: IntoIterator, { let mut all_logs = Vec::new(); - append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed); - all_logs -} - -/// Appends all matching logs of a block's receipts grouped with the hash of their transaction -pub(crate) fn append_matching_block_logs<'a, I>( - all_logs: &mut Vec, - filter: &FilteredParams, - block: BlockNumHash, - tx_and_receipts: I, - removed: bool, -) where - I: IntoIterator, -{ - let block_number_u256 = U256::from(block.number); - // tracks the index of a log in the entire block + // Tracks the index of a log in the entire block. let mut log_index: u32 = 0; - for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() { - for log in receipt.logs.iter() { - if log_matches_filter(block, log, filter) { + // Iterate over transaction hashes and receipts and append matching logs. + for (receipt_idx, (tx_hash, receipt)) in tx_hashes_and_receipts.into_iter().enumerate() { + let logs = &receipt.logs; + for log in logs { + if log_matches_filter(block_num_hash, log, filter) { let log = Log { address: log.address, topics: log.topics.clone(), data: log.data.clone(), - block_hash: Some(block.hash), - block_number: Some(block_number_u256), - transaction_hash: Some(transaction_hash), - transaction_index: Some(U256::from(transaction_idx)), + block_hash: Some(block_num_hash.hash), + block_number: Some(U256::from(block_num_hash.number)), + transaction_hash: Some(tx_hash), + // The transaction and receipt index is always the same. + transaction_index: Some(U256::from(receipt_idx)), log_index: Some(U256::from(log_index)), removed, }; @@ -49,6 +40,68 @@ pub(crate) fn append_matching_block_logs<'a, I>( log_index += 1; } } + all_logs +} + +/// Appends all matching logs of a block's receipts. +/// If the log matches, look up the corresponding transaction hash. +pub(crate) fn append_matching_block_logs( + all_logs: &mut Vec, + provider: impl BlockReader, + filter: &FilteredParams, + block_num_hash: BlockNumHash, + receipts: &[Receipt], + removed: bool, +) -> Result<(), FilterError> { + // Tracks the index of a log in the entire block. + let mut log_index: u32 = 0; + + // Lazy loaded number of the first transaction in the block. + // This is useful for blocks with multiple matching logs because it prevents + // re-querying the block body indices. + let mut loaded_first_tx_num = None; + + // Iterate over receipts and append matching logs. + for (receipt_idx, receipt) in receipts.iter().enumerate() { + let logs = &receipt.logs; + for log in logs { + if log_matches_filter(block_num_hash, log, filter) { + let first_tx_num = match loaded_first_tx_num { + Some(num) => num, + None => { + let block_body_indices = + provider.block_body_indices(block_num_hash.number)?.ok_or( + ProviderError::BlockBodyIndicesNotFound(block_num_hash.number), + )?; + loaded_first_tx_num = Some(block_body_indices.first_tx_num); + block_body_indices.first_tx_num + } + }; + + // This is safe because Transactions and Receipts have the same keys. + let transaction_id = first_tx_num + receipt_idx as u64; + let transaction = provider + .transaction_by_id(transaction_id)? + .ok_or(ProviderError::TransactionNotFound(transaction_id.into()))?; + + let log = Log { + address: log.address, + topics: log.topics.clone(), + data: log.data.clone(), + block_hash: Some(block_num_hash.hash), + block_number: Some(U256::from(block_num_hash.number)), + transaction_hash: Some(transaction.hash()), + // The transaction and receipt index is always the same. + transaction_index: Some(U256::from(receipt_idx)), + log_index: Some(U256::from(log_index)), + removed, + }; + all_logs.push(log); + } + log_index += 1; + } + } + Ok(()) } /// Returns true if the log matches the filter and should be included diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index aa746464e4..ded49d2a1b 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -308,7 +308,7 @@ where }) .flat_map(futures::stream::iter) .flat_map(move |(block_receipts, removed)| { - let all_logs = logs_utils::matching_block_logs( + let all_logs = logs_utils::matching_block_logs_with_tx_hashes( &filter, block_receipts.block, block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),