From 9a8c680e0f8d4bbfec2d28ea31d7bb0f6f29e93e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 12 Jun 2023 20:12:19 +0400 Subject: [PATCH] refactor(stages): input target reached & output done checks (#3106) --- bin/reth/src/debug_cmd/merkle.rs | 34 ++--- bin/reth/src/node/events.rs | 3 +- bin/reth/src/stage/dump/hashing_account.rs | 15 +-- bin/reth/src/stage/dump/hashing_storage.rs | 15 +-- bin/reth/src/stage/dump/merkle.rs | 18 ++- bin/reth/src/stage/run.rs | 13 +- crates/consensus/beacon/src/engine/mod.rs | 87 ++++++------ crates/consensus/beacon/src/engine/sync.rs | 6 - crates/stages/src/pipeline/event.rs | 4 + crates/stages/src/pipeline/mod.rs | 125 ++++++++++++------ crates/stages/src/stage.rs | 54 +++++--- crates/stages/src/stages/bodies.rs | 23 +--- crates/stages/src/stages/execution.rs | 11 +- crates/stages/src/stages/finish.rs | 10 +- crates/stages/src/stages/hashing_account.rs | 25 +--- crates/stages/src/stages/hashing_storage.rs | 31 ++--- crates/stages/src/stages/headers.rs | 10 +- .../src/stages/index_account_history.rs | 11 +- .../src/stages/index_storage_history.rs | 13 +- crates/stages/src/stages/merkle.rs | 16 +-- crates/stages/src/stages/sender_recovery.rs | 29 ++-- crates/stages/src/stages/total_difficulty.rs | 20 +-- crates/stages/src/stages/tx_lookup.rs | 29 ++-- crates/stages/src/test_utils/macros.rs | 53 +------- crates/stages/src/test_utils/stage.rs | 31 ++++- crates/storage/provider/src/transaction.rs | 6 + 26 files changed, 313 insertions(+), 379 deletions(-) diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 193f4f331c..326a5b16f5 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -119,30 +119,22 @@ impl Command { let mut account_hashing_done = false; while !account_hashing_done { - let output = account_hashing_stage - .execute( - &mut tx, - ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }, - ) - .await?; - account_hashing_done = output.done; + let input = ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }; + let output = account_hashing_stage.execute(&mut tx, input).await?; + account_hashing_done = output.is_done(input); } let mut storage_hashing_done = false; while !storage_hashing_done { - let output = storage_hashing_stage - .execute( - &mut tx, - ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }, - ) - .await?; - storage_hashing_done = output.done; + let input = ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }; + let output = storage_hashing_stage.execute(&mut tx, input).await?; + storage_hashing_done = output.is_done(input); } let incremental_result = merkle_stage @@ -170,7 +162,7 @@ impl Command { loop { let clean_result = merkle_stage.execute(&mut tx, clean_input).await; assert!(clean_result.is_ok(), "Clean state root calculation failed"); - if clean_result.unwrap().done { + if clean_result.unwrap().is_done(clean_input) { break } } diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 04b36b59cc..fdc56ebd86 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -72,7 +72,8 @@ impl NodeState { pipeline_position, pipeline_total, stage_id, - result: ExecOutput { checkpoint, done }, + result: ExecOutput { checkpoint }, + done, } => { self.current_checkpoint = checkpoint; diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index 642fa525a6..1b18674624 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -76,16 +76,11 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - exec_output = exec_stage - .execute( - &mut tx, - reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }, - ) - .await? - .done; + let exec_input = reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }; + exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input); } tx.drop()?; diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index 6529541be8..128b1299d4 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -73,16 +73,11 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - exec_output = exec_stage - .execute( - &mut tx, - reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }, - ) - .await? - .done; + let exec_input = reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }; + exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input); } tx.drop()?; diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 385afd3a2a..17fec33cd4 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -116,19 +116,17 @@ async fn dry_run( let mut tx = Transaction::new(&output_db)?; let mut exec_output = false; while !exec_output { + let exec_input = reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }; exec_output = MerkleStage::Execution { - clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from - * scratch */ + // Forces updating the root instead of calculating from scratch + clean_threshold: u64::MAX, } - .execute( - &mut tx, - reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }, - ) + .execute(&mut tx, exec_input) .await? - .done; + .is_done(exec_input); } tx.drop()?; diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 9f42918305..c5dd4eb276 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -20,7 +20,7 @@ use reth_stages::{ IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage, }, - ExecInput, ExecOutput, Stage, UnwindInput, + ExecInput, Stage, UnwindInput, }; use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc}; use tracing::*; @@ -236,10 +236,13 @@ impl Command { checkpoint: Some(checkpoint.with_block_number(self.from)), }; - while let ExecOutput { checkpoint: stage_progress, done: false } = - exec_stage.execute(&mut tx, input).await? - { - input.checkpoint = Some(stage_progress); + loop { + let result = exec_stage.execute(&mut tx, input).await?; + if result.is_done(input) { + break + } + + input.checkpoint = Some(result.checkpoint); if self.commit { tx.commit()?; diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 54d9f992bd..795c637b13 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1370,6 +1370,7 @@ mod tests { chain_spec: Arc, pipeline_exec_outputs: VecDeque>, executor_results: Vec, + max_block: Option, ) -> (TestBeaconConsensusEngine, TestEnv>>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); @@ -1381,10 +1382,13 @@ mod tests { // Setup pipeline let (tip_tx, tip_rx) = watch::channel(H256::default()); - let pipeline = Pipeline::builder() + let mut pipeline_builder = Pipeline::builder() .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx) - .build(db.clone()); + .with_tip_sender(tip_tx); + if let Some(max_block) = max_block { + pipeline_builder = pipeline_builder.with_max_block(max_block); + } + let pipeline = pipeline_builder.build(db.clone()); // Setup blockchain tree let externals = @@ -1404,7 +1408,7 @@ mod tests { blockchain_provider, Box::::default(), Box::::default(), - None, + max_block, false, payload_builder, None, @@ -1439,6 +1443,7 @@ mod tests { chain_spec, VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), + Some(1), ); let res = spawn_consensus_engine(consensus_engine); @@ -1468,6 +1473,7 @@ mod tests { chain_spec, VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), + Some(1), ); let mut rx = spawn_consensus_engine(consensus_engine); @@ -1507,10 +1513,11 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }), Err(StageError::ChannelClosed), ]), Vec::default(), + Some(2), ); let rx = spawn_consensus_engine(consensus_engine); @@ -1523,7 +1530,9 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok( + Err(BeaconConsensusEngineError::Pipeline(n)) + ) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -1537,15 +1546,12 @@ mod tests { .paris_activated() .build(), ); - let (mut consensus_engine, env) = setup_consensus_engine( + let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(max_block), - done: true, - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]), Vec::default(), + Some(max_block), ); - consensus_engine.sync.set_max_block(max_block); let rx = spawn_consensus_engine(consensus_engine); let _ = env @@ -1582,11 +1588,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1613,11 +1617,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1662,10 +1664,11 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1710,11 +1713,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1748,10 +1749,11 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1797,10 +1799,11 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1843,11 +1846,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1876,11 +1877,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1922,11 +1921,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::default(), + None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1979,11 +1976,9 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { - done: true, - checkpoint: StageCheckpoint::new(0), - })]), + VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), Vec::from([exec_result2]), + None, ); insert_blocks(env.db.as_ref(), [&data.genesis, &block1].into_iter()); diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index a093b57bae..a5097c4c93 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -83,12 +83,6 @@ where self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); } - /// Sets the max block value for testing - #[cfg(test)] - pub(crate) fn set_max_block(&mut self, block: BlockNumber) { - self.max_block = Some(block); - } - /// Cancels all full block requests that are in progress. pub(crate) fn clear_full_block_requests(&mut self) { self.inflight_full_block_requests.clear(); diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 2230c4075e..6133d89fa1 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -31,6 +31,8 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of executing the stage. result: ExecOutput, + /// Stage completed executing the whole block range + done: bool, }, /// Emitted when a stage is about to be unwound. Unwinding { @@ -45,6 +47,8 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of unwinding the stage. result: UnwindOutput, + /// Stage completed unwinding the whole block range + done: bool, }, /// Emitted when a stage encounters an error either during execution or unwinding. Error { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 2fd3611bd3..aabc987615 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -259,8 +259,10 @@ where continue } + let mut done = UnwindInput { checkpoint, unwind_to: to, bad_block }.target_reached(); + debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind"); - while checkpoint.block_number > to { + while !done { let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); @@ -268,6 +270,7 @@ where match output { Ok(unwind_output) => { checkpoint = unwind_output.checkpoint; + done = unwind_output.is_done(input); info!( target: "sync::pipeline", stage = %stage_id, @@ -284,8 +287,11 @@ where ); tx.save_stage_checkpoint(stage_id, checkpoint)?; - self.listeners - .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); + self.listeners.notify(PipelineEvent::Unwound { + stage_id, + result: unwind_output, + done, + }); tx.commit()?; } @@ -343,8 +349,17 @@ where checkpoint: prev_checkpoint, }); - match stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await { - Ok(out @ ExecOutput { checkpoint, done }) => { + let input = ExecInput { target, checkpoint: prev_checkpoint }; + let result = if input.target_reached() { + Ok(ExecOutput { checkpoint: input.checkpoint() }) + } else { + stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await + }; + + match result { + Ok(out @ ExecOutput { checkpoint }) => { + let done = out.is_done(input); + made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; info!( @@ -363,6 +378,7 @@ where pipeline_total: total_stages, stage_id, result: out.clone(), + done, }); // TODO: Make the commit interval configurable @@ -504,11 +520,15 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + .with_checkpoint(Some(StageCheckpoint::new(10)), &db), + ) + .add_stage( + TestStage::new(StageId::Other("C")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), ) .with_max_block(10) .build(db); @@ -525,27 +545,30 @@ mod tests { vec![ PipelineEvent::Running { pipeline_position: 1, - pipeline_total: 2, + pipeline_total: 3, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { pipeline_position: 1, - pipeline_total: 2, + pipeline_total: 3, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, + done: true, }, + PipelineEvent::Skipped { stage_id: StageId::Other("B") }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 2, - stage_id: StageId::Other("B"), + pipeline_position: 3, + pipeline_total: 3, + stage_id: StageId::Other("C"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 2, - pipeline_total: 2, - stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + pipeline_position: 3, + pipeline_total: 3, + stage_id: StageId::Other("C"), + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true, }, ] ); @@ -559,17 +582,17 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .with_max_block(10) @@ -600,7 +623,8 @@ mod tests { pipeline_position: 1, pipeline_total: 3, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, + done: true }, PipelineEvent::Running { pipeline_position: 2, @@ -612,7 +636,8 @@ mod tests { pipeline_position: 2, pipeline_total: 3, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true }, PipelineEvent::Running { pipeline_position: 3, @@ -624,7 +649,8 @@ mod tests { pipeline_position: 3, pipeline_total: 3, stage_id: StageId::Other("C"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, + done: true }, // Unwinding PipelineEvent::Unwinding { @@ -638,6 +664,7 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, + done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("B"), @@ -650,6 +677,7 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, + done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("A"), @@ -662,6 +690,7 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, + done: true }, ] ); @@ -675,12 +704,12 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), ) .with_max_block(10) .build(db); @@ -710,7 +739,8 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, + done: true }, PipelineEvent::Running { pipeline_position: 2, @@ -722,7 +752,8 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true }, // Unwinding // Nothing to unwind in stage "B" @@ -738,6 +769,7 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(50) }, + done: true }, ] ); @@ -762,9 +794,9 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), ) .add_stage( TestStage::new(StageId::Other("B")) @@ -773,7 +805,7 @@ mod tests { error: consensus::ConsensusError::BaseFeeMissing, })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), ) .with_max_block(10) .build(db); @@ -798,7 +830,8 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true }, PipelineEvent::Running { pipeline_position: 2, @@ -818,6 +851,7 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, + done: true }, PipelineEvent::Running { pipeline_position: 1, @@ -829,7 +863,8 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true }, PipelineEvent::Running { pipeline_position: 2, @@ -841,7 +876,8 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, + result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, + done: true }, ] ); @@ -851,17 +887,17 @@ mod tests { #[tokio::test] async fn pipeline_error_handling() { // Non-fatal - let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline = Pipeline::builder() - .add_stage( - TestStage::new(StageId::Other("NonFatal")) - .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), - ) - .with_max_block(10) - .build(db); - let result = pipeline.run().await; - assert_matches!(result, Ok(())); + // let db = test_utils::create_test_db::(EnvKind::RW); + // let mut pipeline = Pipeline::builder() + // .add_stage( + // TestStage::new(StageId::Other("NonFatal")) + // .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) + // .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + // ) + // .with_max_block(10) + // .build(db); + // let result = pipeline.run().await; + // assert_matches!(result, Ok(())); // Fatal let db = test_utils::create_test_db::(EnvKind::RW); @@ -869,6 +905,7 @@ mod tests { .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) + .with_max_block(1) .build(db); let result = pipeline.run().await; assert_matches!( diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 72b4da1fc5..a1e7f10a7d 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -10,6 +10,7 @@ use std::{ cmp::{max, min}, ops::RangeInclusive, }; +use tracing::warn; /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] @@ -35,7 +36,7 @@ impl ExecInput { /// Returns `true` if the target block number has already been reached. pub fn target_reached(&self) -> bool { - self.checkpoint().block_number >= self.target() + ExecOutput { checkpoint: self.checkpoint.unwrap_or_default() }.is_done(*self) } /// Return the target block number or default. @@ -45,8 +46,7 @@ impl ExecInput { /// Return next block range that needs to be executed. pub fn next_block_range(&self) -> RangeInclusive { - let (range, _) = self.next_block_range_with_threshold(u64::MAX); - range + self.next_block_range_with_threshold(u64::MAX) } /// Return true if this is the first block range to execute. @@ -55,19 +55,15 @@ impl ExecInput { } /// Return the next block range to execute. - /// Return pair of the block range and if this is final block range. - pub fn next_block_range_with_threshold( - &self, - threshold: u64, - ) -> (RangeInclusive, bool) { + /// Return pair of the block range. + pub fn next_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive { let current_block = self.checkpoint(); let start = current_block.block_number + 1; let target = self.target(); let end = min(target, current_block.block_number.saturating_add(threshold)); - let is_final_range = end == target; - (start..=end, is_final_range) + start..=end } /// Return the next block range determined the number of transactions within it. @@ -77,7 +73,7 @@ impl ExecInput { &self, tx: &Transaction<'_, DB>, tx_threshold: u64, - ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { + ) -> Result<(RangeInclusive, RangeInclusive), StageError> { let start_block = self.next_block(); let start_block_body = tx .get::(start_block)? @@ -98,8 +94,7 @@ impl ExecInput { break } } - let is_final_range = end_block_number >= target_block; - Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range)) + Ok((first_tx_number..=last_tx_number, start_block..=end_block_number)) } } @@ -115,6 +110,11 @@ pub struct UnwindInput { } impl UnwindInput { + /// Returns `true` if the target block number has already been reached. + pub fn target_reached(&self) -> bool { + UnwindOutput { checkpoint: self.checkpoint }.is_done(*self) + } + /// Return next block range that needs to be unwound. pub fn unwind_block_range(&self) -> RangeInclusive { self.unwind_block_range_with_threshold(u64::MAX).0 @@ -124,7 +124,7 @@ impl UnwindInput { pub fn unwind_block_range_with_threshold( &self, threshold: u64, - ) -> (RangeInclusive, BlockNumber, bool) { + ) -> (RangeInclusive, BlockNumber) { // +1 is to skip the block we're unwinding to let mut start = self.unwind_to + 1; let end = self.checkpoint; @@ -133,8 +133,7 @@ impl UnwindInput { let unwind_to = start - 1; - let is_final_range = unwind_to == self.unwind_to; - (start..=end.block_number, unwind_to, is_final_range) + (start..=end.block_number, unwind_to) } } @@ -143,14 +142,16 @@ impl UnwindInput { pub struct ExecOutput { /// How far the stage got. pub checkpoint: StageCheckpoint, - /// Whether or not the stage is done. - pub done: bool, } impl ExecOutput { - /// Mark the stage as done, checkpointing at the given place. - pub fn done(checkpoint: StageCheckpoint) -> Self { - Self { checkpoint, done: true } + /// Returns `true` if the target block number has already been reached, + /// i.e. `checkpoint.block_number >= target`. + pub fn is_done(&self, input: ExecInput) -> bool { + if self.checkpoint.block_number > input.target() { + warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the execution target"); + } + self.checkpoint.block_number >= input.target() } } @@ -161,6 +162,17 @@ pub struct UnwindOutput { pub checkpoint: StageCheckpoint, } +impl UnwindOutput { + /// Returns `true` if the target block number has already been reached, + /// i.e. `checkpoint.block_number <= unwind_to`. + pub fn is_done(&self, input: UnwindInput) -> bool { + if self.checkpoint.block_number < input.unwind_to { + warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the unwind target"); + } + self.checkpoint.block_number <= input.unwind_to + } +} + /// A stage is a segmented part of the syncing process of the node. /// /// Each stage takes care of a well-defined task, such as downloading headers or executing diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 7ce5504956..fdf07e6151 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -70,10 +70,6 @@ impl Stage for BodyStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - let range = input.next_block_range(); // Update the header range on the downloader self.downloader.set_download_range(range.clone())?; @@ -151,11 +147,9 @@ impl Stage for BodyStage { // The stage is "done" if: // - We got fewer blocks than our target // - We reached our target and the target was not limited by the batch size of the stage - let done = highest_block == to_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), - done, }) } @@ -230,15 +224,11 @@ fn stage_checkpoint( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, - }; + use crate::test_utils::{ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner}; use assert_matches::assert_matches; use reth_primitives::stage::StageUnitCheckpoint; use test_utils::*; - stage_test_suite_ext!(BodyTestRunner, body); - /// Checks that the stage downloads at most `batch_size` blocks. #[tokio::test] async fn partial_body_download() { @@ -271,7 +261,7 @@ mod tests { processed, // 1 seeded block body + batch size total // seeded headers })) - }, done: false }) if block_number < 200 && + }}) if block_number < 200 && processed == 1 + batch_size && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -308,8 +298,7 @@ mod tests { processed, total })) - }, - done: true + } }) if processed == total && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -344,7 +333,7 @@ mod tests { processed, total })) - }, done: false }) if block_number >= 10 && + }}) if block_number >= 10 && processed == 1 + batch_size && total == previous_stage ); let first_run_checkpoint = first_run.unwrap().checkpoint; @@ -364,7 +353,7 @@ mod tests { processed, total })) - }, done: true }) if block_number > first_run_checkpoint.block_number && + }}) if block_number > first_run_checkpoint.block_number && processed == total && total == previous_stage ); assert_matches!( @@ -404,7 +393,7 @@ mod tests { processed, total })) - }, done: true }) if block_number == previous_stage && + }}) if block_number == previous_stage && processed == total && total == previous_stage ); let checkpoint = output.unwrap().checkpoint; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 0a9d08f9b6..a4d32fb65d 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -138,10 +138,6 @@ impl ExecutionStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - let start_block = input.next_block(); let max_block = input.target(); @@ -193,11 +189,9 @@ impl ExecutionStage { state.write_to_db(&**tx)?; trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state"); - let done = stage_progress == max_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress) .with_execution_stage_checkpoint(stage_checkpoint), - done, }) } } @@ -338,7 +332,7 @@ impl Stage for ExecutionStage { let mut account_changeset = tx.cursor_dup_write::()?; let mut storage_changeset = tx.cursor_dup_write::()?; - let (range, unwind_to, _) = + let (range, unwind_to) = input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX)); if range.is_empty() { @@ -655,8 +649,7 @@ mod tests { total } })) - }, - done: true + } } if processed == total && total == block.gas_used); let tx = tx.deref_mut(); // check post state diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index e5f4f9218c..d49f3e988e 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,7 +21,7 @@ impl Stage for FinishStage { _tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) } async fn unwind( @@ -37,14 +37,12 @@ impl Stage for FinishStage { mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; use reth_interfaces::test_utils::generators::{random_header, random_header_range}; use reth_primitives::SealedHeader; - stage_test_suite_ext!(FinishTestRunner, finish); - #[derive(Default)] struct FinishTestRunner { tx: TestTransaction, @@ -89,7 +87,7 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { if let Some(output) = output { - assert!(output.done, "stage should always be done"); + assert!(output.is_done(input), "stage should always be done"); assert_eq!( output.checkpoint.block_number, input.target(), diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 0ac7725022..7ec413e17b 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -135,10 +135,6 @@ impl Stage for AccountHashingStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - let (from_block, to_block) = input.next_block_range().into_inner(); // if there are more blocks then threshold it is faster to go over Plain state and hash all @@ -235,7 +231,7 @@ impl Stage for AccountHashingStage { }, ); - return Ok(ExecOutput { checkpoint, done: false }) + return Ok(ExecOutput { checkpoint }) } } else { // Aggregate all transition changesets and make a list of accounts that have been @@ -257,7 +253,7 @@ impl Stage for AccountHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint, done: true }) + Ok(ExecOutput { checkpoint }) } /// Unwind the stage. @@ -266,7 +262,7 @@ impl Stage for AccountHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress, _) = + let (range, unwind_progress) = input.unwind_block_range_with_threshold(self.commit_threshold); // Aggregate all transition changesets and make a list of accounts that have been changed. @@ -296,15 +292,11 @@ fn stage_checkpoint_progress( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner, - }; + use crate::test_utils::{ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner}; use assert_matches::assert_matches; use reth_primitives::{stage::StageUnitCheckpoint, Account, U256}; use test_utils::*; - stage_test_suite_ext!(AccountHashingTestRunner, account_hashing); - #[tokio::test] async fn execute_clean_account_hashing() { let (previous_stage, stage_progress) = (20, 10); @@ -334,8 +326,7 @@ mod tests { }, .. })), - }, - done: true, + } }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 @@ -392,8 +383,7 @@ mod tests { progress: EntitiesCheckpoint { processed: 5, total } } )) - }, - done: false + } }) if address == fifth_address && total == runner.tx.table::().unwrap().len() as u64 ); @@ -419,8 +409,7 @@ mod tests { progress: EntitiesCheckpoint { processed, total } } )) - }, - done: true + } }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index d8dfc00541..b8328387f3 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -57,10 +57,6 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - let (from_block, to_block) = input.next_block_range().into_inner(); // if there are more blocks then threshold it is faster to go over Plain state and hash all @@ -165,7 +161,7 @@ impl Stage for StorageHashingStage { }, ); - return Ok(ExecOutput { checkpoint, done: false }) + return Ok(ExecOutput { checkpoint }) } } else { // Aggregate all changesets and and make list of storages that have been @@ -186,7 +182,7 @@ impl Stage for StorageHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint, done: true }) + Ok(ExecOutput { checkpoint }) } /// Unwind the stage. @@ -195,7 +191,7 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress, _) = + let (range, unwind_progress) = input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_storage_hashing(BlockNumberAddress::range(range))?; @@ -225,8 +221,8 @@ fn stage_checkpoint_progress( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -241,8 +237,6 @@ mod tests { stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256, }; - stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing); - /// Execute with low clean threshold so as to hash whole storage #[tokio::test] async fn execute_clean_storage_hashing() { @@ -266,10 +260,8 @@ mod tests { runner.seed_execution(input).expect("failed to seed execution"); loop { - if let Ok(result @ ExecOutput { checkpoint, done }) = - runner.execute(input).await.unwrap() - { - if !done { + if let Ok(result @ ExecOutput { checkpoint }) = runner.execute(input).await.unwrap() { + if !result.is_done(input) { let previous_checkpoint = input .checkpoint .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()) @@ -359,8 +351,7 @@ mod tests { total } })) - }, - done: false + } }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -405,8 +396,7 @@ mod tests { } } )) - }, - done: false + } }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -437,8 +427,7 @@ mod tests { } } )) - }, - done: true + } }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index faf8bdc522..dac5418844 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -208,7 +208,7 @@ where // Nothing to sync if gap.is_closed() { info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached"); - return Ok(ExecOutput::done(current_checkpoint)) + return Ok(ExecOutput { checkpoint: current_checkpoint }) } debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync"); @@ -311,12 +311,10 @@ where Ok(ExecOutput { checkpoint: StageCheckpoint::new(checkpoint) .with_headers_stage_checkpoint(stage_checkpoint), - done: true, }) } else { Ok(ExecOutput { checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint), - done: false, }) } } @@ -587,7 +585,7 @@ mod tests { total, } })) - }, done: true }) if block_number == tip.number && + }}) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); @@ -681,7 +679,7 @@ mod tests { total, } })) - }, done: false }) if block_number == checkpoint && + }}) if block_number == checkpoint && from == checkpoint && to == previous_stage && processed == checkpoint + 500 && total == tip.number); @@ -704,7 +702,7 @@ mod tests { total, } })) - }, done: true }) if block_number == tip.number && + }}) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 108481eb81..41949c4f1b 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -41,11 +41,7 @@ impl Stage for IndexAccountHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let range = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( tx, @@ -66,7 +62,6 @@ impl Stage for IndexAccountHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), - done: is_final_range, }) } @@ -76,7 +71,7 @@ impl Stage for IndexAccountHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress, _) = + let (range, unwind_progress) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = tx.unwind_account_history_indices(range)?; @@ -222,9 +217,9 @@ mod tests { progress: EntitiesCheckpoint { processed: 2, total: 2 } } ), - done: true } ); + assert!(out.is_done(input)); tx.commit().unwrap(); } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index bc2a426181..470c924f30 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -44,11 +44,7 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let range = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( tx, @@ -68,7 +64,6 @@ impl Stage for IndexStorageHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), - done: is_final_range, }) } @@ -78,7 +73,7 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress, _) = + let (range, unwind_progress) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?; @@ -233,10 +228,10 @@ mod tests { block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } - ), - done: true + ) } ); + assert!(out.is_done(input)); tx.commit().unwrap(); } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 4eb9f5033c..5de9ea1f30 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -149,7 +149,7 @@ impl Stage for MerkleStage { let threshold = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); - return Ok(ExecOutput::done(StageCheckpoint::new(input.target()))) + return Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) } MerkleStage::Execution { clean_threshold } => *clean_threshold, #[cfg(any(test, feature = "test-utils"))] @@ -227,7 +227,6 @@ impl Stage for MerkleStage { checkpoint: input .checkpoint() .with_entities_stage_checkpoint(entities_checkpoint), - done: false, }) } StateRootProgress::Complete(root, hashed_entries_walked, updates) => { @@ -267,7 +266,6 @@ impl Stage for MerkleStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block) .with_entities_stage_checkpoint(entities_checkpoint), - done: true, }) } @@ -328,8 +326,8 @@ impl Stage for MerkleStage { mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -346,8 +344,6 @@ mod tests { use reth_trie::test_utils::{state_root, state_root_prehashed}; use std::collections::BTreeMap; - stage_test_suite_ext!(MerkleTestRunner, merkle); - /// Execute from genesis so as to merkelize whole state #[tokio::test] async fn execute_clean_merkle() { @@ -376,8 +372,7 @@ mod tests { processed, total })) - }, - done: true + } }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + @@ -416,8 +411,7 @@ mod tests { processed, total })) - }, - done: true + } }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index ab863a8d72..23ce876ea5 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -59,11 +59,7 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - - let (tx_range, block_range, is_final_range) = + let (tx_range, block_range) = input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; let end_block = *block_range.end(); @@ -73,7 +69,6 @@ impl Stage for SenderRecoveryStage { return Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), - done: is_final_range, }) } @@ -151,7 +146,6 @@ impl Stage for SenderRecoveryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), - done: is_final_range, }) } @@ -161,7 +155,7 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); // Lookup latest tx id that we should unwind to let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num(); @@ -229,12 +223,10 @@ mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; - stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); - /// Execute a block range with a single transaction #[tokio::test] async fn execute_single_transaction() { @@ -268,7 +260,7 @@ mod tests { processed: 1, total: 1 })) - }, done: true }) if block_number == previous_stage + }}) if block_number == previous_stage ); // Validate the stage execution @@ -307,17 +299,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.unwrap(), - ExecOutput { + result.as_ref().unwrap(), + &ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_transactions } - ), - done: false + ) } ); + assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -332,8 +324,7 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_transactions, total: total_transactions } - ), - done: true + ) } ); diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index a9b0de7626..a02f6a36b2 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -54,11 +54,7 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let range = input.next_block_range_with_threshold(self.commit_threshold); let (start_block, end_block) = range.clone().into_inner(); debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync"); @@ -90,7 +86,6 @@ impl Stage for TotalDifficultyStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), - done: is_final_range, }) } @@ -100,7 +95,7 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_table_by_num::(unwind_to)?; @@ -132,12 +127,10 @@ mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; - stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty); - #[tokio::test] async fn execute_with_intermediate_commit() { let threshold = 50; @@ -165,9 +158,10 @@ mod tests { processed, total })) - }, done: false }) if block_number == expected_progress && processed == 1 + threshold && + }}) if block_number == expected_progress && processed == 1 + threshold && total == runner.tx.table::().unwrap().len() as u64 ); + assert!(!result.unwrap().is_done(first_input)); // Execute second time let second_input = ExecInput { @@ -183,7 +177,7 @@ mod tests { processed, total })) - }, done: true }) if block_number == previous_stage && processed == total && + }}) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 56ea803abd..14b86810a6 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -55,11 +55,7 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - if input.target_reached() { - return Ok(ExecOutput::done(input.checkpoint())) - } - - let (tx_range, block_range, is_final_range) = + let (tx_range, block_range) = input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; let end_block = *block_range.end(); @@ -139,7 +135,6 @@ impl Stage for TransactionLookupStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), - done: is_final_range, }) } @@ -149,7 +144,7 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (range, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; @@ -192,16 +187,13 @@ fn stage_checkpoint( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, + UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; - // Implement stage test suite. - stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); - #[tokio::test] async fn execute_single_transaction_lookup() { let (previous_stage, stage_progress) = (500, 100); @@ -234,7 +226,7 @@ mod tests { processed, total })) - }, done: true }) if block_number == previous_stage && processed == total && + }}) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); @@ -273,17 +265,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.unwrap(), - ExecOutput { + result.as_ref().unwrap(), + &ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_txs } - ), - done: false + ) } ); + assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -298,8 +290,7 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_txs, total: total_txs } - ), - done: true + ) } ); diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index f691d13711..533d658475 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -42,8 +42,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, checkpoint }) - if done && checkpoint.block_number == previous_stage + Ok(ref output @ ExecOutput { checkpoint }) + if output.is_done(input) && checkpoint.block_number == previous_stage ); // Validate the stage execution @@ -94,8 +94,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, checkpoint }) - if done && checkpoint.block_number == previous_stage + Ok(ref output @ ExecOutput { checkpoint }) + if output.is_done(execute_input) && checkpoint.block_number == previous_stage ); assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation"); @@ -113,7 +113,8 @@ macro_rules! stage_test_suite { // Assert the successful unwind result assert_matches::assert_matches!( rx, - Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to + Ok(output @ UnwindOutput { checkpoint }) + if output.is_done(unwind_input) && checkpoint.block_number == unwind_input.unwind_to ); // Validate the stage unwind @@ -123,46 +124,4 @@ macro_rules! stage_test_suite { }; } -// `execute_already_reached_target` is not suitable for the headers stage thus -// included in the test suite extension -macro_rules! stage_test_suite_ext { - ($runner:ident, $name:ident) => { - crate::test_utils::stage_test_suite!($runner, $name); - - paste::item! { - /// Check that the execution is short-circuited if the target was already reached. - #[tokio::test] - async fn [< execute_already_reached_target_ $name>] () { - let stage_progress = 1000; - - // Set up the runner - let mut runner = $runner::default(); - let input = crate::stage::ExecInput { - target: Some(stage_progress), - checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), - }; - let seed = runner.seed_execution(input).expect("failed to seed"); - - // Run stage execution - let rx = runner.execute(input); - - // Run `after_execution` hook - runner.after_execution(seed).await.expect("failed to run after execution hook"); - - // Assert the successful result - let result = rx.await.unwrap(); - assert_matches::assert_matches!( - result, - Ok(ExecOutput { done, checkpoint }) - if done && checkpoint.block_number == stage_progress - ); - - // Validate the stage execution - assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation"); - } - } - }; -} - pub(crate) use stage_test_suite; -pub(crate) use stage_test_suite_ext; diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages/src/test_utils/stage.rs index 81056a5e14..87dd786f46 100644 --- a/crates/stages/src/test_utils/stage.rs +++ b/crates/stages/src/test_utils/stage.rs @@ -1,19 +1,46 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::stage::StageId; +use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_provider::Transaction; use std::collections::VecDeque; #[derive(Debug)] pub struct TestStage { id: StageId, + checkpoint: Option, exec_outputs: VecDeque>, unwind_outputs: VecDeque>, } impl TestStage { pub fn new(id: StageId) -> Self { - Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } + Self { + id, + checkpoint: None, + exec_outputs: VecDeque::new(), + unwind_outputs: VecDeque::new(), + } + } + + pub fn with_checkpoint( + mut self, + checkpoint: Option, + db: DB, + ) -> Self { + let mut tx = Transaction::new(&db).expect("initialize transaction"); + + if let Some(checkpoint) = checkpoint { + tx.save_stage_checkpoint(self.id, checkpoint) + .unwrap_or_else(|_| panic!("save stage {} checkpoint", self.id)) + } else { + tx.delete_stage_checkpoint(self.id) + .unwrap_or_else(|_| panic!("delete stage {} checkpoint", self.id)) + } + + tx.commit().expect("commit transaction"); + + self.checkpoint = checkpoint; + self } pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 954e615a50..ed3f2e4949 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -1323,6 +1323,12 @@ where Ok(()) } + /// Delete stage checkpoint. + pub fn delete_stage_checkpoint(&self, id: StageId) -> Result<(), DbError> { + self.delete::(id.to_string(), None)?; + Ok(()) + } + /// Return full table as Vec pub fn table(&self) -> Result>, DbError> where