mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 09:38:24 -05:00
block_with_senders in ethstatecache (#5302)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@@ -209,10 +209,22 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
|
||||
/// Returns the block with matching hash from any side-chain.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain.
|
||||
#[inline]
|
||||
pub fn block_by_hash(&self, block_hash: BlockHash) -> Option<&SealedBlock> {
|
||||
self.state.block_by_hash(block_hash)
|
||||
}
|
||||
|
||||
/// Returns the block with matching hash from any side-chain.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain.
|
||||
#[inline]
|
||||
pub fn block_with_senders_by_hash(
|
||||
&self,
|
||||
block_hash: BlockHash,
|
||||
) -> Option<&SealedBlockWithSenders> {
|
||||
self.state.block_with_senders_by_hash(block_hash)
|
||||
}
|
||||
|
||||
/// Returns the block's receipts with matching hash from any side-chain.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain.
|
||||
|
||||
@@ -74,6 +74,10 @@ impl BlockchainTreeViewer for NoopBlockchainTree {
|
||||
None
|
||||
}
|
||||
|
||||
fn block_with_senders_by_hash(&self, _hash: BlockHash) -> Option<SealedBlockWithSenders> {
|
||||
None
|
||||
}
|
||||
|
||||
fn buffered_block_by_hash(&self, _block_hash: BlockHash) -> Option<SealedBlock> {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -117,6 +117,11 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTreeViewer for ShareableBlockc
|
||||
self.tree.read().block_by_hash(block_hash).cloned()
|
||||
}
|
||||
|
||||
fn block_with_senders_by_hash(&self, block_hash: BlockHash) -> Option<SealedBlockWithSenders> {
|
||||
trace!(target: "blockchain_tree", ?block_hash, "Returning block by hash");
|
||||
self.tree.read().block_with_senders_by_hash(block_hash).cloned()
|
||||
}
|
||||
|
||||
fn buffered_block_by_hash(&self, block_hash: BlockHash) -> Option<SealedBlock> {
|
||||
self.tree.read().get_buffered_block(&block_hash).map(|b| b.block.clone())
|
||||
}
|
||||
|
||||
@@ -56,10 +56,21 @@ impl TreeState {
|
||||
/// Returns the block with matching hash from any side-chain.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain.
|
||||
#[inline]
|
||||
pub(crate) fn block_by_hash(&self, block_hash: BlockHash) -> Option<&SealedBlock> {
|
||||
self.block_with_senders_by_hash(block_hash).map(|block| &block.block)
|
||||
}
|
||||
/// Returns the block with matching hash from any side-chain.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain.
|
||||
#[inline]
|
||||
pub(crate) fn block_with_senders_by_hash(
|
||||
&self,
|
||||
block_hash: BlockHash,
|
||||
) -> Option<&SealedBlockWithSenders> {
|
||||
let id = self.block_indices.get_blocks_chain_id(&block_hash)?;
|
||||
let chain = self.chains.get(&id)?;
|
||||
chain.block(block_hash)
|
||||
chain.block_with_senders(block_hash)
|
||||
}
|
||||
|
||||
/// Returns the block's receipts with matching hash from any side-chain.
|
||||
|
||||
@@ -237,6 +237,12 @@ pub trait BlockchainTreeViewer: Send + Sync {
|
||||
/// disconnected from the canonical chain.
|
||||
fn block_by_hash(&self, hash: BlockHash) -> Option<SealedBlock>;
|
||||
|
||||
/// Returns the block with matching hash from the tree, if it exists.
|
||||
///
|
||||
/// Caution: This will not return blocks from the canonical chain or buffered blocks that are
|
||||
/// disconnected from the canonical chain.
|
||||
fn block_with_senders_by_hash(&self, hash: BlockHash) -> Option<SealedBlockWithSenders>;
|
||||
|
||||
/// Returns the _buffered_ (disconnected) block with matching hash from the internal buffer if
|
||||
/// it exists.
|
||||
///
|
||||
@@ -295,6 +301,11 @@ pub trait BlockchainTreeViewer: Send + Sync {
|
||||
self.block_by_hash(self.pending_block_num_hash()?.hash)
|
||||
}
|
||||
|
||||
/// Returns the pending block if there is one.
|
||||
fn pending_block_with_senders(&self) -> Option<SealedBlockWithSenders> {
|
||||
self.block_with_senders_by_hash(self.pending_block_num_hash()?.hash)
|
||||
}
|
||||
|
||||
/// Returns the pending block and its receipts in one call.
|
||||
///
|
||||
/// This exists to prevent a potential data race if the pending block changes in between
|
||||
|
||||
@@ -98,7 +98,17 @@ impl BlockWithSenders {
|
||||
(!block.body.len() != senders.len()).then_some(Self { block, senders })
|
||||
}
|
||||
|
||||
/// Seal the block with a known hash.
|
||||
///
|
||||
/// WARNING: This method does not perform validation whether the hash is correct.
|
||||
#[inline]
|
||||
pub fn seal(self, hash: B256) -> SealedBlockWithSenders {
|
||||
let Self { block, senders } = self;
|
||||
SealedBlockWithSenders { block: block.seal(hash), senders }
|
||||
}
|
||||
|
||||
/// Split Structure to its components
|
||||
#[inline]
|
||||
pub fn into_components(self) -> (Block, Vec<Address>) {
|
||||
(self.block, self.senders)
|
||||
}
|
||||
@@ -288,6 +298,13 @@ impl SealedBlockWithSenders {
|
||||
(self.block, self.senders)
|
||||
}
|
||||
|
||||
/// Returns the unsealed [BlockWithSenders]
|
||||
#[inline]
|
||||
pub fn unseal(self) -> BlockWithSenders {
|
||||
let Self { block, senders } = self;
|
||||
BlockWithSenders { block: block.unseal(), senders }
|
||||
}
|
||||
|
||||
/// Returns an iterator over all transactions in the block.
|
||||
#[inline]
|
||||
pub fn transactions(&self) -> impl Iterator<Item = &TransactionSigned> + '_ {
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
|
||||
use crate::transaction::from_recovered_with_block_context;
|
||||
use alloy_rlp::Encodable;
|
||||
use reth_primitives::{Block as PrimitiveBlock, Header as PrimitiveHeader, B256, U256, U64};
|
||||
use reth_primitives::{
|
||||
Block as PrimitiveBlock, BlockWithSenders, Header as PrimitiveHeader, B256, U256, U64,
|
||||
};
|
||||
use reth_rpc_types::{Block, BlockError, BlockTransactions, BlockTransactionsKind, Header};
|
||||
|
||||
/// Converts the given primitive block into a [Block] response with the given
|
||||
@@ -10,7 +12,7 @@ use reth_rpc_types::{Block, BlockError, BlockTransactions, BlockTransactionsKind
|
||||
///
|
||||
/// If a `block_hash` is provided, then this is used, otherwise the block hash is computed.
|
||||
pub fn from_block(
|
||||
block: PrimitiveBlock,
|
||||
block: BlockWithSenders,
|
||||
total_difficulty: U256,
|
||||
kind: BlockTransactionsKind,
|
||||
block_hash: Option<B256>,
|
||||
@@ -29,7 +31,7 @@ pub fn from_block(
|
||||
/// This will populate the `transactions` field with only the hashes of the transactions in the
|
||||
/// block: [BlockTransactions::Hashes]
|
||||
pub fn from_block_with_tx_hashes(
|
||||
block: PrimitiveBlock,
|
||||
block: BlockWithSenders,
|
||||
total_difficulty: U256,
|
||||
block_hash: Option<B256>,
|
||||
) -> Block {
|
||||
@@ -39,7 +41,7 @@ pub fn from_block_with_tx_hashes(
|
||||
from_block_with_transactions(
|
||||
block.length(),
|
||||
block_hash,
|
||||
block,
|
||||
block.block,
|
||||
total_difficulty,
|
||||
BlockTransactions::Hashes(transactions),
|
||||
)
|
||||
@@ -51,35 +53,38 @@ pub fn from_block_with_tx_hashes(
|
||||
/// This will populate the `transactions` field with the _full_
|
||||
/// [Transaction](reth_rpc_types::Transaction) objects: [BlockTransactions::Full]
|
||||
pub fn from_block_full(
|
||||
mut block: PrimitiveBlock,
|
||||
mut block: BlockWithSenders,
|
||||
total_difficulty: U256,
|
||||
block_hash: Option<B256>,
|
||||
) -> Result<Block, BlockError> {
|
||||
let block_hash = block_hash.unwrap_or_else(|| block.header.hash_slow());
|
||||
let block_number = block.number;
|
||||
let base_fee_per_gas = block.base_fee_per_gas;
|
||||
let block_hash = block_hash.unwrap_or_else(|| block.block.header.hash_slow());
|
||||
let block_number = block.block.number;
|
||||
let base_fee_per_gas = block.block.base_fee_per_gas;
|
||||
|
||||
// NOTE: we can safely remove the body here because not needed to finalize the `Block` in
|
||||
// `from_block_with_transactions`, however we need to compute the length before
|
||||
let block_length = block.length();
|
||||
let body = std::mem::take(&mut block.body);
|
||||
let block_length = block.block.length();
|
||||
let body = std::mem::take(&mut block.block.body);
|
||||
let transactions_with_senders = body.into_iter().zip(block.senders);
|
||||
let transactions = transactions_with_senders
|
||||
.enumerate()
|
||||
.map(|(idx, (tx, sender))| {
|
||||
let signed_tx_ec_recovered = tx.with_signer(sender);
|
||||
|
||||
let mut transactions = Vec::with_capacity(block.body.len());
|
||||
for (idx, tx) in body.into_iter().enumerate() {
|
||||
let signed_tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
|
||||
transactions.push(from_recovered_with_block_context(
|
||||
signed_tx,
|
||||
block_hash,
|
||||
block_number,
|
||||
base_fee_per_gas,
|
||||
U256::from(idx),
|
||||
))
|
||||
}
|
||||
from_recovered_with_block_context(
|
||||
signed_tx_ec_recovered,
|
||||
block_hash,
|
||||
block_number,
|
||||
base_fee_per_gas,
|
||||
U256::from(idx),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(from_block_with_transactions(
|
||||
block_length,
|
||||
block_hash,
|
||||
block,
|
||||
block.block,
|
||||
total_difficulty,
|
||||
BlockTransactions::Full(transactions),
|
||||
))
|
||||
|
||||
@@ -146,13 +146,23 @@ where
|
||||
&self,
|
||||
block_id: impl Into<BlockId>,
|
||||
) -> EthResult<Option<reth_primitives::SealedBlock>> {
|
||||
self.block_with_senders(block_id)
|
||||
.await
|
||||
.map(|maybe_block| maybe_block.map(|block| block.block))
|
||||
}
|
||||
|
||||
/// Returns the block object for the given block id.
|
||||
pub(crate) async fn block_with_senders(
|
||||
&self,
|
||||
block_id: impl Into<BlockId>,
|
||||
) -> EthResult<Option<reth_primitives::SealedBlockWithSenders>> {
|
||||
let block_id = block_id.into();
|
||||
|
||||
if block_id.is_pending() {
|
||||
// Pending block can be fetched directly without need for caching
|
||||
let maybe_pending = self.provider().pending_block()?;
|
||||
let maybe_pending = self.provider().pending_block_with_senders()?;
|
||||
return if maybe_pending.is_some() {
|
||||
return Ok(maybe_pending)
|
||||
Ok(maybe_pending)
|
||||
} else {
|
||||
self.local_pending_block().await
|
||||
}
|
||||
@@ -163,7 +173,7 @@ where
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
Ok(self.cache().get_sealed_block(block_hash).await?)
|
||||
Ok(self.cache().get_sealed_block_with_senders(block_hash).await?)
|
||||
}
|
||||
|
||||
/// Returns the populated rpc block object for the given block id.
|
||||
@@ -175,7 +185,7 @@ where
|
||||
block_id: impl Into<BlockId>,
|
||||
full: bool,
|
||||
) -> EthResult<Option<RichBlock>> {
|
||||
let block = match self.block(block_id).await? {
|
||||
let block = match self.block_with_senders(block_id).await? {
|
||||
Some(block) => block,
|
||||
None => return Ok(None),
|
||||
};
|
||||
@@ -184,7 +194,7 @@ where
|
||||
.provider()
|
||||
.header_td_by_number(block.number)?
|
||||
.ok_or(EthApiError::UnknownBlockNumber)?;
|
||||
let block = from_block(block.into(), total_difficulty, full.into(), Some(block_hash))?;
|
||||
let block = from_block(block.unseal(), total_difficulty, full.into(), Some(block_hash))?;
|
||||
Ok(Some(block.into()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_interfaces::RethResult;
|
||||
use reth_network_api::NetworkInfo;
|
||||
use reth_primitives::{
|
||||
revm_primitives::{BlockEnv, CfgEnv},
|
||||
Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlock, B256, U256, U64,
|
||||
Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlockWithSenders, B256, U256, U64,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderBox, StateProviderFactory,
|
||||
@@ -246,7 +246,7 @@ where
|
||||
///
|
||||
/// If no pending block is available, this will derive it from the `latest` block
|
||||
pub(crate) fn pending_block_env_and_cfg(&self) -> EthResult<PendingBlockEnv> {
|
||||
let origin = if let Some(pending) = self.provider().pending_block()? {
|
||||
let origin = if let Some(pending) = self.provider().pending_block_with_senders()? {
|
||||
PendingBlockEnvOrigin::ActualPending(pending)
|
||||
} else {
|
||||
// no pending block from the CL yet, so we use the latest block and modify the env
|
||||
@@ -281,7 +281,7 @@ where
|
||||
}
|
||||
|
||||
/// Returns the locally built pending block
|
||||
pub(crate) async fn local_pending_block(&self) -> EthResult<Option<SealedBlock>> {
|
||||
pub(crate) async fn local_pending_block(&self) -> EthResult<Option<SealedBlockWithSenders>> {
|
||||
let pending = self.pending_block_env_and_cfg()?;
|
||||
if pending.origin.is_actual_pending() {
|
||||
return Ok(pending.origin.into_actual_pending())
|
||||
|
||||
@@ -9,7 +9,7 @@ use reth_primitives::{
|
||||
BlockEnv, CfgEnv, EVMError, Env, InvalidTransaction, ResultAndState, SpecId,
|
||||
},
|
||||
Block, BlockId, BlockNumberOrTag, ChainSpec, Header, IntoRecoveredTransaction, Receipt,
|
||||
Receipts, SealedBlock, SealedHeader, B256, EMPTY_OMMER_ROOT_HASH, U256,
|
||||
Receipts, SealedBlockWithSenders, SealedHeader, B256, EMPTY_OMMER_ROOT_HASH, U256,
|
||||
};
|
||||
use reth_provider::{BundleStateWithReceipts, ChainSpecProvider, StateProviderFactory};
|
||||
use reth_revm::{
|
||||
@@ -42,7 +42,7 @@ impl PendingBlockEnv {
|
||||
self,
|
||||
client: &Client,
|
||||
pool: &Pool,
|
||||
) -> EthResult<SealedBlock>
|
||||
) -> EthResult<SealedBlockWithSenders>
|
||||
where
|
||||
Client: StateProviderFactory + ChainSpecProvider,
|
||||
Pool: TransactionPool,
|
||||
@@ -61,6 +61,7 @@ impl PendingBlockEnv {
|
||||
let block_number = block_env.number.to::<u64>();
|
||||
|
||||
let mut executed_txs = Vec::new();
|
||||
let mut senders = Vec::new();
|
||||
let mut best_txs = pool.best_transactions_with_base_fee(base_fee);
|
||||
|
||||
let (withdrawals, withdrawals_root) = match origin {
|
||||
@@ -176,7 +177,9 @@ impl PendingBlockEnv {
|
||||
}));
|
||||
|
||||
// append transaction to the list of executed transactions
|
||||
executed_txs.push(tx.into_signed());
|
||||
let (tx, sender) = tx.to_components();
|
||||
executed_txs.push(tx);
|
||||
senders.push(sender);
|
||||
}
|
||||
|
||||
// executes the withdrawals and commits them to the Database and BundleState.
|
||||
@@ -236,9 +239,7 @@ impl PendingBlockEnv {
|
||||
|
||||
// seal the block
|
||||
let block = Block { header, body: executed_txs, ommers: vec![], withdrawals };
|
||||
let sealed_block = block.seal_slow();
|
||||
|
||||
Ok(sealed_block)
|
||||
Ok(SealedBlockWithSenders { block: block.seal_slow(), senders })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,7 +287,7 @@ where
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum PendingBlockEnvOrigin {
|
||||
/// The pending block as received from the CL.
|
||||
ActualPending(SealedBlock),
|
||||
ActualPending(SealedBlockWithSenders),
|
||||
/// The header of the latest block
|
||||
DerivedFromLatest(SealedHeader),
|
||||
}
|
||||
@@ -298,7 +299,7 @@ impl PendingBlockEnvOrigin {
|
||||
}
|
||||
|
||||
/// Consumes the type and returns the actual pending block.
|
||||
pub(crate) fn into_actual_pending(self) -> Option<SealedBlock> {
|
||||
pub(crate) fn into_actual_pending(self) -> Option<SealedBlockWithSenders> {
|
||||
match self {
|
||||
PendingBlockEnvOrigin::ActualPending(block) => Some(block),
|
||||
_ => None,
|
||||
@@ -337,7 +338,7 @@ impl PendingBlockEnvOrigin {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PendingBlock {
|
||||
/// The cached pending block
|
||||
pub(crate) block: SealedBlock,
|
||||
pub(crate) block: SealedBlockWithSenders,
|
||||
/// Timestamp when the pending block is considered outdated
|
||||
pub(crate) expires_at: Instant,
|
||||
}
|
||||
|
||||
112
crates/rpc/rpc/src/eth/cache/mod.rs
vendored
112
crates/rpc/rpc/src/eth/cache/mod.rs
vendored
@@ -2,9 +2,12 @@
|
||||
|
||||
use futures::{future::Either, Stream, StreamExt};
|
||||
use reth_interfaces::provider::{ProviderError, ProviderResult};
|
||||
use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, B256};
|
||||
use reth_primitives::{
|
||||
Block, BlockHashOrNumber, BlockWithSenders, Receipt, SealedBlock, SealedBlockWithSenders,
|
||||
TransactionSigned, B256,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, BlockSource, CanonStateNotification, EvmEnvProvider, StateProviderFactory,
|
||||
BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory, TransactionVariant,
|
||||
};
|
||||
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
@@ -29,13 +32,13 @@ mod metrics;
|
||||
mod multi_consumer;
|
||||
pub use multi_consumer::MultiConsumerLruCache;
|
||||
|
||||
/// The type that can send the response to a requested [Block]
|
||||
type BlockResponseSender = oneshot::Sender<ProviderResult<Option<Block>>>;
|
||||
|
||||
/// The type that can send the response to a requested [Block]
|
||||
type BlockTransactionsResponseSender =
|
||||
oneshot::Sender<ProviderResult<Option<Vec<TransactionSigned>>>>;
|
||||
|
||||
/// The type that can send the response to a requested [BlockWithSenders]
|
||||
type BlockWithSendersResponseSender = oneshot::Sender<ProviderResult<Option<BlockWithSenders>>>;
|
||||
|
||||
/// The type that can send the response to the requested receipts of a block.
|
||||
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Vec<Receipt>>>>;
|
||||
|
||||
@@ -44,9 +47,9 @@ type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnv, BlockEnv)>>;
|
||||
|
||||
type BlockLruCache<L> = MultiConsumerLruCache<
|
||||
B256,
|
||||
Block,
|
||||
BlockWithSenders,
|
||||
L,
|
||||
Either<BlockResponseSender, BlockTransactionsResponseSender>,
|
||||
Either<BlockWithSendersResponseSender, BlockTransactionsResponseSender>,
|
||||
>;
|
||||
|
||||
type ReceiptsLruCache<L> = MultiConsumerLruCache<B256, Vec<Receipt>, L, ReceiptsResponseSender>;
|
||||
@@ -130,8 +133,15 @@ impl EthStateCache {
|
||||
/// Returns `None` if the block does not exist.
|
||||
pub(crate) async fn get_block(&self, block_hash: B256) -> ProviderResult<Option<Block>> {
|
||||
let (response_tx, rx) = oneshot::channel();
|
||||
let _ = self.to_service.send(CacheAction::GetBlock { block_hash, response_tx });
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
|
||||
let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
|
||||
let block_with_senders_res =
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?;
|
||||
|
||||
if let Ok(Some(block_with_senders)) = block_with_senders_res {
|
||||
Ok(Some(block_with_senders.block))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the [Block] for the block hash, sealed with the given block hash.
|
||||
@@ -169,6 +179,28 @@ impl EthStateCache {
|
||||
Ok(transactions.zip(receipts))
|
||||
}
|
||||
|
||||
/// Requests the [BlockWithSenders] for the block hash
|
||||
///
|
||||
/// Returns `None` if the block does not exist.
|
||||
pub(crate) async fn get_block_with_senders(
|
||||
&self,
|
||||
block_hash: B256,
|
||||
) -> ProviderResult<Option<BlockWithSenders>> {
|
||||
let (response_tx, rx) = oneshot::channel();
|
||||
let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
|
||||
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
|
||||
}
|
||||
|
||||
/// Requests the [SealedBlockWithSenders] for the block hash
|
||||
///
|
||||
/// Returns `None` if the block does not exist.
|
||||
pub(crate) async fn get_sealed_block_with_senders(
|
||||
&self,
|
||||
block_hash: B256,
|
||||
) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(self.get_block_with_senders(block_hash).await?.map(|block| block.seal(block_hash)))
|
||||
}
|
||||
|
||||
/// Requests the [Receipt] for the block hash
|
||||
///
|
||||
/// Returns `None` if the block was not found.
|
||||
@@ -228,7 +260,7 @@ pub(crate) struct EthStateCacheService<
|
||||
LimitReceipts = ByLength,
|
||||
LimitEnvs = ByLength,
|
||||
> where
|
||||
LimitBlocks: Limiter<B256, Block>,
|
||||
LimitBlocks: Limiter<B256, BlockWithSenders>,
|
||||
LimitReceipts: Limiter<B256, Vec<Receipt>>,
|
||||
LimitEnvs: Limiter<B256, (CfgEnv, BlockEnv)>,
|
||||
{
|
||||
@@ -255,17 +287,18 @@ where
|
||||
Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
{
|
||||
fn on_new_block(&mut self, block_hash: B256, res: ProviderResult<Option<Block>>) {
|
||||
fn on_new_block(&mut self, block_hash: B256, res: ProviderResult<Option<BlockWithSenders>>) {
|
||||
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
match tx {
|
||||
Either::Left(block_tx) => {
|
||||
let _ = block_tx.send(res.clone());
|
||||
Either::Left(block_with_senders) => {
|
||||
let _ = block_with_senders.send(res.clone());
|
||||
}
|
||||
Either::Right(transaction_tx) => {
|
||||
let _ = transaction_tx.send(
|
||||
res.clone().map(|maybe_block| maybe_block.map(|block| block.body)),
|
||||
res.clone()
|
||||
.map(|maybe_block| maybe_block.map(|block| block.block.body)),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -316,8 +349,7 @@ where
|
||||
}
|
||||
Some(action) => {
|
||||
match action {
|
||||
CacheAction::GetBlock { block_hash, response_tx } => {
|
||||
// check if block is cached
|
||||
CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
|
||||
if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
|
||||
let _ = response_tx.send(Ok(Some(block)));
|
||||
continue
|
||||
@@ -333,10 +365,14 @@ where
|
||||
let _permit = rate_limiter.acquire().await;
|
||||
// Only look in the database to prevent situations where we
|
||||
// looking up the tree is blocking
|
||||
let res = provider
|
||||
.find_block_by_hash(block_hash, BlockSource::Database);
|
||||
let _ = action_tx
|
||||
.send(CacheAction::BlockResult { block_hash, res });
|
||||
let block_sender = provider.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
);
|
||||
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
|
||||
block_hash,
|
||||
res: block_sender,
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -357,10 +393,14 @@ where
|
||||
let _permit = rate_limiter.acquire().await;
|
||||
// Only look in the database to prevent situations where we
|
||||
// looking up the tree is blocking
|
||||
let res = provider
|
||||
.find_block_by_hash(block_hash, BlockSource::Database);
|
||||
let _ = action_tx
|
||||
.send(CacheAction::BlockResult { block_hash, res });
|
||||
let res = provider.block_with_senders(
|
||||
BlockHashOrNumber::Hash(block_hash),
|
||||
TransactionVariant::WithHash,
|
||||
);
|
||||
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
|
||||
block_hash,
|
||||
res,
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -413,12 +453,20 @@ where
|
||||
}));
|
||||
}
|
||||
}
|
||||
CacheAction::BlockResult { block_hash, res } => {
|
||||
this.on_new_block(block_hash, res);
|
||||
}
|
||||
CacheAction::ReceiptsResult { block_hash, res } => {
|
||||
this.on_new_receipts(block_hash, res);
|
||||
}
|
||||
CacheAction::BlockWithSendersResult { block_hash, res } => match res {
|
||||
Ok(Some(block_with_senders)) => {
|
||||
this.on_new_block(block_hash, Ok(Some(block_with_senders)));
|
||||
}
|
||||
Ok(None) => {
|
||||
this.on_new_block(block_hash, Ok(None));
|
||||
}
|
||||
Err(e) => {
|
||||
this.on_new_block(block_hash, Err(e));
|
||||
}
|
||||
},
|
||||
CacheAction::EnvResult { block_hash, res } => {
|
||||
let res = *res;
|
||||
if let Some(queued) = this.evm_env_cache.remove(&block_hash) {
|
||||
@@ -457,14 +505,14 @@ where
|
||||
|
||||
/// All message variants sent through the channel
|
||||
enum CacheAction {
|
||||
GetBlock { block_hash: B256, response_tx: BlockResponseSender },
|
||||
GetBlockWithSenders { block_hash: B256, response_tx: BlockWithSendersResponseSender },
|
||||
GetBlockTransactions { block_hash: B256, response_tx: BlockTransactionsResponseSender },
|
||||
GetEnv { block_hash: B256, response_tx: EnvResponseSender },
|
||||
GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender },
|
||||
BlockResult { block_hash: B256, res: ProviderResult<Option<Block>> },
|
||||
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<BlockWithSenders>> },
|
||||
ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Vec<Receipt>>> },
|
||||
EnvResult { block_hash: B256, res: Box<ProviderResult<(CfgEnv, BlockEnv)>> },
|
||||
CacheNewCanonicalChain { blocks: Vec<SealedBlock>, receipts: Vec<BlockReceipts> },
|
||||
CacheNewCanonicalChain { blocks: Vec<SealedBlockWithSenders>, receipts: Vec<BlockReceipts> },
|
||||
}
|
||||
|
||||
struct BlockReceipts {
|
||||
@@ -483,13 +531,13 @@ where
|
||||
// we're only interested in new committed blocks
|
||||
let (blocks, state) = committed.inner();
|
||||
|
||||
let blocks = blocks.iter().map(|(_, block)| block.block.clone()).collect::<Vec<_>>();
|
||||
let blocks = blocks.iter().map(|(_, block)| block.clone()).collect::<Vec<_>>();
|
||||
|
||||
// also cache all receipts of the blocks
|
||||
let mut receipts = Vec::with_capacity(blocks.len());
|
||||
for block in &blocks {
|
||||
let block_receipts = BlockReceipts {
|
||||
block_hash: block.hash,
|
||||
block_hash: block.block.hash,
|
||||
receipts: state.receipts_by_block(block.number).to_vec(),
|
||||
};
|
||||
receipts.push(block_receipts);
|
||||
|
||||
@@ -71,9 +71,12 @@ impl Chain {
|
||||
|
||||
/// Returns the block with matching hash.
|
||||
pub fn block(&self, block_hash: BlockHash) -> Option<&SealedBlock> {
|
||||
self.blocks
|
||||
.iter()
|
||||
.find_map(|(_num, block)| (block.hash() == block_hash).then_some(&block.block))
|
||||
self.block_with_senders(block_hash).map(|block| &block.block)
|
||||
}
|
||||
|
||||
/// Returns the block with matching hash.
|
||||
pub fn block_with_senders(&self, block_hash: BlockHash) -> Option<&SealedBlockWithSenders> {
|
||||
self.blocks.iter().find_map(|(_num, block)| (block.hash() == block_hash).then_some(block))
|
||||
}
|
||||
|
||||
/// Return post state of the block at the `block_number` or None if block is not known
|
||||
|
||||
@@ -15,9 +15,9 @@ use reth_primitives::{
|
||||
snapshot::HighestSnapshots,
|
||||
stage::{StageCheckpoint, StageId},
|
||||
Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo,
|
||||
ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader,
|
||||
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal,
|
||||
B256, U256,
|
||||
ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders,
|
||||
SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber,
|
||||
Withdrawal, B256, U256,
|
||||
};
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
use std::{
|
||||
@@ -290,6 +290,10 @@ impl<DB: Database> BlockReader for ProviderFactory<DB> {
|
||||
self.provider()?.pending_block()
|
||||
}
|
||||
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
self.provider()?.pending_block_with_senders()
|
||||
}
|
||||
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
|
||||
self.provider()?.pending_block_and_receipts()
|
||||
}
|
||||
|
||||
@@ -1073,6 +1073,10 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -252,6 +252,10 @@ where
|
||||
Ok(self.tree.pending_block())
|
||||
}
|
||||
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(self.tree.pending_block_with_senders())
|
||||
}
|
||||
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
|
||||
Ok(self.tree.pending_block_and_receipts())
|
||||
}
|
||||
@@ -637,6 +641,10 @@ where
|
||||
self.tree.block_by_hash(block_hash)
|
||||
}
|
||||
|
||||
fn block_with_senders_by_hash(&self, block_hash: BlockHash) -> Option<SealedBlockWithSenders> {
|
||||
self.tree.block_with_senders_by_hash(block_hash)
|
||||
}
|
||||
|
||||
fn buffered_block_by_hash(&self, block_hash: BlockHash) -> Option<SealedBlock> {
|
||||
self.tree.buffered_block_by_hash(block_hash)
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ use reth_interfaces::provider::{ProviderError, ProviderResult};
|
||||
use reth_primitives::{
|
||||
keccak256, trie::AccountProof, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId,
|
||||
BlockNumber, BlockWithSenders, Bytecode, Bytes, ChainInfo, ChainSpec, Header, Receipt,
|
||||
SealedBlock, SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
|
||||
TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
|
||||
SealedBlock, SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
|
||||
TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
|
||||
};
|
||||
use reth_trie::updates::TrieUpdates;
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
@@ -438,6 +438,10 @@ impl BlockReader for MockEthProvider {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -14,8 +14,8 @@ use reth_primitives::{
|
||||
trie::AccountProof,
|
||||
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, Bytecode,
|
||||
ChainInfo, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock,
|
||||
SealedHeader, StorageKey, StorageValue, TransactionMeta, TransactionSigned,
|
||||
TransactionSignedNoHash, TxHash, TxNumber, B256, MAINNET, U256,
|
||||
SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
|
||||
TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, MAINNET, U256,
|
||||
};
|
||||
use reth_trie::updates::TrieUpdates;
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
@@ -85,6 +85,10 @@ impl BlockReader for NoopProvider {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -85,6 +85,12 @@ pub trait BlockReader:
|
||||
/// and the caller does not know the hash.
|
||||
fn pending_block(&self) -> ProviderResult<Option<SealedBlock>>;
|
||||
|
||||
/// Returns the pending block if available
|
||||
///
|
||||
/// Note: This returns a [SealedBlockWithSenders] because it's expected that this is sealed by
|
||||
/// the provider and the caller does not know the hash.
|
||||
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>>;
|
||||
|
||||
/// Returns the pending block and receipts if available.
|
||||
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user