mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 00:28:20 -05:00
refactor: unify code paths for receipts removal (#12887)
This commit is contained in:
@@ -8,15 +8,13 @@ use reth_codecs::Compact;
|
||||
use reth_primitives_traits::{Block, BlockBody};
|
||||
use tracing::*;
|
||||
|
||||
use alloy_primitives::TxNumber;
|
||||
use reth_db::{tables, transaction::DbTx};
|
||||
use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut};
|
||||
use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
|
||||
use reth_primitives::StaticFileSegment;
|
||||
use reth_provider::{
|
||||
providers::{StaticFileProvider, StaticFileWriter},
|
||||
BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader,
|
||||
StorageLocation,
|
||||
providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError,
|
||||
StaticFileProviderFactory, StatsReader, StorageLocation,
|
||||
};
|
||||
use reth_stages_api::{
|
||||
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
|
||||
@@ -24,6 +22,8 @@ use reth_stages_api::{
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
|
||||
use super::missing_static_data_error;
|
||||
|
||||
/// The body stage downloads block bodies.
|
||||
///
|
||||
/// The body stage downloads block bodies for all block headers stored locally in storage.
|
||||
@@ -128,6 +128,7 @@ impl<D: BodyDownloader> BodyStage<D> {
|
||||
next_static_file_tx_num.saturating_sub(1),
|
||||
&static_file_provider,
|
||||
provider,
|
||||
StaticFileSegment::Transactions,
|
||||
)?)
|
||||
}
|
||||
} else {
|
||||
@@ -135,6 +136,7 @@ impl<D: BodyDownloader> BodyStage<D> {
|
||||
next_static_file_tx_num.saturating_sub(1),
|
||||
&static_file_provider,
|
||||
provider,
|
||||
StaticFileSegment::Transactions,
|
||||
)?)
|
||||
}
|
||||
}
|
||||
@@ -242,42 +244,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when database is ahead of static files. Attempts to find the first block we are missing
|
||||
/// transactions for.
|
||||
fn missing_static_data_error<Provider>(
|
||||
last_tx_num: TxNumber,
|
||||
static_file_provider: &StaticFileProvider<Provider::Primitives>,
|
||||
provider: &Provider,
|
||||
) -> Result<StageError, ProviderError>
|
||||
where
|
||||
Provider: BlockReader + StaticFileProviderFactory,
|
||||
{
|
||||
let mut last_block = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::Transactions)
|
||||
.unwrap_or_default();
|
||||
|
||||
// To be extra safe, we make sure that the last tx num matches the last block from its indices.
|
||||
// If not, get it.
|
||||
loop {
|
||||
if let Some(indices) = provider.block_body_indices(last_block)? {
|
||||
if indices.last_tx_num() <= last_tx_num {
|
||||
break
|
||||
}
|
||||
}
|
||||
if last_block == 0 {
|
||||
break
|
||||
}
|
||||
last_block -= 1;
|
||||
}
|
||||
|
||||
let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
|
||||
|
||||
Ok(StageError::MissingStaticFileData {
|
||||
block: missing_block,
|
||||
segment: StaticFileSegment::Transactions,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
|
||||
// beforehand how many bytes we need to download. So the good solution would be to measure the
|
||||
// progress in gas as a proxy to size. Execution stage uses a similar approach.
|
||||
|
||||
@@ -14,7 +14,7 @@ use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
|
||||
use reth_primitives::{SealedHeader, StaticFileSegment};
|
||||
use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives};
|
||||
use reth_provider::{
|
||||
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
|
||||
providers::{StaticFileProvider, StaticFileWriter},
|
||||
BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef,
|
||||
OriginalValuesKnown, ProviderError, StateChangeWriter, StateCommitmentProvider, StateWriter,
|
||||
StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant,
|
||||
@@ -35,6 +35,8 @@ use std::{
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
use super::missing_static_data_error;
|
||||
|
||||
/// The execution stage executes all transactions and
|
||||
/// update history indexes.
|
||||
///
|
||||
@@ -169,6 +171,88 @@ impl<E> ExecutionStage<E> {
|
||||
}
|
||||
Ok(prune_modes)
|
||||
}
|
||||
|
||||
/// Performs consistency check on static files.
|
||||
///
|
||||
/// This function compares the highest receipt number recorded in the database with that in the
|
||||
/// static file to detect any discrepancies due to unexpected shutdowns or database rollbacks.
|
||||
/// **If the height in the static file is higher**, it rolls back (unwinds) the static file.
|
||||
/// **Conversely, if the height in the database is lower**, it triggers a rollback in the
|
||||
/// database (by returning [`StageError`]) until the heights in both the database and static
|
||||
/// file match.
|
||||
fn ensure_consistency<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
checkpoint: u64,
|
||||
unwind_to: Option<u64>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider,
|
||||
{
|
||||
// If thre's any receipts pruning configured, receipts are written directly to database and
|
||||
// inconsistencies are expected.
|
||||
if self.prune_modes.has_receipts_pruning() {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Get next expected receipt number
|
||||
let tx = provider.tx_ref();
|
||||
let next_receipt_num = tx
|
||||
.cursor_read::<tables::BlockBodyIndices>()?
|
||||
.seek_exact(checkpoint)?
|
||||
.map(|(_, value)| value.next_tx_num())
|
||||
.unwrap_or(0);
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
|
||||
// Get next expected receipt number in static files
|
||||
let next_static_file_receipt_num = static_file_provider
|
||||
.get_highest_static_file_tx(StaticFileSegment::Receipts)
|
||||
.map(|num| num + 1)
|
||||
.unwrap_or(0);
|
||||
|
||||
// Check if we had any unexpected shutdown after committing to static files, but
|
||||
// NOT committing to database.
|
||||
match next_static_file_receipt_num.cmp(&next_receipt_num) {
|
||||
// It can be equal when it's a chain of empty blocks, but we still need to update the
|
||||
// last block in the range.
|
||||
Ordering::Greater | Ordering::Equal => {
|
||||
let mut static_file_producer =
|
||||
static_file_provider.latest_writer(StaticFileSegment::Receipts)?;
|
||||
static_file_producer
|
||||
.prune_receipts(next_static_file_receipt_num - next_receipt_num, checkpoint)?;
|
||||
// Since this is a database <-> static file inconsistency, we commit the change
|
||||
// straight away.
|
||||
static_file_producer.commit()?;
|
||||
}
|
||||
Ordering::Less => {
|
||||
// If we are already in the process of unwind, this might be fine because we will
|
||||
// fix the inconsistency right away.
|
||||
if let Some(unwind_to) = unwind_to {
|
||||
let next_receipt_num_after_unwind = provider
|
||||
.tx_ref()
|
||||
.get::<tables::BlockBodyIndices>(unwind_to)?
|
||||
.map(|b| b.next_tx_num())
|
||||
.ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?;
|
||||
|
||||
if next_receipt_num_after_unwind > next_static_file_receipt_num {
|
||||
// This means we need a deeper unwind.
|
||||
} else {
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
return Err(missing_static_data_error(
|
||||
next_static_file_receipt_num.saturating_sub(1),
|
||||
&static_file_provider,
|
||||
provider,
|
||||
StaticFileSegment::Receipts,
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, Provider> Stage<Provider> for ExecutionStage<E>
|
||||
@@ -209,20 +293,7 @@ where
|
||||
let prune_modes = self.adjust_prune_modes(provider, start_block, max_block)?;
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
|
||||
// We only use static files for Receipts, if there is no receipt pruning of any kind.
|
||||
let write_receipts_to = if self.prune_modes.receipts.is_none() &&
|
||||
self.prune_modes.receipts_log_filter.is_empty()
|
||||
{
|
||||
debug!(target: "sync::stages::execution", start = start_block, "Preparing static file producer");
|
||||
let mut producer =
|
||||
prepare_static_file_producer(provider, &static_file_provider, start_block)?;
|
||||
// Since there might be a database <-> static file inconsistency (read
|
||||
// `prepare_static_file_producer` for context), we commit the change straight away.
|
||||
producer.commit()?;
|
||||
StorageLocation::StaticFiles
|
||||
} else {
|
||||
StorageLocation::Database
|
||||
};
|
||||
self.ensure_consistency(provider, input.checkpoint().block_number, None)?;
|
||||
|
||||
let db = StateProviderDatabase(LatestStateProviderRef::new(provider));
|
||||
let mut executor = self.executor_provider.batch_executor(db);
|
||||
@@ -361,7 +432,7 @@ where
|
||||
let time = Instant::now();
|
||||
|
||||
// write output
|
||||
provider.write_to_storage(state, OriginalValuesKnown::Yes, write_receipts_to)?;
|
||||
provider.write_to_storage(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?;
|
||||
|
||||
let db_write_duration = time.elapsed();
|
||||
debug!(
|
||||
@@ -408,10 +479,13 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
self.ensure_consistency(provider, input.checkpoint.block_number, Some(unwind_to))?;
|
||||
|
||||
// Unwind account and storage changesets, as well as receipts.
|
||||
//
|
||||
// This also updates `PlainStorageState` and `PlainAccountState`.
|
||||
let bundle_state_with_receipts = provider.take_state_above(unwind_to)?;
|
||||
let bundle_state_with_receipts =
|
||||
provider.take_state_above(unwind_to, StorageLocation::Both)?;
|
||||
|
||||
// Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent.
|
||||
if self.exex_manager_handle.has_exexs() {
|
||||
@@ -432,25 +506,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
|
||||
// Unwind all receipts for transactions in the block range
|
||||
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
|
||||
// We only use static files for Receipts, if there is no receipt pruning of any kind.
|
||||
|
||||
// prepare_static_file_producer does a consistency check that will unwind static files
|
||||
// if the expected highest receipt in the files is higher than the database.
|
||||
// Which is essentially what happens here when we unwind this stage.
|
||||
let _static_file_producer =
|
||||
prepare_static_file_producer(provider, &static_file_provider, *range.start())?;
|
||||
} else {
|
||||
// If there is any kind of receipt pruning/filtering we use the database, since static
|
||||
// files do not support filters.
|
||||
//
|
||||
// If we hit this case, the receipts have already been unwound by the call to
|
||||
// `take_state`.
|
||||
}
|
||||
|
||||
// Update the checkpoint.
|
||||
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
|
||||
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
|
||||
@@ -576,85 +631,6 @@ fn calculate_gas_used_from_headers<N: NodePrimitives>(
|
||||
Ok(gas_total)
|
||||
}
|
||||
|
||||
/// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency
|
||||
/// check.
|
||||
///
|
||||
/// This function compares the highest receipt number recorded in the database with that in the
|
||||
/// static file to detect any discrepancies due to unexpected shutdowns or database rollbacks. **If
|
||||
/// the height in the static file is higher**, it rolls back (unwinds) the static file.
|
||||
/// **Conversely, if the height in the database is lower**, it triggers a rollback in the database
|
||||
/// (by returning [`StageError`]) until the heights in both the database and static file match.
|
||||
fn prepare_static_file_producer<'a, 'b, Provider>(
|
||||
provider: &'b Provider,
|
||||
static_file_provider: &'a StaticFileProvider<Provider::Primitives>,
|
||||
start_block: u64,
|
||||
) -> Result<StaticFileProviderRWRefMut<'a, Provider::Primitives>, StageError>
|
||||
where
|
||||
Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider,
|
||||
'b: 'a,
|
||||
{
|
||||
// Get next expected receipt number
|
||||
let tx = provider.tx_ref();
|
||||
let next_receipt_num = tx
|
||||
.cursor_read::<tables::BlockBodyIndices>()?
|
||||
.seek_exact(start_block)?
|
||||
.map(|(_, value)| value.first_tx_num)
|
||||
.unwrap_or(0);
|
||||
|
||||
// Get next expected receipt number in static files
|
||||
let next_static_file_receipt_num = static_file_provider
|
||||
.get_highest_static_file_tx(StaticFileSegment::Receipts)
|
||||
.map(|num| num + 1)
|
||||
.unwrap_or(0);
|
||||
|
||||
let mut static_file_producer =
|
||||
static_file_provider.get_writer(start_block, StaticFileSegment::Receipts)?;
|
||||
|
||||
// Check if we had any unexpected shutdown after committing to static files, but
|
||||
// NOT committing to database.
|
||||
match next_static_file_receipt_num.cmp(&next_receipt_num) {
|
||||
// It can be equal when it's a chain of empty blocks, but we still need to update the last
|
||||
// block in the range.
|
||||
Ordering::Greater | Ordering::Equal => static_file_producer.prune_receipts(
|
||||
next_static_file_receipt_num - next_receipt_num,
|
||||
start_block.saturating_sub(1),
|
||||
)?,
|
||||
Ordering::Less => {
|
||||
let mut last_block = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::Receipts)
|
||||
.unwrap_or(0);
|
||||
|
||||
let last_receipt_num = static_file_provider
|
||||
.get_highest_static_file_tx(StaticFileSegment::Receipts)
|
||||
.unwrap_or(0);
|
||||
|
||||
// To be extra safe, we make sure that the last receipt num matches the last block from
|
||||
// its indices. If not, get it.
|
||||
loop {
|
||||
if let Some(indices) = provider.block_body_indices(last_block)? {
|
||||
if indices.last_tx_num() <= last_receipt_num {
|
||||
break
|
||||
}
|
||||
}
|
||||
if last_block == 0 {
|
||||
break
|
||||
}
|
||||
last_block -= 1;
|
||||
}
|
||||
|
||||
let missing_block =
|
||||
Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
|
||||
|
||||
return Err(StageError::MissingStaticFileData {
|
||||
block: missing_block,
|
||||
segment: StaticFileSegment::Receipts,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Ok(static_file_producer)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -900,7 +876,7 @@ mod tests {
|
||||
|
||||
// Tests node with database and node with static files
|
||||
for mut mode in modes {
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let mut provider = factory.database_provider_rw().unwrap();
|
||||
|
||||
if let Some(mode) = &mut mode {
|
||||
// Simulating a full node where we write receipts to database
|
||||
@@ -909,6 +885,7 @@ mod tests {
|
||||
|
||||
let mut execution_stage = stage();
|
||||
execution_stage.prune_modes = mode.clone().unwrap_or_default();
|
||||
provider.set_prune_modes(mode.clone().unwrap_or_default());
|
||||
|
||||
let output = execution_stage.execute(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
@@ -973,9 +950,10 @@ mod tests {
|
||||
"Post changed of a account"
|
||||
);
|
||||
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let mut provider = factory.database_provider_rw().unwrap();
|
||||
let mut stage = stage();
|
||||
stage.prune_modes = mode.unwrap_or_default();
|
||||
stage.prune_modes = mode.clone().unwrap_or_default();
|
||||
provider.set_prune_modes(mode.unwrap_or_default());
|
||||
|
||||
let _result = stage
|
||||
.unwind(
|
||||
@@ -1050,6 +1028,7 @@ mod tests {
|
||||
// Test Execution
|
||||
let mut execution_stage = stage();
|
||||
execution_stage.prune_modes = mode.clone().unwrap_or_default();
|
||||
provider.set_prune_modes(mode.clone().unwrap_or_default());
|
||||
|
||||
let result = execution_stage.execute(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
@@ -1057,7 +1036,8 @@ mod tests {
|
||||
// Test Unwind
|
||||
provider = factory.database_provider_rw().unwrap();
|
||||
let mut stage = stage();
|
||||
stage.prune_modes = mode.unwrap_or_default();
|
||||
stage.prune_modes = mode.clone().unwrap_or_default();
|
||||
provider.set_prune_modes(mode.clone().unwrap_or_default());
|
||||
|
||||
let result = stage
|
||||
.unwind(
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! Utils for `stages`.
|
||||
use alloy_primitives::BlockNumber;
|
||||
use alloy_primitives::{BlockNumber, TxNumber};
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_db::BlockNumberList;
|
||||
use reth_db_api::{
|
||||
@@ -10,7 +10,11 @@ use reth_db_api::{
|
||||
DatabaseError,
|
||||
};
|
||||
use reth_etl::Collector;
|
||||
use reth_provider::DBProvider;
|
||||
use reth_primitives::StaticFileSegment;
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use reth_stages_api::StageError;
|
||||
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
|
||||
use tracing::info;
|
||||
@@ -244,3 +248,36 @@ impl LoadMode {
|
||||
matches!(self, Self::Flush)
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when database is ahead of static files. Attempts to find the first block we are missing
|
||||
/// transactions for.
|
||||
pub(crate) fn missing_static_data_error<Provider>(
|
||||
last_tx_num: TxNumber,
|
||||
static_file_provider: &StaticFileProvider<Provider::Primitives>,
|
||||
provider: &Provider,
|
||||
segment: StaticFileSegment,
|
||||
) -> Result<StageError, ProviderError>
|
||||
where
|
||||
Provider: BlockReader + StaticFileProviderFactory,
|
||||
{
|
||||
let mut last_block =
|
||||
static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
|
||||
|
||||
// To be extra safe, we make sure that the last tx num matches the last block from its indices.
|
||||
// If not, get it.
|
||||
loop {
|
||||
if let Some(indices) = provider.block_body_indices(last_block)? {
|
||||
if indices.last_tx_num() <= last_tx_num {
|
||||
break
|
||||
}
|
||||
}
|
||||
if last_block == 0 {
|
||||
break
|
||||
}
|
||||
last_block -= 1;
|
||||
}
|
||||
|
||||
let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
|
||||
|
||||
Ok(StageError::MissingStaticFileData { block: missing_block, segment })
|
||||
}
|
||||
|
||||
@@ -206,6 +206,12 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
|
||||
Ok(Box::new(state_provider))
|
||||
}
|
||||
|
||||
#[cfg(feature = "test-utils")]
|
||||
/// Sets the prune modes for provider.
|
||||
pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
|
||||
self.prune_modes = prune_modes;
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
|
||||
@@ -335,6 +341,34 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes receipts from all transactions starting with provided number (inclusive).
|
||||
fn remove_receipts_from(
|
||||
&self,
|
||||
from_tx: TxNumber,
|
||||
last_block: BlockNumber,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<()> {
|
||||
if remove_from.database() {
|
||||
// iterate over block body and remove receipts
|
||||
self.remove::<tables::Receipts>(from_tx..)?;
|
||||
}
|
||||
|
||||
if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
|
||||
let static_file_receipt_num =
|
||||
self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
|
||||
|
||||
let to_delete = static_file_receipt_num
|
||||
.map(|static_num| (static_num + 1).saturating_sub(from_tx))
|
||||
.unwrap_or_default();
|
||||
|
||||
self.static_file_provider
|
||||
.latest_writer(StaticFileSegment::Receipts)?
|
||||
.prune_receipts(to_delete, last_block)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
|
||||
@@ -1951,7 +1985,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
|
||||
/// 1. Take the old value from the changeset
|
||||
/// 2. Take the new value from the local state
|
||||
/// 3. Set the local state to the value in the changeset
|
||||
fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
|
||||
fn remove_state_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_receipts_from: StorageLocation,
|
||||
) -> ProviderResult<()> {
|
||||
let range = block + 1..=self.last_block_number()?;
|
||||
|
||||
if range.is_empty() {
|
||||
@@ -1964,8 +2002,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
|
||||
// get transaction receipts
|
||||
let from_transaction_num =
|
||||
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
|
||||
let to_transaction_num =
|
||||
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
|
||||
|
||||
let storage_range = BlockNumberAddress::range(range.clone());
|
||||
|
||||
@@ -2018,8 +2054,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
|
||||
}
|
||||
}
|
||||
|
||||
// iterate over block body and remove receipts
|
||||
self.remove::<tables::Receipts>(from_transaction_num..=to_transaction_num)?;
|
||||
self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2045,7 +2080,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
|
||||
/// 1. Take the old value from the changeset
|
||||
/// 2. Take the new value from the local state
|
||||
/// 3. Set the local state to the value in the changeset
|
||||
fn take_state_above(&self, block: BlockNumber) -> ProviderResult<ExecutionOutcome> {
|
||||
fn take_state_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_receipts_from: StorageLocation,
|
||||
) -> ProviderResult<ExecutionOutcome> {
|
||||
let range = block + 1..=self.last_block_number()?;
|
||||
|
||||
if range.is_empty() {
|
||||
@@ -2115,22 +2154,45 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateChangeWriter
|
||||
}
|
||||
}
|
||||
|
||||
// iterate over block body and create ExecutionResult
|
||||
let mut receipt_iter =
|
||||
self.take::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
|
||||
// Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
|
||||
let mut receipts_iter = self
|
||||
.static_file_provider
|
||||
.get_range_with_static_file_or_database(
|
||||
StaticFileSegment::Receipts,
|
||||
from_transaction_num..to_transaction_num + 1,
|
||||
|static_file, range, _| {
|
||||
static_file
|
||||
.receipts_by_tx_range(range.clone())
|
||||
.map(|r| range.into_iter().zip(r).collect())
|
||||
},
|
||||
|range, _| {
|
||||
self.tx
|
||||
.cursor_read::<tables::Receipts>()?
|
||||
.walk_range(range)?
|
||||
.map(|r| r.map_err(Into::into))
|
||||
.collect()
|
||||
},
|
||||
|_| true,
|
||||
)?
|
||||
.into_iter()
|
||||
.peekable();
|
||||
|
||||
let mut receipts = Vec::with_capacity(block_bodies.len());
|
||||
// loop break if we are at the end of the blocks.
|
||||
for (_, block_body) in block_bodies {
|
||||
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
|
||||
for _ in block_body.tx_num_range() {
|
||||
if let Some((_, receipt)) = receipt_iter.next() {
|
||||
block_receipts.push(Some(receipt));
|
||||
for num in block_body.tx_num_range() {
|
||||
if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
|
||||
block_receipts.push(receipts_iter.next().map(|(_, r)| r));
|
||||
} else {
|
||||
block_receipts.push(None);
|
||||
}
|
||||
}
|
||||
receipts.push(block_receipts);
|
||||
}
|
||||
|
||||
self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
|
||||
|
||||
Ok(ExecutionOutcome::new_init(
|
||||
state,
|
||||
reverts,
|
||||
@@ -2594,20 +2656,20 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
|
||||
fn take_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<Chain<Self::Primitives>> {
|
||||
let range = block + 1..=self.last_block_number()?;
|
||||
|
||||
self.unwind_trie_state_range(range.clone())?;
|
||||
|
||||
// get execution res
|
||||
let execution_state = self.take_state_above(block)?;
|
||||
let execution_state = self.take_state_above(block, remove_from)?;
|
||||
|
||||
let blocks = self.sealed_block_with_senders_range(range)?;
|
||||
|
||||
// remove block bodies it is needed for both get block range and get block execution results
|
||||
// that is why it is deleted afterwards.
|
||||
self.remove_blocks_above(block, remove_transactions_from)?;
|
||||
self.remove_blocks_above(block, remove_from)?;
|
||||
|
||||
// Update pipeline progress
|
||||
self.update_pipeline_stages(block, true)?;
|
||||
@@ -2618,18 +2680,18 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
|
||||
fn remove_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<()> {
|
||||
let range = block + 1..=self.last_block_number()?;
|
||||
|
||||
self.unwind_trie_state_range(range)?;
|
||||
|
||||
// remove execution res
|
||||
self.remove_state_above(block)?;
|
||||
self.remove_state_above(block, remove_from)?;
|
||||
|
||||
// remove block bodies it is needed for both get block range and get block execution results
|
||||
// that is why it is deleted afterwards.
|
||||
self.remove_blocks_above(block, remove_transactions_from)?;
|
||||
self.remove_blocks_above(block, remove_from)?;
|
||||
|
||||
// Update pipeline progress
|
||||
self.update_pipeline_stages(block, true)?;
|
||||
|
||||
@@ -37,19 +37,25 @@ pub trait BlockExecutionWriter:
|
||||
/// Take all of the blocks above the provided number and their execution result
|
||||
///
|
||||
/// The passed block number will stay in the database.
|
||||
///
|
||||
/// Accepts [`StorageLocation`] specifying from where should transactions and receipts be
|
||||
/// removed.
|
||||
fn take_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<Chain<Self::Primitives>>;
|
||||
|
||||
/// Remove all of the blocks above the provided number and their execution result
|
||||
///
|
||||
/// The passed block number will stay in the database.
|
||||
///
|
||||
/// Accepts [`StorageLocation`] specifying from where should transactions and receipts be
|
||||
/// removed.
|
||||
fn remove_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
@@ -57,17 +63,17 @@ impl<T: BlockExecutionWriter> BlockExecutionWriter for &T {
|
||||
fn take_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<Chain<Self::Primitives>> {
|
||||
(*self).take_block_and_execution_above(block, remove_transactions_from)
|
||||
(*self).take_block_and_execution_above(block, remove_from)
|
||||
}
|
||||
|
||||
fn remove_block_and_execution_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_transactions_from: StorageLocation,
|
||||
remove_from: StorageLocation,
|
||||
) -> ProviderResult<()> {
|
||||
(*self).remove_block_and_execution_above(block, remove_transactions_from)
|
||||
(*self).remove_block_and_execution_above(block, remove_from)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -40,9 +40,17 @@ pub trait StateChangeWriter {
|
||||
|
||||
/// Remove the block range of state above the given block. The state of the passed block is not
|
||||
/// removed.
|
||||
fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()>;
|
||||
fn remove_state_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_receipts_from: StorageLocation,
|
||||
) -> ProviderResult<()>;
|
||||
|
||||
/// Take the block range of state, recreating the [`ExecutionOutcome`]. The state of the passed
|
||||
/// block is not removed.
|
||||
fn take_state_above(&self, block: BlockNumber) -> ProviderResult<ExecutionOutcome>;
|
||||
fn take_state_above(
|
||||
&self,
|
||||
block: BlockNumber,
|
||||
remove_receipts_from: StorageLocation,
|
||||
) -> ProviderResult<ExecutionOutcome>;
|
||||
}
|
||||
|
||||
@@ -189,25 +189,16 @@ where
|
||||
/// database and static files. This is exclusive, i.e., it only removes blocks above
|
||||
/// `block_number`, and does not remove `block_number`.
|
||||
pub fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
|
||||
// IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
|
||||
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
|
||||
self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?;
|
||||
|
||||
// Get highest static file block for the total block range
|
||||
let highest_static_file_block = self
|
||||
.static_file()
|
||||
.get_highest_static_file_block(StaticFileSegment::Headers)
|
||||
.expect("todo: error handling, headers should exist");
|
||||
|
||||
// Get the total txs for the block range, so we have the correct number of columns for
|
||||
// receipts and transactions
|
||||
// IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
|
||||
let tx_range = self
|
||||
.database()
|
||||
.transaction_range_by_block_range(block_number + 1..=highest_static_file_block)?;
|
||||
// We are using end + 1 - start here because the returned range is inclusive.
|
||||
let total_txs = (tx_range.end() + 1).saturating_sub(*tx_range.start());
|
||||
|
||||
// IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
|
||||
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
|
||||
self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?;
|
||||
|
||||
// IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
|
||||
// we remove only what is ABOVE the block.
|
||||
//
|
||||
@@ -218,12 +209,6 @@ where
|
||||
.get_writer(block_number, StaticFileSegment::Headers)?
|
||||
.prune_headers(highest_static_file_block.saturating_sub(block_number))?;
|
||||
|
||||
if !self.database().prune_modes_ref().has_receipts_pruning() {
|
||||
self.static_file()
|
||||
.get_writer(block_number, StaticFileSegment::Receipts)?
|
||||
.prune_receipts(total_txs, block_number)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user