mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
fix: still delete headers from db in headers unwind (#7579)
This commit is contained in:
@@ -321,28 +321,48 @@ where
|
||||
) -> Result<UnwindOutput, StageError> {
|
||||
self.sync_gap.take();
|
||||
|
||||
// First unwind the db tables, until the unwind_to block number. use the walker to unwind
|
||||
// HeaderNumbers based on the index in CanonicalHeaders
|
||||
provider.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
|
||||
input.unwind_to,
|
||||
)?;
|
||||
provider.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
|
||||
provider.unwind_table_by_num::<tables::HeaderTerminalDifficulties>(input.unwind_to)?;
|
||||
let unfinalized_headers_unwound =
|
||||
provider.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
|
||||
|
||||
// determine how many headers to unwind from the static files based on the highest block and
|
||||
// the unwind_to block
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
let highest_block = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::Headers)
|
||||
.unwrap_or_default();
|
||||
let unwound_headers = highest_block - input.unwind_to;
|
||||
|
||||
for block in (input.unwind_to + 1)..=highest_block {
|
||||
let header_hash = static_file_provider
|
||||
.block_hash(block)?
|
||||
.ok_or(ProviderError::HeaderNotFound(block.into()))?;
|
||||
|
||||
provider.tx_ref().delete::<tables::HeaderNumbers>(header_hash, None)?;
|
||||
let static_file_headers_to_unwind = highest_block - input.unwind_to;
|
||||
for block_number in (input.unwind_to + 1)..=highest_block {
|
||||
let hash = static_file_provider.block_hash(block_number)?;
|
||||
// we have to delete from HeaderNumbers here as well as in the above unwind, since that
|
||||
// mapping contains entries for both headers in the db and headers in static files
|
||||
//
|
||||
// so if we are unwinding past the lowest block in the db, we have to iterate through
|
||||
// the HeaderNumbers entries that we'll delete in static files below
|
||||
if let Some(header_hash) = hash {
|
||||
provider.tx_ref().delete::<tables::HeaderNumbers>(header_hash, None)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Now unwind the static files until the unwind_to block number
|
||||
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
|
||||
writer.prune_headers(unwound_headers)?;
|
||||
writer.prune_headers(static_file_headers_to_unwind)?;
|
||||
|
||||
// Set the stage checkpoin entities processed based on how much we unwound - we add the
|
||||
// headers unwound from static files and db
|
||||
let stage_checkpoint =
|
||||
input.checkpoint.headers_stage_checkpoint().map(|stage_checkpoint| HeadersCheckpoint {
|
||||
block_range: stage_checkpoint.block_range,
|
||||
progress: EntitiesCheckpoint {
|
||||
processed: stage_checkpoint.progress.processed.saturating_sub(unwound_headers),
|
||||
processed: stage_checkpoint.progress.processed.saturating_sub(
|
||||
static_file_headers_to_unwind + unfinalized_headers_unwound as u64,
|
||||
),
|
||||
total: stage_checkpoint.progress.total,
|
||||
},
|
||||
});
|
||||
@@ -363,9 +383,12 @@ mod tests {
|
||||
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use reth_interfaces::test_utils::generators::random_header;
|
||||
use reth_primitives::{stage::StageUnitCheckpoint, B256};
|
||||
use reth_provider::ProviderFactory;
|
||||
use reth_interfaces::test_utils::generators::{self, random_header, random_header_range};
|
||||
use reth_primitives::{
|
||||
stage::StageUnitCheckpoint, BlockBody, SealedBlock, SealedBlockWithSenders, B256,
|
||||
};
|
||||
use reth_provider::{BlockHashReader, BlockWriter, BundleStateWithReceipts, ProviderFactory};
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState};
|
||||
use test_runner::HeadersTestRunner;
|
||||
|
||||
mod test_runner {
|
||||
@@ -376,9 +399,7 @@ mod tests {
|
||||
use reth_downloaders::headers::reverse_headers::{
|
||||
ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder,
|
||||
};
|
||||
use reth_interfaces::test_utils::{
|
||||
generators, generators::random_header_range, TestHeaderDownloader, TestHeadersClient,
|
||||
};
|
||||
use reth_interfaces::test_utils::{TestHeaderDownloader, TestHeadersClient};
|
||||
use reth_provider::BlockNumReader;
|
||||
use tokio::sync::watch;
|
||||
|
||||
@@ -551,6 +572,91 @@ mod tests {
|
||||
|
||||
stage_test_suite!(HeadersTestRunner, headers);
|
||||
|
||||
/// Execute the stage with linear downloader, unwinds, and ensures that the database tables
|
||||
/// along with the static files are cleaned up.
|
||||
#[tokio::test]
|
||||
async fn execute_with_linear_downloader_unwind() {
|
||||
let mut runner = HeadersTestRunner::with_linear_downloader();
|
||||
let (checkpoint, previous_stage) = (1000, 1200);
|
||||
let input = ExecInput {
|
||||
target: Some(previous_stage),
|
||||
checkpoint: Some(StageCheckpoint::new(checkpoint)),
|
||||
};
|
||||
let headers = runner.seed_execution(input).expect("failed to seed execution");
|
||||
let rx = runner.execute(input);
|
||||
|
||||
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
|
||||
|
||||
// skip `after_execution` hook for linear downloader
|
||||
let tip = headers.last().unwrap();
|
||||
runner.send_tip(tip.hash());
|
||||
|
||||
let result = rx.await.unwrap();
|
||||
runner.db().factory.static_file_provider().commit().unwrap();
|
||||
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint {
|
||||
block_number,
|
||||
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
|
||||
block_range: CheckpointBlockRange {
|
||||
from,
|
||||
to
|
||||
},
|
||||
progress: EntitiesCheckpoint {
|
||||
processed,
|
||||
total,
|
||||
}
|
||||
}))
|
||||
}, done: true }) if block_number == tip.number &&
|
||||
from == checkpoint && to == previous_stage &&
|
||||
// -1 because we don't need to download the local head
|
||||
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number
|
||||
);
|
||||
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
|
||||
assert!(runner.stage().hash_collector.is_empty());
|
||||
assert!(runner.stage().header_collector.is_empty());
|
||||
|
||||
// let's insert some blocks using append_blocks_with_state
|
||||
let sealed_headers =
|
||||
random_header_range(&mut generators::rng(), tip.number..tip.number + 10, tip.hash());
|
||||
|
||||
// make them sealed blocks with senders by converting them to empty blocks
|
||||
let sealed_blocks = sealed_headers
|
||||
.iter()
|
||||
.map(|header| {
|
||||
SealedBlockWithSenders::new(
|
||||
SealedBlock::new(header.clone(), BlockBody::default()),
|
||||
vec![],
|
||||
)
|
||||
.unwrap()
|
||||
})
|
||||
.collect();
|
||||
|
||||
// append the blocks
|
||||
let provider = runner.db().factory.provider_rw().unwrap();
|
||||
provider
|
||||
.append_blocks_with_state(
|
||||
sealed_blocks,
|
||||
BundleStateWithReceipts::default(),
|
||||
HashedPostState::default(),
|
||||
TrieUpdates::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
// now we can unwind 10 blocks
|
||||
let unwind_input = UnwindInput {
|
||||
checkpoint: StageCheckpoint::new(tip.number + 10),
|
||||
unwind_to: tip.number,
|
||||
bad_block: None,
|
||||
};
|
||||
|
||||
let unwind_output = runner.unwind(unwind_input).await.unwrap();
|
||||
assert_eq!(unwind_output.checkpoint.block_number, tip.number);
|
||||
|
||||
// validate the unwind, ensure that the tables are cleaned up
|
||||
assert!(runner.validate_unwind(unwind_input).is_ok());
|
||||
}
|
||||
|
||||
/// Execute the stage with linear downloader
|
||||
#[tokio::test]
|
||||
async fn execute_with_linear_downloader() {
|
||||
|
||||
Reference in New Issue
Block a user