refactor(stages): input target reached & output done checks (#3106)

This commit is contained in:
Alexey Shekhirin
2023-06-12 20:12:19 +04:00
committed by GitHub
parent e0cdb0bc0b
commit 9a8c680e0f
26 changed files with 313 additions and 379 deletions

View File

@@ -119,30 +119,22 @@ impl Command {
let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&mut tx,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
account_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = account_hashing_stage.execute(&mut tx, input).await?;
account_hashing_done = output.is_done(input);
}
let mut storage_hashing_done = false;
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&mut tx,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = storage_hashing_stage.execute(&mut tx, input).await?;
storage_hashing_done = output.is_done(input);
}
let incremental_result = merkle_stage
@@ -170,7 +162,7 @@ impl Command {
loop {
let clean_result = merkle_stage.execute(&mut tx, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
if clean_result.unwrap().is_done(clean_input) {
break
}
}

View File

@@ -72,7 +72,8 @@ impl NodeState {
pipeline_position,
pipeline_total,
stage_id,
result: ExecOutput { checkpoint, done },
result: ExecOutput { checkpoint },
done,
} => {
self.current_checkpoint = checkpoint;

View File

@@ -76,16 +76,11 @@ async fn dry_run(
let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
}
tx.drop()?;

View File

@@ -73,16 +73,11 @@ async fn dry_run(
let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
}
tx.drop()?;

View File

@@ -116,19 +116,17 @@ async fn dry_run(
let mut tx = Transaction::new(&output_db)?;
let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = MerkleStage::Execution {
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from
* scratch */
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
}
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.execute(&mut tx, exec_input)
.await?
.done;
.is_done(exec_input);
}
tx.drop()?;

View File

@@ -20,7 +20,7 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, ExecOutput, Stage, UnwindInput,
ExecInput, Stage, UnwindInput,
};
use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc};
use tracing::*;
@@ -236,10 +236,13 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{
input.checkpoint = Some(stage_progress);
loop {
let result = exec_stage.execute(&mut tx, input).await?;
if result.is_done(input) {
break
}
input.checkpoint = Some(result.checkpoint);
if self.commit {
tx.commit()?;