diff --git a/crates/rpc/rpc/src/eth/api/block.rs b/crates/rpc/rpc/src/eth/api/block.rs index c1c835107c..e619d210a4 100644 --- a/crates/rpc/rpc/src/eth/api/block.rs +++ b/crates/rpc/rpc/src/eth/api/block.rs @@ -1,5 +1,7 @@ //! Contains RPC handler implementations specific to blocks. +use std::sync::Arc; + use crate::{ eth::{ api::transactions::build_transaction_receipt_with_block_receipts, @@ -65,7 +67,10 @@ where let mut block_and_receipts = None; if block_id.is_pending() { - block_and_receipts = self.provider().pending_block_and_receipts()?; + block_and_receipts = self + .provider() + .pending_block_and_receipts()? + .map(|(sb, receipts)| (sb, Arc::new(receipts))); } else if let Some(block_hash) = self.provider().block_hash_for_id(block_id)? { block_and_receipts = self.cache().get_block_and_receipts(block_hash).await?; } @@ -87,7 +92,7 @@ where let receipts = block .body .into_iter() - .zip(receipts.clone()) + .zip(receipts.iter()) .enumerate() .map(|(idx, (tx, receipt))| { let meta = TransactionMeta { @@ -106,7 +111,7 @@ where build_transaction_receipt_with_block_receipts( tx, meta, - receipt, + receipt.clone(), &receipts, #[cfg(feature = "optimism")] op_tx_meta, diff --git a/crates/rpc/rpc/src/eth/api/fee_history.rs b/crates/rpc/rpc/src/eth/api/fee_history.rs index 8c01d672c1..399f776503 100644 --- a/crates/rpc/rpc/src/eth/api/fee_history.rs +++ b/crates/rpc/rpc/src/eth/api/fee_history.rs @@ -70,7 +70,7 @@ impl FeeHistoryCache { /// Insert block data into the cache. async fn insert_blocks(&self, blocks: I) where - I: Iterator)>, + I: Iterator>)>, { let mut entries = self.inner.entries.write().await; @@ -244,7 +244,7 @@ pub async fn fee_history_cache_new_blocks_task( let (blocks, receipts): (Vec<_>, Vec<_>) = committed .blocks_and_receipts() .map(|(block, receipts)| { - (block.block.clone(), receipts.iter().flatten().cloned().collect::>()) + (block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::>())) }) .unzip(); fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await; diff --git a/crates/rpc/rpc/src/eth/cache/mod.rs b/crates/rpc/rpc/src/eth/cache/mod.rs index 80cf92d1b7..05c1686829 100644 --- a/crates/rpc/rpc/src/eth/cache/mod.rs +++ b/crates/rpc/rpc/src/eth/cache/mod.rs @@ -40,7 +40,7 @@ type BlockTransactionsResponseSender = type BlockWithSendersResponseSender = oneshot::Sender>>; /// The type that can send the response to the requested receipts of a block. -type ReceiptsResponseSender = oneshot::Sender>>>; +type ReceiptsResponseSender = oneshot::Sender>>>>; /// The type that can send the response to a requested env type EnvResponseSender = oneshot::Sender>; @@ -52,7 +52,8 @@ type BlockLruCache = MultiConsumerLruCache< Either, >; -type ReceiptsLruCache = MultiConsumerLruCache, L, ReceiptsResponseSender>; +type ReceiptsLruCache = + MultiConsumerLruCache>, L, ReceiptsResponseSender>; type EnvLruCache = MultiConsumerLruCache; @@ -180,7 +181,7 @@ impl EthStateCache { pub async fn get_transactions_and_receipts( &self, block_hash: B256, - ) -> ProviderResult, Vec)>> { + ) -> ProviderResult, Arc>)>> { let transactions = self.get_block_transactions(block_hash); let receipts = self.get_receipts(block_hash); @@ -214,7 +215,10 @@ impl EthStateCache { /// Requests the [Receipt] for the block hash /// /// Returns `None` if the block was not found. - pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult>> { + pub async fn get_receipts( + &self, + block_hash: B256, + ) -> ProviderResult>>> { let (response_tx, rx) = oneshot::channel(); let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx }); rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? @@ -224,7 +228,7 @@ impl EthStateCache { pub async fn get_block_and_receipts( &self, block_hash: B256, - ) -> ProviderResult)>> { + ) -> ProviderResult>)>> { let block = self.get_sealed_block(block_hash); let receipts = self.get_receipts(block_hash); @@ -268,7 +272,7 @@ pub(crate) struct EthStateCacheService< LimitEnvs = ByLength, > where LimitBlocks: Limiter, - LimitReceipts: Limiter>, + LimitReceipts: Limiter>>, LimitEnvs: Limiter, { /// The type used to lookup data from disk @@ -318,7 +322,11 @@ where } } - fn on_new_receipts(&mut self, block_hash: B256, res: ProviderResult>>) { + fn on_new_receipts( + &mut self, + block_hash: B256, + res: ProviderResult>>>, + ) { if let Some(queued) = self.receipts_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { @@ -426,7 +434,10 @@ where this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; - let res = provider.receipts_by_block(block_hash.into()); + let res = provider + .receipts_by_block(block_hash.into()) + .map(|maybe_receipts| maybe_receipts.map(Arc::new)); + let _ = action_tx .send(CacheAction::ReceiptsResult { block_hash, res }); })); @@ -496,9 +507,9 @@ where for block_receipts in receipts { this.on_new_receipts( block_receipts.block_hash, - Ok(Some( + Ok(Some(Arc::new( block_receipts.receipts.into_iter().flatten().collect(), - )), + ))), ); } } @@ -517,7 +528,7 @@ enum CacheAction { GetEnv { block_hash: B256, response_tx: EnvResponseSender }, GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender }, BlockWithSendersResult { block_hash: B256, res: ProviderResult> }, - ReceiptsResult { block_hash: B256, res: ProviderResult>> }, + ReceiptsResult { block_hash: B256, res: ProviderResult>>> }, EnvResult { block_hash: B256, res: Box> }, CacheNewCanonicalChain { blocks: Vec, receipts: Vec }, } diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 96cb687e63..e6403d11a8 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -359,7 +359,7 @@ where &mut all_logs, &filter, (block_hash, block.number).into(), - block.body.into_iter().map(|tx| tx.hash()).zip(receipts), + block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()), false, ); } @@ -406,7 +406,7 @@ where async fn block_and_receipts_by_number( &self, hash_or_number: BlockHashOrNumber, - ) -> EthResult)>> { + ) -> EthResult>)>> { let block_hash = match self.provider.convert_block_hash(hash_or_number)? { Some(hash) => hash, None => return Ok(None), @@ -467,7 +467,7 @@ where &mut all_logs, &filter_params, (block.number, block_hash).into(), - block.body.into_iter().map(|tx| tx.hash()).zip(receipts), + block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()), false, ); diff --git a/crates/rpc/rpc/src/eth/logs_utils.rs b/crates/rpc/rpc/src/eth/logs_utils.rs index 2fdd711356..fc2edb0451 100644 --- a/crates/rpc/rpc/src/eth/logs_utils.rs +++ b/crates/rpc/rpc/src/eth/logs_utils.rs @@ -3,14 +3,14 @@ use reth_rpc_types::{FilteredParams, Log}; use reth_rpc_types_compat::log::from_primitive_log; /// Returns all matching logs of a block's receipts grouped with the hash of their transaction. -pub(crate) fn matching_block_logs( +pub(crate) fn matching_block_logs<'a, I>( filter: &FilteredParams, block: BlockNumHash, tx_and_receipts: I, removed: bool, ) -> Vec where - I: IntoIterator, + I: IntoIterator, { let mut all_logs = Vec::new(); append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed); @@ -18,26 +18,25 @@ where } /// Appends all matching logs of a block's receipts grouped with the hash of their transaction -pub(crate) fn append_matching_block_logs( +pub(crate) fn append_matching_block_logs<'a, I>( all_logs: &mut Vec, filter: &FilteredParams, block: BlockNumHash, tx_and_receipts: I, removed: bool, ) where - I: IntoIterator, + I: IntoIterator, { let block_number_u256 = U256::from(block.number); // tracks the index of a log in the entire block let mut log_index: u32 = 0; for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() { - let logs = receipt.logs; - for log in logs.into_iter() { - if log_matches_filter(block, &log, filter) { + for log in receipt.logs.iter() { + if log_matches_filter(block, log, filter) { let log = Log { address: log.address, - topics: log.topics, - data: log.data, + topics: log.topics.clone(), + data: log.data.clone(), block_hash: Some(block.hash), block_number: Some(block_number_u256), transaction_hash: Some(transaction_hash), diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index b720ebec16..aa746464e4 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -311,7 +311,7 @@ where let all_logs = logs_utils::matching_block_logs( &filter, block_receipts.block, - block_receipts.tx_receipts, + block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)), removed, ); futures::stream::iter(all_logs)