perf: skip initial transaction lookup during log queries (#5805)

Author: Roman Krasiuk
Date: 2023-12-18 02:03:39 -08:00 (committed by GitHub)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
commit 43f29fe7c2 (parent 18c881532f)
3 changed files with 113 additions and 72 deletions
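
In short, the log-query paths previously fetched the full sealed block together with its receipts just to pair every receipt with a transaction hash up front. After this change only the receipts are fetched from the cache, and a transaction hash is resolved on demand, and only for receipts that actually contain a log matching the filter. Below is a minimal sketch of that lazy lookup, distilled from the provider calls visible in the diff; the standalone helper and its exact signature are illustrative, not part of the commit, and assume reth's `BlockReader` provider trait as used here:

// Sketch only (not part of the commit): resolve a receipt's transaction hash lazily.
// Assumes the provider calls return `ProviderError` on failure, as in the diff below.
use reth_primitives::TxHash;
use reth_provider::{BlockReader, ProviderError};

fn lazy_tx_hash(
    provider: impl BlockReader,
    block_number: u64,
    receipt_idx: usize,
) -> Result<TxHash, ProviderError> {
    // Transactions and receipts are stored under the same sequential keys, so a
    // receipt's offset within its block maps directly onto a transaction id.
    let indices = provider
        .block_body_indices(block_number)?
        .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
    let transaction_id = indices.first_tx_num + receipt_idx as u64;
    let transaction = provider
        .transaction_by_id(transaction_id)?
        .ok_or(ProviderError::TransactionNotFound(transaction_id.into()))?;
    Ok(transaction.hash())
}

The block-body-indices lookup happens at most once per block and only when at least one log matches, which is where the saving over the old `get_block_and_receipts` call comes from.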

@@ -1,8 +1,8 @@
use super::cache::EthStateCache;
use crate::{
eth::{
error::{EthApiError, EthResult},
logs_utils,
error::EthApiError,
logs_utils::{self, append_matching_block_logs},
},
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
@@ -11,11 +11,11 @@ use core::fmt;
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash};
use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{
Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use reth_tasks::TaskSpawner;
@@ -351,17 +351,18 @@ where
FilterBlockOption::AtBlockHash(block_hash) => {
let mut all_logs = Vec::new();
// all matching logs in the block, if it exists
if let Some((block, receipts)) =
self.eth_cache.get_block_and_receipts(block_hash).await?
{
let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs(
&mut all_logs,
&filter,
(block_hash, block.number).into(),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()),
false,
);
if let Some(block_number) = self.provider.block_number_for_id(block_hash.into())? {
if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter,
(block_hash, block_number).into(),
&receipts,
false,
)?;
}
}
Ok(all_logs)
}
@@ -402,19 +403,6 @@ where
Ok(id)
}
/// Fetches both receipts and block for the given block number.
async fn block_and_receipts_by_number(
&self,
hash_or_number: BlockHashOrNumber,
) -> EthResult<Option<(SealedBlock, Arc<Vec<Receipt>>)>> {
let block_hash = match self.provider.convert_block_hash(hash_or_number)? {
Some(hash) => hash,
None => return Ok(None),
};
Ok(self.eth_cache.get_block_and_receipts(block_hash).await?)
}
/// Returns all logs in the given _inclusive_ range that match the filter
///
/// Returns an error if:
@@ -447,29 +435,29 @@ where
let headers = self.provider.headers_range(from..=to)?;
for (idx, header) in headers.iter().enumerate() {
// these are consecutive headers, so we can use the parent hash of the next block to
// get the current header's hash
let num_hash: BlockHashOrNumber = headers
.get(idx + 1)
.map(|h| h.parent_hash.into())
.unwrap_or_else(|| header.number.into());
// only if filter matches
if FilteredParams::matches_address(header.logs_bloom, &address_filter) &&
FilteredParams::matches_topics(header.logs_bloom, &topics_filter)
{
if let Some((block, receipts)) =
self.block_and_receipts_by_number(num_hash).await?
{
let block_hash = block.hash;
// these are consecutive headers, so we can use the parent hash of the next
// block to get the current header's hash
let block_hash = match headers.get(idx + 1) {
Some(parent) => parent.parent_hash,
None => self
.provider
.block_hash(header.number)?
.ok_or(ProviderError::BlockNotFound(header.number.into()))?,
};
logs_utils::append_matching_block_logs(
if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
(block.number, block_hash).into(),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()),
BlockNumHash::new(header.number, block_hash),
&receipts,
false,
);
)?;
// size check but only if range is multiple blocks, so we always return all
// logs of a single block

@@ -1,46 +1,37 @@
use reth_primitives::{BlockNumHash, ChainInfo, Receipt, TxHash, U256};
use super::filter::FilterError;
use alloy_primitives::TxHash;
use reth_primitives::{BlockNumHash, ChainInfo, Receipt, U256};
use reth_provider::{BlockReader, ProviderError};
use reth_rpc_types::{FilteredParams, Log};
use reth_rpc_types_compat::log::from_primitive_log;
/// Returns all matching logs of a block's receipts grouped with the hash of their transaction.
pub(crate) fn matching_block_logs<'a, I>(
/// Returns all matching logs of a block's receipts when the transaction hashes are known.
pub(crate) fn matching_block_logs_with_tx_hashes<'a, I>(
filter: &FilteredParams,
block: BlockNumHash,
tx_and_receipts: I,
block_num_hash: BlockNumHash,
tx_hashes_and_receipts: I,
removed: bool,
) -> Vec<Log>
where
I: IntoIterator<Item = (TxHash, &'a Receipt)>,
{
let mut all_logs = Vec::new();
append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed);
all_logs
}
/// Appends all matching logs of a block's receipts grouped with the hash of their transaction
pub(crate) fn append_matching_block_logs<'a, I>(
all_logs: &mut Vec<Log>,
filter: &FilteredParams,
block: BlockNumHash,
tx_and_receipts: I,
removed: bool,
) where
I: IntoIterator<Item = (TxHash, &'a Receipt)>,
{
let block_number_u256 = U256::from(block.number);
// tracks the index of a log in the entire block
// Tracks the index of a log in the entire block.
let mut log_index: u32 = 0;
for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() {
for log in receipt.logs.iter() {
if log_matches_filter(block, log, filter) {
// Iterate over transaction hashes and receipts and append matching logs.
for (receipt_idx, (tx_hash, receipt)) in tx_hashes_and_receipts.into_iter().enumerate() {
let logs = &receipt.logs;
for log in logs {
if log_matches_filter(block_num_hash, log, filter) {
let log = Log {
address: log.address,
topics: log.topics.clone(),
data: log.data.clone(),
block_hash: Some(block.hash),
block_number: Some(block_number_u256),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(transaction_idx)),
block_hash: Some(block_num_hash.hash),
block_number: Some(U256::from(block_num_hash.number)),
transaction_hash: Some(tx_hash),
// The transaction and receipt index is always the same.
transaction_index: Some(U256::from(receipt_idx)),
log_index: Some(U256::from(log_index)),
removed,
};
@@ -49,6 +40,68 @@ pub(crate) fn append_matching_block_logs<'a, I>(
log_index += 1;
}
}
all_logs
}
/// Appends all matching logs of a block's receipts.
/// If a log matches the filter, the corresponding transaction hash is looked up.
pub(crate) fn append_matching_block_logs(
all_logs: &mut Vec<Log>,
provider: impl BlockReader,
filter: &FilteredParams,
block_num_hash: BlockNumHash,
receipts: &[Receipt],
removed: bool,
) -> Result<(), FilterError> {
// Tracks the index of a log in the entire block.
let mut log_index: u32 = 0;
// Lazy loaded number of the first transaction in the block.
// This is useful for blocks with multiple matching logs because it prevents
// re-querying the block body indices.
let mut loaded_first_tx_num = None;
// Iterate over receipts and append matching logs.
for (receipt_idx, receipt) in receipts.iter().enumerate() {
let logs = &receipt.logs;
for log in logs {
if log_matches_filter(block_num_hash, log, filter) {
let first_tx_num = match loaded_first_tx_num {
Some(num) => num,
None => {
let block_body_indices =
provider.block_body_indices(block_num_hash.number)?.ok_or(
ProviderError::BlockBodyIndicesNotFound(block_num_hash.number),
)?;
loaded_first_tx_num = Some(block_body_indices.first_tx_num);
block_body_indices.first_tx_num
}
};
// This is safe because Transactions and Receipts have the same keys.
let transaction_id = first_tx_num + receipt_idx as u64;
let transaction = provider
.transaction_by_id(transaction_id)?
.ok_or(ProviderError::TransactionNotFound(transaction_id.into()))?;
let log = Log {
address: log.address,
topics: log.topics.clone(),
data: log.data.clone(),
block_hash: Some(block_num_hash.hash),
block_number: Some(U256::from(block_num_hash.number)),
transaction_hash: Some(transaction.hash()),
// The transaction and receipt index is always the same.
transaction_index: Some(U256::from(receipt_idx)),
log_index: Some(U256::from(log_index)),
removed,
};
all_logs.push(log);
}
log_index += 1;
}
}
Ok(())
}
/// Returns true if the log matches the filter and should be included
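
For orientation, here is a hedged sketch of how the two helpers are intended to be called after this change; `filter`, `block_num_hash`, `tx_hashes`, `receipts` and `provider` stand in for values of the corresponding types already in scope:

// Illustrative usage only.

// 1) Transaction hashes are already known (the pubsub path below, where chain
//    notifications carry `(TxHash, Receipt)` pairs): no provider needed.
let logs: Vec<Log> = matching_block_logs_with_tx_hashes(
    &filter,
    block_num_hash,
    tx_hashes.into_iter().zip(receipts.iter()),
    false,
);

// 2) Only receipts are at hand (the eth_getLogs paths above): pass a provider so the
//    helper can resolve each hash lazily, and only for receipts with a matching log.
let mut all_logs = Vec::new();
append_matching_block_logs(
    &mut all_logs,
    &provider,
    &filter,
    block_num_hash,
    &receipts,
    false,
)?;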

@@ -308,7 +308,7 @@ where
})
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let all_logs = logs_utils::matching_block_logs(
let all_logs = logs_utils::matching_block_logs_with_tx_hashes(
&filter,
block_receipts.block,
block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),