Archify EthStateCache (#5744)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Aditya Pandey
2023-12-15 18:52:19 +05:30
committed by GitHub
parent 84b1bb01db
commit a0129b6b88
6 changed files with 44 additions and 29 deletions

View File

@@ -1,5 +1,7 @@
//! Contains RPC handler implementations specific to blocks.
use std::sync::Arc;
use crate::{
eth::{
api::transactions::build_transaction_receipt_with_block_receipts,
@@ -65,7 +67,10 @@ where
let mut block_and_receipts = None;
if block_id.is_pending() {
block_and_receipts = self.provider().pending_block_and_receipts()?;
block_and_receipts = self
.provider()
.pending_block_and_receipts()?
.map(|(sb, receipts)| (sb, Arc::new(receipts)));
} else if let Some(block_hash) = self.provider().block_hash_for_id(block_id)? {
block_and_receipts = self.cache().get_block_and_receipts(block_hash).await?;
}
@@ -87,7 +92,7 @@ where
let receipts = block
.body
.into_iter()
.zip(receipts.clone())
.zip(receipts.iter())
.enumerate()
.map(|(idx, (tx, receipt))| {
let meta = TransactionMeta {
@@ -106,7 +111,7 @@ where
build_transaction_receipt_with_block_receipts(
tx,
meta,
receipt,
receipt.clone(),
&receipts,
#[cfg(feature = "optimism")]
op_tx_meta,

View File

@@ -70,7 +70,7 @@ impl FeeHistoryCache {
/// Insert block data into the cache.
async fn insert_blocks<I>(&self, blocks: I)
where
I: Iterator<Item = (SealedBlock, Vec<Receipt>)>,
I: Iterator<Item = (SealedBlock, Arc<Vec<Receipt>>)>,
{
let mut entries = self.inner.entries.write().await;
@@ -244,7 +244,7 @@ pub async fn fee_history_cache_new_blocks_task<St, Provider>(
let (blocks, receipts): (Vec<_>, Vec<_>) = committed
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), receipts.iter().flatten().cloned().collect::<Vec<_>>())
(block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::<Vec<_>>()))
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;

View File

@@ -40,7 +40,7 @@ type BlockTransactionsResponseSender =
type BlockWithSendersResponseSender = oneshot::Sender<ProviderResult<Option<BlockWithSenders>>>;
/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Vec<Receipt>>>>;
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Arc<Vec<Receipt>>>>>;
/// The type that can send the response to a requested env
type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnv, BlockEnv)>>;
@@ -52,7 +52,8 @@ type BlockLruCache<L> = MultiConsumerLruCache<
Either<BlockWithSendersResponseSender, BlockTransactionsResponseSender>,
>;
type ReceiptsLruCache<L> = MultiConsumerLruCache<B256, Vec<Receipt>, L, ReceiptsResponseSender>;
type ReceiptsLruCache<L> =
MultiConsumerLruCache<B256, Arc<Vec<Receipt>>, L, ReceiptsResponseSender>;
type EnvLruCache<L> = MultiConsumerLruCache<B256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
@@ -180,7 +181,7 @@ impl EthStateCache {
pub async fn get_transactions_and_receipts(
&self,
block_hash: B256,
) -> ProviderResult<Option<(Vec<TransactionSigned>, Vec<Receipt>)>> {
) -> ProviderResult<Option<(Vec<TransactionSigned>, Arc<Vec<Receipt>>)>> {
let transactions = self.get_block_transactions(block_hash);
let receipts = self.get_receipts(block_hash);
@@ -214,7 +215,10 @@ impl EthStateCache {
/// Requests the [Receipt] for the block hash
///
/// Returns `None` if the block was not found.
pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Vec<Receipt>>> {
pub async fn get_receipts(
&self,
block_hash: B256,
) -> ProviderResult<Option<Arc<Vec<Receipt>>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
@@ -224,7 +228,7 @@ impl EthStateCache {
pub async fn get_block_and_receipts(
&self,
block_hash: B256,
) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
) -> ProviderResult<Option<(SealedBlock, Arc<Vec<Receipt>>)>> {
let block = self.get_sealed_block(block_hash);
let receipts = self.get_receipts(block_hash);
@@ -268,7 +272,7 @@ pub(crate) struct EthStateCacheService<
LimitEnvs = ByLength,
> where
LimitBlocks: Limiter<B256, BlockWithSenders>,
LimitReceipts: Limiter<B256, Vec<Receipt>>,
LimitReceipts: Limiter<B256, Arc<Vec<Receipt>>>,
LimitEnvs: Limiter<B256, (CfgEnv, BlockEnv)>,
{
/// The type used to lookup data from disk
@@ -318,7 +322,11 @@ where
}
}
fn on_new_receipts(&mut self, block_hash: B256, res: ProviderResult<Option<Vec<Receipt>>>) {
fn on_new_receipts(
&mut self,
block_hash: B256,
res: ProviderResult<Option<Arc<Vec<Receipt>>>>,
) {
if let Some(queued) = self.receipts_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
@@ -426,7 +434,10 @@ where
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let res = provider.receipts_by_block(block_hash.into());
let res = provider
.receipts_by_block(block_hash.into())
.map(|maybe_receipts| maybe_receipts.map(Arc::new));
let _ = action_tx
.send(CacheAction::ReceiptsResult { block_hash, res });
}));
@@ -496,9 +507,9 @@ where
for block_receipts in receipts {
this.on_new_receipts(
block_receipts.block_hash,
Ok(Some(
Ok(Some(Arc::new(
block_receipts.receipts.into_iter().flatten().collect(),
)),
))),
);
}
}
@@ -517,7 +528,7 @@ enum CacheAction {
GetEnv { block_hash: B256, response_tx: EnvResponseSender },
GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender },
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<BlockWithSenders>> },
ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Vec<Receipt>>> },
ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Arc<Vec<Receipt>>>> },
EnvResult { block_hash: B256, res: Box<ProviderResult<(CfgEnv, BlockEnv)>> },
CacheNewCanonicalChain { blocks: Vec<SealedBlockWithSenders>, receipts: Vec<BlockReceipts> },
}

View File

@@ -359,7 +359,7 @@ where
&mut all_logs,
&filter,
(block_hash, block.number).into(),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()),
false,
);
}
@@ -406,7 +406,7 @@ where
async fn block_and_receipts_by_number(
&self,
hash_or_number: BlockHashOrNumber,
) -> EthResult<Option<(SealedBlock, Vec<Receipt>)>> {
) -> EthResult<Option<(SealedBlock, Arc<Vec<Receipt>>)>> {
let block_hash = match self.provider.convert_block_hash(hash_or_number)? {
Some(hash) => hash,
None => return Ok(None),
@@ -467,7 +467,7 @@ where
&mut all_logs,
&filter_params,
(block.number, block_hash).into(),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts),
block.body.into_iter().map(|tx| tx.hash()).zip(receipts.iter()),
false,
);

View File

@@ -3,14 +3,14 @@ use reth_rpc_types::{FilteredParams, Log};
use reth_rpc_types_compat::log::from_primitive_log;
/// Returns all matching logs of a block's receipts grouped with the hash of their transaction.
pub(crate) fn matching_block_logs<I>(
pub(crate) fn matching_block_logs<'a, I>(
filter: &FilteredParams,
block: BlockNumHash,
tx_and_receipts: I,
removed: bool,
) -> Vec<Log>
where
I: IntoIterator<Item = (TxHash, Receipt)>,
I: IntoIterator<Item = (TxHash, &'a Receipt)>,
{
let mut all_logs = Vec::new();
append_matching_block_logs(&mut all_logs, filter, block, tx_and_receipts, removed);
@@ -18,26 +18,25 @@ where
}
/// Appends all matching logs of a block's receipts grouped with the hash of their transaction
pub(crate) fn append_matching_block_logs<I>(
pub(crate) fn append_matching_block_logs<'a, I>(
all_logs: &mut Vec<Log>,
filter: &FilteredParams,
block: BlockNumHash,
tx_and_receipts: I,
removed: bool,
) where
I: IntoIterator<Item = (TxHash, Receipt)>,
I: IntoIterator<Item = (TxHash, &'a Receipt)>,
{
let block_number_u256 = U256::from(block.number);
// tracks the index of a log in the entire block
let mut log_index: u32 = 0;
for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() {
let logs = receipt.logs;
for log in logs.into_iter() {
if log_matches_filter(block, &log, filter) {
for log in receipt.logs.iter() {
if log_matches_filter(block, log, filter) {
let log = Log {
address: log.address,
topics: log.topics,
data: log.data,
topics: log.topics.clone(),
data: log.data.clone(),
block_hash: Some(block.hash),
block_number: Some(block_number_u256),
transaction_hash: Some(transaction_hash),

View File

@@ -311,7 +311,7 @@ where
let all_logs = logs_utils::matching_block_logs(
&filter,
block_receipts.block,
block_receipts.tx_receipts,
block_receipts.tx_receipts.iter().map(|(tx, receipt)| (*tx, receipt)),
removed,
);
futures::stream::iter(all_logs)