mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-26 23:58:46 -05:00
perf(rpc): use Arc<BlockWithSenders on full_block_cache (#11585)
This commit is contained in:
@@ -565,7 +565,7 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes {
|
||||
.get_block_with_senders(block_hash)
|
||||
.await
|
||||
.map_err(Self::Error::from_eth_err)?;
|
||||
Ok(block.map(|block| (transaction, block.seal(block_hash))))
|
||||
Ok(block.map(|block| (transaction, (*block).clone().seal(block_hash))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
53
crates/rpc/rpc-eth-types/src/cache/mod.rs
vendored
53
crates/rpc/rpc-eth-types/src/cache/mod.rs
vendored
@@ -38,7 +38,8 @@ type BlockTransactionsResponseSender =
|
||||
oneshot::Sender<ProviderResult<Option<Vec<TransactionSigned>>>>;
|
||||
|
||||
/// The type that can send the response to a requested [`BlockWithSenders`]
|
||||
type BlockWithSendersResponseSender = oneshot::Sender<ProviderResult<Option<BlockWithSenders>>>;
|
||||
type BlockWithSendersResponseSender =
|
||||
oneshot::Sender<ProviderResult<Option<Arc<BlockWithSenders>>>>;
|
||||
|
||||
/// The type that can send the response to the requested receipts of a block.
|
||||
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Arc<Vec<Receipt>>>>>;
|
||||
@@ -48,7 +49,7 @@ type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnvWithHandlerCfg, B
|
||||
|
||||
type BlockLruCache<L> = MultiConsumerLruCache<
|
||||
B256,
|
||||
BlockWithSenders,
|
||||
Arc<BlockWithSenders>,
|
||||
L,
|
||||
Either<BlockWithSendersResponseSender, BlockTransactionsResponseSender>,
|
||||
>;
|
||||
@@ -151,7 +152,7 @@ impl EthStateCache {
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?;
|
||||
|
||||
if let Ok(Some(block_with_senders)) = block_with_senders_res {
|
||||
Ok(Some(block_with_senders.block))
|
||||
Ok(Some(block_with_senders.block.clone()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
@@ -186,7 +187,7 @@ impl EthStateCache {
|
||||
Ok(self
|
||||
.get_block_with_senders(block_hash)
|
||||
.await?
|
||||
.map(|block| block.into_transactions_ecrecovered().collect()))
|
||||
.map(|block| (*block).clone().into_transactions_ecrecovered().collect()))
|
||||
}
|
||||
|
||||
/// Fetches both transactions and receipts for the given block hash.
|
||||
@@ -208,7 +209,7 @@ impl EthStateCache {
|
||||
pub async fn get_block_with_senders(
|
||||
&self,
|
||||
block_hash: B256,
|
||||
) -> ProviderResult<Option<BlockWithSenders>> {
|
||||
) -> ProviderResult<Option<Arc<BlockWithSenders>>> {
|
||||
let (response_tx, rx) = oneshot::channel();
|
||||
let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
|
||||
@@ -221,7 +222,10 @@ impl EthStateCache {
|
||||
&self,
|
||||
block_hash: B256,
|
||||
) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(self.get_block_with_senders(block_hash).await?.map(|block| block.seal(block_hash)))
|
||||
Ok(self
|
||||
.get_block_with_senders(block_hash)
|
||||
.await?
|
||||
.map(|block| (*block).clone().seal(block_hash)))
|
||||
}
|
||||
|
||||
/// Requests the [Receipt] for the block hash
|
||||
@@ -288,7 +292,7 @@ pub(crate) struct EthStateCacheService<
|
||||
LimitReceipts = ByLength,
|
||||
LimitEnvs = ByLength,
|
||||
> where
|
||||
LimitBlocks: Limiter<B256, BlockWithSenders>,
|
||||
LimitBlocks: Limiter<B256, Arc<BlockWithSenders>>,
|
||||
LimitReceipts: Limiter<B256, Arc<Vec<Receipt>>>,
|
||||
LimitEnvs: Limiter<B256, (CfgEnvWithHandlerCfg, BlockEnv)>,
|
||||
{
|
||||
@@ -318,7 +322,11 @@ where
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
EvmConfig: ConfigureEvm<Header = Header>,
|
||||
{
|
||||
fn on_new_block(&mut self, block_hash: B256, res: ProviderResult<Option<BlockWithSenders>>) {
|
||||
fn on_new_block(
|
||||
&mut self,
|
||||
block_hash: B256,
|
||||
res: ProviderResult<Option<Arc<BlockWithSenders>>>,
|
||||
) {
|
||||
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
@@ -328,7 +336,7 @@ where
|
||||
}
|
||||
Either::Right(transaction_tx) => {
|
||||
let _ = transaction_tx.send(res.clone().map(|maybe_block| {
|
||||
maybe_block.map(|block| block.block.body.transactions)
|
||||
maybe_block.map(|block| block.block.body.transactions.clone())
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -360,6 +368,7 @@ where
|
||||
}
|
||||
|
||||
fn on_reorg_block(&mut self, block_hash: B256, res: ProviderResult<Option<BlockWithSenders>>) {
|
||||
let res = res.map(|b| b.map(Arc::new));
|
||||
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
@@ -369,7 +378,7 @@ where
|
||||
}
|
||||
Either::Right(transaction_tx) => {
|
||||
let _ = transaction_tx.send(res.clone().map(|maybe_block| {
|
||||
maybe_block.map(|block| block.block.body.transactions)
|
||||
maybe_block.map(|block| block.block.body.transactions.clone())
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -431,10 +440,12 @@ where
|
||||
let _permit = rate_limiter.acquire().await;
|
||||
// Only look in the database to prevent situations where we
|
||||
// looking up the tree is blocking
|
||||
let block_sender = provider.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
);
|
||||
let block_sender = provider
|
||||
.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
)
|
||||
.map(|maybe_block| maybe_block.map(Arc::new));
|
||||
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
|
||||
block_hash,
|
||||
res: block_sender,
|
||||
@@ -459,10 +470,12 @@ where
|
||||
let _permit = rate_limiter.acquire().await;
|
||||
// Only look in the database to prevent situations where we
|
||||
// looking up the tree is blocking
|
||||
let res = provider.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
);
|
||||
let res = provider
|
||||
.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
)
|
||||
.map(|b| b.map(Arc::new));
|
||||
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
|
||||
block_hash,
|
||||
res,
|
||||
@@ -561,7 +574,7 @@ where
|
||||
}
|
||||
CacheAction::CacheNewCanonicalChain { chain_change } => {
|
||||
for block in chain_change.blocks {
|
||||
this.on_new_block(block.hash(), Ok(Some(block.unseal())));
|
||||
this.on_new_block(block.hash(), Ok(Some(Arc::new(block.unseal()))));
|
||||
}
|
||||
|
||||
for block_receipts in chain_change.receipts {
|
||||
@@ -601,7 +614,7 @@ enum CacheAction {
|
||||
GetBlockTransactions { block_hash: B256, response_tx: BlockTransactionsResponseSender },
|
||||
GetEnv { block_hash: B256, response_tx: EnvResponseSender },
|
||||
GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender },
|
||||
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<BlockWithSenders>> },
|
||||
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<Arc<BlockWithSenders>>> },
|
||||
ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Arc<Vec<Receipt>>>> },
|
||||
EnvResult { block_hash: B256, res: Box<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>> },
|
||||
CacheNewCanonicalChain { chain_change: ChainChange },
|
||||
|
||||
Reference in New Issue
Block a user