From ab685579f038f445e7601ec877a3914af22a2d66 Mon Sep 17 00:00:00 2001 From: figtracer <1gusredo@gmail.com> Date: Mon, 26 Jan 2026 14:37:53 +0000 Subject: [PATCH] feat(rpc): add transaction hash caching to EthStateCache (#21180) Co-authored-by: Matthias Seitz Co-authored-by: Amp --- crates/node/core/src/args/rpc_server.rs | 1 + crates/node/core/src/args/rpc_state_cache.rs | 10 ++- crates/rpc/rpc-builder/src/config.rs | 1 + .../rpc-eth-api/src/helpers/transaction.rs | 15 +++- crates/rpc/rpc-eth-types/src/block.rs | 57 ++++++++++++++- crates/rpc/rpc-eth-types/src/cache/config.rs | 5 +- crates/rpc/rpc-eth-types/src/cache/mod.rs | 73 ++++++++++++++++++- crates/rpc/rpc-eth-types/src/lib.rs | 1 + crates/rpc/rpc-eth-types/src/simulate.rs | 4 +- crates/rpc/rpc-server-types/src/constants.rs | 3 + docs/vocs/docs/pages/cli/op-reth/node.mdx | 5 ++ docs/vocs/docs/pages/cli/reth/node.mdx | 5 ++ 12 files changed, 169 insertions(+), 11 deletions(-) diff --git a/crates/node/core/src/args/rpc_server.rs b/crates/node/core/src/args/rpc_server.rs index 0b0dcc066a..e44433b1db 100644 --- a/crates/node/core/src/args/rpc_server.rs +++ b/crates/node/core/src/args/rpc_server.rs @@ -1025,6 +1025,7 @@ mod tests { max_receipts: 2000, max_headers: 1000, max_concurrent_db_requests: 512, + max_cached_tx_hashes: 30_000, }, gas_price_oracle: GasPriceOracleArgs { blocks: 20, diff --git a/crates/node/core/src/args/rpc_state_cache.rs b/crates/node/core/src/args/rpc_state_cache.rs index 9568c09f3d..054fe78e5e 100644 --- a/crates/node/core/src/args/rpc_state_cache.rs +++ b/crates/node/core/src/args/rpc_state_cache.rs @@ -1,7 +1,7 @@ use clap::Args; use reth_rpc_server_types::constants::cache::{ DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN, - DEFAULT_RECEIPT_CACHE_MAX_LEN, + DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN, }; /// Parameters to configure RPC state cache. @@ -36,6 +36,13 @@ pub struct RpcStateCacheArgs { default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS, )] pub max_concurrent_db_requests: usize, + + /// Maximum number of transaction hashes to cache for transaction lookups. + #[arg( + long = "rpc-cache.max-cached-tx-hashes", + default_value_t = DEFAULT_MAX_CACHED_TX_HASHES, + )] + pub max_cached_tx_hashes: u32, } impl RpcStateCacheArgs { @@ -54,6 +61,7 @@ impl Default for RpcStateCacheArgs { max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, max_headers: DEFAULT_HEADER_CACHE_MAX_LEN, max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS, + max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES, } } } diff --git a/crates/rpc/rpc-builder/src/config.rs b/crates/rpc/rpc-builder/src/config.rs index 1acd6744ed..2d90d3c55c 100644 --- a/crates/rpc/rpc-builder/src/config.rs +++ b/crates/rpc/rpc-builder/src/config.rs @@ -122,6 +122,7 @@ impl RethRpcServerConfig for RpcServerArgs { max_receipts: self.rpc_state_cache.max_receipts, max_headers: self.rpc_state_cache.max_headers, max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests, + max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes, } } diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 7861362a38..7a07383490 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -619,7 +619,20 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt { Output = Result>>, Self::Error>, > + Send { async move { - // Try to find the transaction on disk + // First, try the RPC cache + if let Some(cached) = self.cache().get_transaction_by_hash(hash).await && + let Some(tx) = cached.recovered_transaction() + { + return Ok(Some(TransactionSource::Block { + transaction: tx.cloned(), + index: cached.tx_index as u64, + block_hash: cached.block.hash(), + block_number: cached.block.number(), + base_fee: cached.block.base_fee_per_gas(), + })); + } + + // Cache miss - try to find the transaction on disk if let Some((tx, meta)) = self .spawn_blocking_io(move |this| { this.provider() diff --git a/crates/rpc/rpc-eth-types/src/block.rs b/crates/rpc/rpc-eth-types/src/block.rs index 8e8420f180..6316effa36 100644 --- a/crates/rpc/rpc-eth-types/src/block.rs +++ b/crates/rpc/rpc-eth-types/src/block.rs @@ -2,15 +2,68 @@ use std::sync::Arc; -use alloy_consensus::TxReceipt; +use alloy_consensus::{transaction::TxHashRef, TxReceipt}; use alloy_primitives::TxHash; use reth_primitives_traits::{ - BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock, + Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock, + SealedBlock, }; use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes}; use crate::utils::calculate_gas_used_and_next_log_index; +/// Cached data for a transaction lookup. +#[derive(Debug, Clone)] +pub struct CachedTransaction { + /// The block containing this transaction. + pub block: Arc>, + /// Index of the transaction within the block. + pub tx_index: usize, + /// Receipts for the block, if available. + pub receipts: Option>>, +} + +impl CachedTransaction { + /// Creates a new cached transaction entry. + pub const fn new( + block: Arc>, + tx_index: usize, + receipts: Option>>, + ) -> Self { + Self { block, tx_index, receipts } + } + + /// Returns the `Recovered<&T>` transaction at the cached index. + pub fn recovered_transaction(&self) -> Option::Transaction>> { + self.block.recovered_transaction(self.tx_index) + } + + /// Converts this cached transaction into an RPC receipt using the given converter. + /// + /// Returns `None` if receipts are not available or the transaction index is out of bounds. + pub fn into_receipt( + self, + converter: &C, + ) -> Option::Receipt, C::Error>> + where + N: NodePrimitives, + R: TxReceipt + Clone, + C: RpcConvert, + { + let receipts = self.receipts?; + let receipt = receipts.get(self.tx_index)?; + let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash(); + let tx = self.block.find_indexed(tx_hash)?; + convert_transaction_receipt::( + self.block.as_ref(), + receipts.as_ref(), + tx, + receipt, + converter, + ) + } +} + /// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts. /// /// This type is used throughout the RPC layer to efficiently pass around diff --git a/crates/rpc/rpc-eth-types/src/cache/config.rs b/crates/rpc/rpc-eth-types/src/cache/config.rs index 001a5b4d4d..1f908e57aa 100644 --- a/crates/rpc/rpc-eth-types/src/cache/config.rs +++ b/crates/rpc/rpc-eth-types/src/cache/config.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use reth_rpc_server_types::constants::cache::{ DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN, - DEFAULT_RECEIPT_CACHE_MAX_LEN, + DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN, }; /// Settings for the [`EthStateCache`](super::EthStateCache). @@ -27,6 +27,8 @@ pub struct EthStateCacheConfig { /// /// Default is 512. pub max_concurrent_db_requests: usize, + /// Maximum number of transaction hashes to cache for transaction lookups. + pub max_cached_tx_hashes: u32, } impl Default for EthStateCacheConfig { @@ -36,6 +38,7 @@ impl Default for EthStateCacheConfig { max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, max_headers: DEFAULT_HEADER_CACHE_MAX_LEN, max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS, + max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES, } } } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 73d8072e6d..7ae10da83a 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -1,17 +1,18 @@ //! Async caching support for eth RPC use super::{EthStateCacheConfig, MultiConsumerLruCache}; -use alloy_consensus::BlockHeader; +use crate::block::CachedTransaction; +use alloy_consensus::{transaction::TxHashRef, BlockHeader}; use alloy_eips::BlockHashOrNumber; -use alloy_primitives::B256; +use alloy_primitives::{TxHash, B256}; use futures::{stream::FuturesOrdered, Stream, StreamExt}; use reth_chain_state::CanonStateNotification; use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::Chain; -use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock}; +use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock}; use reth_storage_api::{BlockReader, TransactionVariant}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; -use schnellru::{ByLength, Limiter}; +use schnellru::{ByLength, Limiter, LruMap}; use std::{ future::Future, pin::Pin, @@ -47,6 +48,9 @@ type HeaderResponseSender = oneshot::Sender>; /// The type that can send the response with a chain of cached blocks type CachedParentBlocksResponseSender = oneshot::Sender>>>; +/// The type that can send the response for a transaction hash lookup +type TransactionHashResponseSender = oneshot::Sender>>; + type BlockLruCache = MultiConsumerLruCache>, L, BlockWithSendersResponseSender>; @@ -79,11 +83,13 @@ impl EthStateCache { max_receipts: u32, max_headers: u32, max_concurrent_db_operations: usize, + max_cached_tx_hashes: u32, ) -> (Self, EthStateCacheService) where Provider: BlockReader, { let (to_service, rx) = unbounded_channel(); + let service = EthStateCacheService { provider, full_block_cache: BlockLruCache::new(max_blocks, "blocks"), @@ -93,6 +99,7 @@ impl EthStateCache { action_rx: UnboundedReceiverStream::new(rx), action_task_spawner, rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)), + tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)), }; let cache = Self { to_service }; (cache, service) @@ -127,6 +134,7 @@ impl EthStateCache { max_receipts, max_headers, max_concurrent_db_requests, + max_cached_tx_hashes, } = config; let (this, service) = Self::create( provider, @@ -135,6 +143,7 @@ impl EthStateCache { max_receipts, max_headers, max_concurrent_db_requests, + max_cached_tx_hashes, ); executor.spawn_critical("eth state cache", Box::pin(service)); this @@ -255,6 +264,19 @@ impl EthStateCache { Some(blocks) } } + + /// Looks up a transaction by its hash in the cache index. + /// + /// Returns the cached block, transaction index, and optionally receipts if the transaction + /// is in a cached block. + pub async fn get_transaction_by_hash( + &self, + tx_hash: TxHash, + ) -> Option> { + let (response_tx, rx) = oneshot::channel(); + let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx }); + rx.await.ok()? + } } /// Thrown when the cache service task dropped. #[derive(Debug, thiserror::Error)] @@ -317,6 +339,8 @@ pub(crate) struct EthStateCacheService< /// /// This restricts the max concurrent fetch tasks at the same time. rate_limiter: Arc, + /// LRU index mapping transaction hashes to their block hash and index within the block. + tx_hash_index: LruMap, } impl EthStateCacheService @@ -324,6 +348,29 @@ where Provider: BlockReader + Clone + Unpin + 'static, Tasks: TaskSpawner + Clone + 'static, { + /// Indexes all transactions in a block by transaction hash. + fn index_block_transactions(&mut self, block: &RecoveredBlock) { + let block_hash = block.hash(); + for (tx_idx, tx) in block.body().transactions().iter().enumerate() { + self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx)); + } + } + + /// Removes transaction index entries for a reorged block. + /// + /// Only removes entries that still point to this block, preserving mappings for transactions + /// that were re-mined in a new canonical block. + fn remove_block_transactions(&mut self, block: &RecoveredBlock) { + let block_hash = block.hash(); + for tx in block.body().transactions() { + if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) && + *mapped_hash == block_hash + { + self.tx_hash_index.remove(tx.tx_hash()); + } + } + } + fn on_new_block( &mut self, block_hash: B256, @@ -550,6 +597,8 @@ where } CacheAction::CacheNewCanonicalChain { chain_change } => { for block in chain_change.blocks { + // Index transactions before caching the block + this.index_block_transactions(&block); this.on_new_block(block.hash(), Ok(Some(Arc::new(block)))); } @@ -562,6 +611,8 @@ where } CacheAction::RemoveReorgedChain { chain_change } => { for block in chain_change.blocks { + // Remove transaction index entries for reorged blocks + this.remove_block_transactions(&block); this.on_reorg_block(block.hash(), Ok(Some(block))); } @@ -596,6 +647,15 @@ where let _ = response_tx.send(blocks); } + CacheAction::GetTransactionByHash { tx_hash, response_tx } => { + let result = + this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| { + let block = this.full_block_cache.get(block_hash).cloned()?; + let receipts = this.receipts_cache.get(block_hash).cloned(); + Some(CachedTransaction::new(block, *idx, receipts)) + }); + let _ = response_tx.send(result); + } }; this.update_cached_metrics(); } @@ -649,6 +709,11 @@ enum CacheAction { max_blocks: usize, response_tx: CachedParentBlocksResponseSender, }, + /// Look up a transaction's cached data by its hash + GetTransactionByHash { + tx_hash: TxHash, + response_tx: TransactionHashResponseSender, + }, } struct BlockReceipts { diff --git a/crates/rpc/rpc-eth-types/src/lib.rs b/crates/rpc/rpc-eth-types/src/lib.rs index f5b7e07cea..ef234e33aa 100644 --- a/crates/rpc/rpc-eth-types/src/lib.rs +++ b/crates/rpc/rpc-eth-types/src/lib.rs @@ -27,6 +27,7 @@ pub mod tx_forward; pub mod utils; pub use alloy_rpc_types_eth::FillTransaction; +pub use block::CachedTransaction; pub use builder::config::{EthConfig, EthFilterConfig}; pub use cache::{ config::EthStateCacheConfig, db::StateCacheDb, multi_consumer::MultiConsumerLruCache, diff --git a/crates/rpc/rpc-eth-types/src/simulate.rs b/crates/rpc/rpc-eth-types/src/simulate.rs index c54e31d8a7..70c1267dc5 100644 --- a/crates/rpc/rpc-eth-types/src/simulate.rs +++ b/crates/rpc/rpc-eth-types/src/simulate.rs @@ -173,7 +173,7 @@ pub fn apply_precompile_overrides( return Ok(()); } - for (source, _dest) in &moves { + for (source, _) in &moves { if precompiles.get(source).is_none() { return Err(EthSimulateError::NotAPrecompile(*source)); } @@ -197,7 +197,7 @@ pub fn apply_precompile_overrides( } } - for (_source, dest, precompile) in extracted { + for (_, dest, precompile) in extracted { precompiles.apply_precompile(&dest, |_| Some(precompile)); } diff --git a/crates/rpc/rpc-server-types/src/constants.rs b/crates/rpc/rpc-server-types/src/constants.rs index acf5294fe9..8f52f611db 100644 --- a/crates/rpc/rpc-server-types/src/constants.rs +++ b/crates/rpc/rpc-server-types/src/constants.rs @@ -132,4 +132,7 @@ pub mod cache { /// Default number of concurrent database requests. pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512; + + /// Default maximum number of transaction hashes to cache for lookups. + pub const DEFAULT_MAX_CACHED_TX_HASHES: u32 = 30_000; } diff --git a/docs/vocs/docs/pages/cli/op-reth/node.mdx b/docs/vocs/docs/pages/cli/op-reth/node.mdx index 98205ad008..be719a4d1b 100644 --- a/docs/vocs/docs/pages/cli/op-reth/node.mdx +++ b/docs/vocs/docs/pages/cli/op-reth/node.mdx @@ -501,6 +501,11 @@ RPC State Cache: [default: 512] + --rpc-cache.max-cached-tx-hashes + Maximum number of transaction hashes to cache for transaction lookups + + [default: 30000] + Gas Price Oracle: --gpo.blocks Number of recent blocks to check for gas price diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 75ee10f7c1..eb8e4d437c 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -501,6 +501,11 @@ RPC State Cache: [default: 512] + --rpc-cache.max-cached-tx-hashes + Maximum number of transaction hashes to cache for transaction lookups + + [default: 30000] + Gas Price Oracle: --gpo.blocks Number of recent blocks to check for gas price