feat(rpc): add transaction hash caching to EthStateCache (#21180)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
figtracer
2026-01-26 14:37:53 +00:00
committed by GitHub
parent c7faafd183
commit ab685579f0
12 changed files with 169 additions and 11 deletions

View File

@@ -1025,6 +1025,7 @@ mod tests {
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
max_cached_tx_hashes: 30_000,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,

View File

@@ -1,7 +1,7 @@
use clap::Args;
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Parameters to configure RPC state cache.
@@ -36,6 +36,13 @@ pub struct RpcStateCacheArgs {
default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS,
)]
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
#[arg(
long = "rpc-cache.max-cached-tx-hashes",
default_value_t = DEFAULT_MAX_CACHED_TX_HASHES,
)]
pub max_cached_tx_hashes: u32,
}
impl RpcStateCacheArgs {
@@ -54,6 +61,7 @@ impl Default for RpcStateCacheArgs {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -122,6 +122,7 @@ impl RethRpcServerConfig for RpcServerArgs {
max_receipts: self.rpc_state_cache.max_receipts,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes,
}
}

View File

@@ -619,7 +619,20 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
> + Send {
async move {
// Try to find the transaction on disk
// First, try the RPC cache
if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
let Some(tx) = cached.recovered_transaction()
{
return Ok(Some(TransactionSource::Block {
transaction: tx.cloned(),
index: cached.tx_index as u64,
block_hash: cached.block.hash(),
block_number: cached.block.number(),
base_fee: cached.block.base_fee_per_gas(),
}));
}
// Cache miss - try to find the transaction on disk
if let Some((tx, meta)) = self
.spawn_blocking_io(move |this| {
this.provider()

View File

@@ -2,15 +2,68 @@
use std::sync::Arc;
use alloy_consensus::TxReceipt;
use alloy_consensus::{transaction::TxHashRef, TxReceipt};
use alloy_primitives::TxHash;
use reth_primitives_traits::{
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock,
SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use crate::utils::calculate_gas_used_and_next_log_index;
/// Cached data for a transaction lookup.
#[derive(Debug, Clone)]
pub struct CachedTransaction<B: Block, R> {
/// The block containing this transaction.
pub block: Arc<RecoveredBlock<B>>,
/// Index of the transaction within the block.
pub tx_index: usize,
/// Receipts for the block, if available.
pub receipts: Option<Arc<Vec<R>>>,
}
impl<B: Block, R> CachedTransaction<B, R> {
/// Creates a new cached transaction entry.
pub const fn new(
block: Arc<RecoveredBlock<B>>,
tx_index: usize,
receipts: Option<Arc<Vec<R>>>,
) -> Self {
Self { block, tx_index, receipts }
}
/// Returns the `Recovered<&T>` transaction at the cached index.
pub fn recovered_transaction(&self) -> Option<Recovered<&<B::Body as BlockBody>::Transaction>> {
self.block.recovered_transaction(self.tx_index)
}
/// Converts this cached transaction into an RPC receipt using the given converter.
///
/// Returns `None` if receipts are not available or the transaction index is out of bounds.
pub fn into_receipt<N, C>(
self,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives<Block = B, Receipt = R>,
R: TxReceipt + Clone,
C: RpcConvert<Primitives = N>,
{
let receipts = self.receipts?;
let receipt = receipts.get(self.tx_index)?;
let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash();
let tx = self.block.find_indexed(tx_hash)?;
convert_transaction_receipt::<N, C>(
self.block.as_ref(),
receipts.as_ref(),
tx,
receipt,
converter,
)
}
}
/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
/// This type is used throughout the RPC layer to efficiently pass around

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Settings for the [`EthStateCache`](super::EthStateCache).
@@ -27,6 +27,8 @@ pub struct EthStateCacheConfig {
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
pub max_cached_tx_hashes: u32,
}
impl Default for EthStateCacheConfig {
@@ -36,6 +38,7 @@ impl Default for EthStateCacheConfig {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -1,17 +1,18 @@
//! Async caching support for eth RPC
use super::{EthStateCacheConfig, MultiConsumerLruCache};
use alloy_consensus::BlockHeader;
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter};
use schnellru::{ByLength, Limiter, LruMap};
use std::{
future::Future,
pin::Pin,
@@ -47,6 +48,9 @@ type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
type BlockLruCache<B, L> =
MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
@@ -79,11 +83,13 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
max_cached_tx_hashes: u32,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
{
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
@@ -93,6 +99,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
};
let cache = Self { to_service };
(cache, service)
@@ -127,6 +134,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
} = config;
let (this, service) = Self::create(
provider,
@@ -135,6 +143,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
@@ -255,6 +264,19 @@ impl<N: NodePrimitives> EthStateCache<N> {
Some(blocks)
}
}
/// Looks up a transaction by its hash in the cache index.
///
/// Returns the cached block, transaction index, and optionally receipts if the transaction
/// is in a cached block.
pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Option<CachedTransaction<N::Block, N::Receipt>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
rx.await.ok()?
}
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
@@ -317,6 +339,8 @@ pub(crate) struct EthStateCacheService<
///
/// This restricts the max concurrent fetch tasks at the same time.
rate_limiter: Arc<Semaphore>,
/// LRU index mapping transaction hashes to their block hash and index within the block.
tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
@@ -324,6 +348,29 @@ where
Provider: BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Indexes all transactions in a block by transaction hash.
fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
}
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
}
}
fn on_new_block(
&mut self,
block_hash: B256,
@@ -550,6 +597,8 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
// Index transactions before caching the block
this.index_block_transactions(&block);
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}
@@ -562,6 +611,8 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
}
@@ -596,6 +647,15 @@ where
let _ = response_tx.send(blocks);
}
CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
let result =
this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
let block = this.full_block_cache.get(block_hash).cloned()?;
let receipts = this.receipts_cache.get(block_hash).cloned();
Some(CachedTransaction::new(block, *idx, receipts))
});
let _ = response_tx.send(result);
}
};
this.update_cached_metrics();
}
@@ -649,6 +709,11 @@ enum CacheAction<B: Block, R> {
max_blocks: usize,
response_tx: CachedParentBlocksResponseSender<B>,
},
/// Look up a transaction's cached data by its hash
GetTransactionByHash {
tx_hash: TxHash,
response_tx: TransactionHashResponseSender<B, R>,
},
}
struct BlockReceipts<R> {

View File

@@ -27,6 +27,7 @@ pub mod tx_forward;
pub mod utils;
pub use alloy_rpc_types_eth::FillTransaction;
pub use block::CachedTransaction;
pub use builder::config::{EthConfig, EthFilterConfig};
pub use cache::{
config::EthStateCacheConfig, db::StateCacheDb, multi_consumer::MultiConsumerLruCache,

View File

@@ -173,7 +173,7 @@ pub fn apply_precompile_overrides(
return Ok(());
}
for (source, _dest) in &moves {
for (source, _) in &moves {
if precompiles.get(source).is_none() {
return Err(EthSimulateError::NotAPrecompile(*source));
}
@@ -197,7 +197,7 @@ pub fn apply_precompile_overrides(
}
}
for (_source, dest, precompile) in extracted {
for (_, dest, precompile) in extracted {
precompiles.apply_precompile(&dest, |_| Some(precompile));
}

View File

@@ -132,4 +132,7 @@ pub mod cache {
/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;
/// Default maximum number of transaction hashes to cache for lookups.
pub const DEFAULT_MAX_CACHED_TX_HASHES: u32 = 30_000;
}

View File

@@ -501,6 +501,11 @@ RPC State Cache:
[default: 512]
--rpc-cache.max-cached-tx-hashes <MAX_CACHED_TX_HASHES>
Maximum number of transaction hashes to cache for transaction lookups
[default: 30000]
Gas Price Oracle:
--gpo.blocks <BLOCKS>
Number of recent blocks to check for gas price

View File

@@ -501,6 +501,11 @@ RPC State Cache:
[default: 512]
--rpc-cache.max-cached-tx-hashes <MAX_CACHED_TX_HASHES>
Maximum number of transaction hashes to cache for transaction lookups
[default: 30000]
Gas Price Oracle:
--gpo.blocks <BLOCKS>
Number of recent blocks to check for gas price