refactor: simplify fetching blocks (#2060)

This commit is contained in:
Bjerg
2023-03-31 19:27:21 +02:00
committed by GitHub
parent 8eba4cad2c
commit b1643f4ca6
3 changed files with 71 additions and 73 deletions

View File

@@ -40,6 +40,13 @@ impl Block {
withdrawals: self.withdrawals,
}
}
/// Transform into a [`BlockWithSenders`].
pub fn with_senders(self, senders: Vec<Address>) -> BlockWithSenders {
assert_eq!(self.body.len(), senders.len(), "Unequal number of senders");
BlockWithSenders { block: self, senders }
}
}
impl Deref for Block {
@@ -49,6 +56,41 @@ impl Deref for Block {
}
}
/// Sealed block with senders recovered from transactions.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BlockWithSenders {
/// Block
pub block: Block,
/// List of senders that match the transactions in the block
pub senders: Vec<Address>,
}
impl BlockWithSenders {
/// New block with senders. Return none if len of tx and senders does not match
pub fn new(block: Block, senders: Vec<Address>) -> Option<Self> {
(!block.body.len() != senders.len()).then_some(Self { block, senders })
}
/// Split Structure to its components
pub fn into_components(self) -> (Block, Vec<Address>) {
(self.block, self.senders)
}
}
impl Deref for BlockWithSenders {
type Target = Block;
fn deref(&self) -> &Self::Target {
&self.block
}
}
#[cfg(any(test, feature = "test-utils"))]
impl std::ops::DerefMut for BlockWithSenders {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.block
}
}
/// Sealed Ethereum full block.
///
/// Withdrawals can be optionally included at the end of the RLP encoded message.

View File

@@ -39,8 +39,8 @@ pub mod proofs;
pub use account::{Account, Bytecode};
pub use bits::H512;
pub use block::{
Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, SealedBlock,
SealedBlockWithSenders,
Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders,
SealedBlock, SealedBlockWithSenders,
};
pub use bloom::Bloom;
pub use chain::{

View File

@@ -10,9 +10,8 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::provider::ProviderError;
use reth_metrics_derive::Metrics;
use reth_primitives::{Address, Block, U256};
use reth_primitives::{Address, Block, BlockNumber, BlockWithSenders, U256};
use reth_provider::{
post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction,
};
@@ -85,6 +84,27 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
}
}
// TODO: This should be in the block provider trait once we consolidate
// SharedDatabase/Transaction
fn read_block_with_senders<DB: Database>(
tx: &Transaction<'_, DB>,
block_number: BlockNumber,
) -> Result<(BlockWithSenders, U256), StageError> {
let header = tx.get_header(block_number)?;
let td = tx.get_td(block_number)?;
let ommers = tx.get::<tables::BlockOmmers>(block_number)?.unwrap_or_default().ommers;
let withdrawals = tx.get::<tables::BlockWithdrawals>(block_number)?.map(|v| v.withdrawals);
let (transactions, senders): (Vec<_>, Vec<_>) = tx
.get_block_transaction_range(block_number..=block_number)?
.into_iter()
.flat_map(|(_, txs)| txs.into_iter())
.map(|tx| tx.to_components())
.unzip();
Ok((Block { header, body: transactions, ommers, withdrawals }.with_senders(senders), td))
}
/// Execute the stage.
pub fn execute_inner<DB: Database>(
&self,
@@ -95,83 +115,19 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
exec_or_return!(input, self.commit_threshold, "sync::stages::execution");
let last_block = input.stage_progress.unwrap_or_default();
// Get header with canonical hashes.
let mut headers_cursor = tx.cursor_read::<tables::Headers>()?;
// Get total difficulty
let mut td_cursor = tx.cursor_read::<tables::HeaderTD>()?;
// Get bodies with canonical hashes.
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodies>()?;
// Get ommers with canonical hashes.
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
// Get block withdrawals.
let mut withdrawals_cursor = tx.cursor_read::<tables::BlockWithdrawals>()?;
// Get transaction of the block that we are executing.
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Skip sender recovery and load signer from database.
let mut tx_sender = tx.cursor_read::<tables::TxSenders>()?;
// Get block headers and bodies
let block_batch = headers_cursor
.walk_range(start_block..=end_block)?
.map(|entry| -> Result<_, StageError> {
let (number, header) = entry?;
let (_, td) = td_cursor
.seek_exact(number)?
.ok_or(ProviderError::TotalDifficulty { number })?;
let (_, body) =
bodies_cursor.seek_exact(number)?.ok_or(ProviderError::BlockBody { number })?;
let (_, stored_ommers) = ommers_cursor.seek_exact(number)?.unwrap_or_default();
let withdrawals =
withdrawals_cursor.seek_exact(number)?.map(|(_, w)| w.withdrawals);
Ok((header, td.into(), body, stored_ommers.ommers, withdrawals))
})
.collect::<Result<Vec<_>, _>>()?;
// Create state provider with cached state
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
// Fetch transactions, execute them and generate results
let mut state = PostState::default();
for (header, td, body, ommers, withdrawals) in block_batch.into_iter() {
let block_number = header.number;
tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block.");
// iterate over all transactions
let mut tx_walker = tx_cursor.walk(Some(body.start_tx_id))?;
let mut transactions = Vec::with_capacity(body.tx_count as usize);
// get next N transactions.
for index in body.tx_id_range() {
let (tx_index, tx) =
tx_walker.next().ok_or(ProviderError::EndOfTransactionTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Transaction gap");
return Err(ProviderError::TransactionsGap { missing: tx_index }.into())
}
transactions.push(tx);
}
// take signers
let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?;
let mut signers = Vec::with_capacity(body.tx_count as usize);
for index in body.tx_id_range() {
let (tx_index, tx) =
tx_sender_walker.next().ok_or(ProviderError::EndOfTransactionSenderTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Signer gap");
return Err(ProviderError::TransactionsSignerGap { missing: tx_index }.into())
}
signers.push(tx);
}
trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block");
for block_number in start_block..=end_block {
let (block, td) = Self::read_block_with_senders(tx, block_number)?;
// Configure the executor to use the current state.
trace!(target: "sync::stages::execution", number = block_number, txs = block.body.len(), "Executing block");
let (block, senders) = block.into_components();
let block_state = executor
.execute_and_verify_receipt(
&Block { header, body: transactions, ommers, withdrawals },
td,
Some(signers),
)
.execute_and_verify_receipt(&block, td, Some(senders))
.map_err(|error| StageError::ExecutionError { block: block_number, error })?;
if let Some(last_receipt) = block_state.receipts().last() {
self.metrics