fix(stages): check for empty block range and return previous checkpoint (#3008)

This commit is contained in:
Alexey Shekhirin
2023-06-09 20:58:03 +04:00
committed by GitHub
parent 4bfbd3a53d
commit 8a2281e4c9
12 changed files with 37 additions and 30 deletions

View File

@@ -148,9 +148,9 @@ pub struct ExecOutput {
}
impl ExecOutput {
/// Mark the stage as done, checkpointing at the given block number.
pub fn done(block_number: BlockNumber) -> Self {
Self { checkpoint: StageCheckpoint::new(block_number), done: true }
/// Mark the stage as done, checkpointing at the given place.
pub fn done(checkpoint: StageCheckpoint) -> Self {
Self { checkpoint, done: true }
}
}

View File

@@ -70,13 +70,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let range = input.next_block_range();
if range.is_empty() {
let (from, to) = range.into_inner();
info!(target: "sync::stages::bodies", from, "Target block already downloaded, skipping.");
return Ok(ExecOutput::done(to))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let range = input.next_block_range();
// Update the header range on the downloader
self.downloader.set_download_range(range.clone())?;
let (from_block, to_block) = range.into_inner();

View File

@@ -138,6 +138,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let start_block = input.next_block();
let max_block = input.target();
@@ -338,7 +342,9 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}
// get all batches for account change

View File

@@ -135,11 +135,11 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let range = input.next_block_range();
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (from_block, to_block) = range.into_inner();
let (from_block, to_block) = input.next_block_range().into_inner();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
// account otherwise take changesets aggregate the sets and apply hashing to

View File

@@ -57,11 +57,11 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let range = input.next_block_range();
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (from_block, to_block) = range.into_inner();
let (from_block, to_block) = input.next_block_range().into_inner();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
// account otherwise take changesets aggregate the sets and apply hashing to

View File

@@ -205,7 +205,7 @@ where
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
return Ok(ExecOutput { checkpoint: current_checkpoint, done: true })
return Ok(ExecOutput::done(current_checkpoint))
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");

View File

@@ -41,12 +41,12 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let indices = tx.get_account_transition_ids_from_changeset(range.clone())?;

View File

@@ -44,13 +44,12 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let target = input.target();
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(target))
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?;

View File

@@ -149,7 +149,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let threshold = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput::done(input.target()))
return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(any(test, feature = "test-utils"))]

View File

@@ -60,7 +60,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint().block_number))
return Ok(ExecOutput::done(input.checkpoint()))
}
let (tx_range, block_range, is_final_range) =

View File

@@ -54,6 +54,10 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let (start_block, end_block) = range.clone().into_inner();

View File

@@ -56,7 +56,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint().block_number))
return Ok(ExecOutput::done(input.checkpoint()))
}
let (tx_range, block_range, is_final_range) =