From 8e5644dac09c6e49806b24b3dd096a080be5ea5a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 14 Mar 2023 17:03:36 +0100 Subject: [PATCH] feat(rpc): complete log filter (#1741) --- crates/rpc/rpc/src/eth/filter.rs | 62 +++++++++++++----------- crates/rpc/rpc/src/eth/logs_utils.rs | 72 ++++++++++++++++++++++++++++ crates/rpc/rpc/src/eth/mod.rs | 1 + crates/rpc/rpc/src/eth/pubsub.rs | 58 ++++------------------ 4 files changed, 115 insertions(+), 78 deletions(-) create mode 100644 crates/rpc/rpc/src/eth/logs_utils.rs diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 9c7bdda56d..064bf14dfc 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,5 +1,5 @@ use crate::{ - eth::error::EthApiError, + eth::{error::EthApiError, logs_utils}, result::{internal_rpc_err, rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; @@ -7,7 +7,7 @@ use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_primitives::{ filter::{Filter, FilterBlockOption, FilteredParams}, - Block, U256, + U256, }; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; @@ -150,7 +150,7 @@ where trace!(target: "rpc::eth::filter", ?id, "uninstalled filter"); Ok(true) } else { - Err(internal_rpc_err(format!("Filter id {id:?} does not exist."))) + Ok(false) } } @@ -201,9 +201,8 @@ where /// Returns an error if: /// - underlying database error /// - amount of matches exceeds configured limit - #[allow(dead_code)] fn filter_logs(&self, filter: &Filter, from_block: u64, to_block: u64) -> RpcResult> { - let mut logs = Vec::new(); + let mut all_logs = Vec::new(); let filter_params = FilteredParams::new(Some(filter.clone())); let topics = @@ -213,38 +212,41 @@ where let address_filter = FilteredParams::address_filter(&filter.address); let topics_filter = FilteredParams::topics_filter(&topics); + // loop over the range of new blocks and check logs if the filter matches the log's bloom + // filter for block_number in from_block..=to_block { if let Some(block) = self.client.block_by_number(block_number).to_rpc_result()? { // only if filter matches if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) { - self.append_matching_block_logs(&mut logs, &filter_params, block); + // get receipts for the block + if let Some(receipts) = + self.client.receipts_by_block(block.number.into()).to_rpc_result()? + { + let block_hash = block.hash_slow(); - // TODO size check + logs_utils::append_matching_block_logs( + &mut all_logs, + &filter_params, + block_hash, + block_number, + block.body.into_iter().map(|tx| tx.hash).zip(receipts), + ); + + // size check + if all_logs.len() > self.max_logs_in_response { + return Err(FilterError::QueryExceedsMaxResults( + self.max_logs_in_response, + ) + .into()) + } + } } } } - Ok(logs) - } - - /// Appends all logs emitted in the `block` that match the `filter` to the `logs` vector. - #[allow(clippy::ptr_arg)] - fn append_matching_block_logs( - &self, - _logs: &mut Vec, - _filter: &FilteredParams, - block: Block, - ) { - let _block_log_index: u32 = 0; - let _block_hash = block.hash_slow(); - - // loop over all transactions in the block - for tx in block.body { - let _transaction_log_index: u32 = 0; - let _transaction_hash = tx.hash; - } + Ok(all_logs) } } @@ -266,7 +268,6 @@ struct ActiveFilter { } #[derive(Clone, Debug)] -#[allow(clippy::large_enum_variant)] enum FilterKind { Log(Box), Block, @@ -278,6 +279,8 @@ enum FilterKind { pub enum FilterError { #[error("filter not found")] FilterNotFound(FilterId), + #[error("Query exceeds max results {0}")] + QueryExceedsMaxResults(usize), } // convert the error @@ -285,9 +288,12 @@ impl From for jsonrpsee::core::Error { fn from(err: FilterError) -> Self { match err { FilterError::FilterNotFound(_) => rpc_error_with_code( - jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE, + jsonrpsee::types::error::INVALID_PARAMS_CODE, "filter not found", ), + err @ FilterError::QueryExceedsMaxResults(_) => { + rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) + } } } } diff --git a/crates/rpc/rpc/src/eth/logs_utils.rs b/crates/rpc/rpc/src/eth/logs_utils.rs new file mode 100644 index 0000000000..41f3cdf87e --- /dev/null +++ b/crates/rpc/rpc/src/eth/logs_utils.rs @@ -0,0 +1,72 @@ +use reth_primitives::{filter::FilteredParams, Receipt, TxHash, U256}; +use reth_rpc_types::Log; +use revm::primitives::B256 as H256; + +/// Returns all matching logs of a block's receipts grouped with the hash of their transaction. +pub(crate) fn matching_block_logs( + filter: &FilteredParams, + block_hash: H256, + block_number: u64, + tx_and_receipts: I, +) -> Vec +where + I: IntoIterator, +{ + let mut all_logs = Vec::new(); + append_matching_block_logs(&mut all_logs, filter, block_hash, block_number, tx_and_receipts); + all_logs +} + +/// Appends all matching logs of a block's receipts grouped with the hash of their transaction +pub(crate) fn append_matching_block_logs( + all_logs: &mut Vec, + filter: &FilteredParams, + block_hash: H256, + block_number: u64, + tx_and_receipts: I, +) where + I: IntoIterator, +{ + let block_number_u256 = U256::from(block_number); + // tracks the index of a log in the entire block + let mut log_index: u32 = 0; + for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() { + let logs = receipt.logs; + for (transaction_log_idx, log) in logs.into_iter().enumerate() { + if log_matches_filter(block_hash, block_number, &log, filter) { + let log = Log { + address: log.address, + topics: log.topics, + data: log.data, + block_hash: Some(block_hash), + block_number: Some(block_number_u256), + transaction_hash: Some(transaction_hash), + transaction_index: Some(U256::from(transaction_idx)), + log_index: Some(U256::from(log_index)), + transaction_log_index: Some(U256::from(transaction_log_idx)), + removed: false, + }; + all_logs.push(log); + } + log_index += 1; + } + } +} + +/// Returns true if the log matches the filter and should be included +pub(crate) fn log_matches_filter( + block_hash: H256, + block_number: u64, + log: &reth_primitives::Log, + params: &FilteredParams, +) -> bool { + if params.filter.is_some() && + (!params.filter_block_range(block_number) || + !params.filter_block_hash(block_hash) || + !params.filter_address(log) || + !params.filter_topics(log)) + { + return false + } + true +} diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index 7dee1a1257..6958b1238d 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -5,6 +5,7 @@ pub mod cache; pub mod error; mod filter; mod id_provider; +mod logs_utils; mod pubsub; pub(crate) mod revm_utils; mod signer; diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 5a2ee798ef..ff35b78ba9 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,9 +1,10 @@ //! `eth_` PubSub RPC handler implementation +use crate::eth::logs_utils; use futures::StreamExt; use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider}; -use reth_primitives::{filter::FilteredParams, BlockId, TxHash, H256, U256}; +use reth_primitives::{filter::FilteredParams, BlockId, TxHash}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::{ @@ -226,56 +227,13 @@ where .flat_map(move |(new_block, transactions, receipts)| { let block_hash = new_block.hash; let block_number = new_block.header.number; - let mut all_logs: Vec = Vec::new(); - - // tracks the index of a log in the entire block - let mut log_index: u32 = 0; - for (transaction_idx, (tx, receipt)) in - transactions.into_iter().zip(receipts).enumerate() - { - let logs = receipt.logs; - - // tracks the index of the log in the transaction - let transaction_hash = tx.hash; - - for (transaction_log_idx, log) in logs.into_iter().enumerate() { - if matches_filter(block_hash, block_number, &log, &filter) { - let log = Log { - address: log.address, - topics: log.topics, - data: log.data, - block_hash: Some(block_hash), - block_number: Some(U256::from(block_number)), - transaction_hash: Some(transaction_hash), - transaction_index: Some(U256::from(transaction_idx)), - log_index: Some(U256::from(log_index)), - transaction_log_index: Some(U256::from(transaction_log_idx)), - removed: false, - }; - all_logs.push(log); - } - log_index += 1; - } - } + let all_logs = logs_utils::matching_block_logs( + &filter, + block_hash, + block_number, + transactions.into_iter().map(|tx| tx.hash).zip(receipts), + ); futures::stream::iter(all_logs) }) } } - -/// Returns true if the log matches the filter and should be included -fn matches_filter( - block_hash: H256, - block_number: u64, - log: &reth_primitives::Log, - params: &FilteredParams, -) -> bool { - if params.filter.is_some() && - (!params.filter_block_range(block_number) || - !params.filter_block_hash(block_hash) || - !params.filter_address(log) || - !params.filter_topics(log)) - { - return false - } - true -}