From 7ec4b0a5cff848c1acfdf85461aa6f955ae685a8 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 12 Jun 2023 10:19:46 -0700 Subject: [PATCH] Revert "refactor(stages): input target reached & output done checks" (#3114) --- bin/reth/src/debug_cmd/merkle.rs | 34 +++-- bin/reth/src/node/events.rs | 3 +- bin/reth/src/stage/dump/hashing_account.rs | 15 ++- bin/reth/src/stage/dump/hashing_storage.rs | 15 ++- bin/reth/src/stage/dump/merkle.rs | 18 +-- bin/reth/src/stage/run.rs | 13 +- crates/consensus/beacon/src/engine/mod.rs | 87 ++++++------ crates/consensus/beacon/src/engine/sync.rs | 6 + crates/stages/src/pipeline/event.rs | 4 - crates/stages/src/pipeline/mod.rs | 125 ++++++------------ crates/stages/src/stage.rs | 54 +++----- crates/stages/src/stages/bodies.rs | 23 +++- crates/stages/src/stages/execution.rs | 11 +- crates/stages/src/stages/finish.rs | 10 +- crates/stages/src/stages/hashing_account.rs | 25 +++- crates/stages/src/stages/hashing_storage.rs | 31 +++-- crates/stages/src/stages/headers.rs | 10 +- .../src/stages/index_account_history.rs | 11 +- .../src/stages/index_storage_history.rs | 13 +- crates/stages/src/stages/merkle.rs | 16 ++- crates/stages/src/stages/sender_recovery.rs | 29 ++-- crates/stages/src/stages/total_difficulty.rs | 20 ++- crates/stages/src/stages/tx_lookup.rs | 29 ++-- crates/stages/src/test_utils/macros.rs | 53 +++++++- crates/stages/src/test_utils/stage.rs | 31 +---- crates/storage/provider/src/transaction.rs | 6 - 26 files changed, 379 insertions(+), 313 deletions(-) diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 326a5b16f5..193f4f331c 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -119,22 +119,30 @@ impl Command { let mut account_hashing_done = false; while !account_hashing_done { - let input = ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }; - let output = account_hashing_stage.execute(&mut tx, input).await?; - account_hashing_done = output.is_done(input); + let output = account_hashing_stage + .execute( + &mut tx, + ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }, + ) + .await?; + account_hashing_done = output.done; } let mut storage_hashing_done = false; while !storage_hashing_done { - let input = ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }; - let output = storage_hashing_stage.execute(&mut tx, input).await?; - storage_hashing_done = output.is_done(input); + let output = storage_hashing_stage + .execute( + &mut tx, + ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }, + ) + .await?; + storage_hashing_done = output.done; } let incremental_result = merkle_stage @@ -162,7 +170,7 @@ impl Command { loop { let clean_result = merkle_stage.execute(&mut tx, clean_input).await; assert!(clean_result.is_ok(), "Clean state root calculation failed"); - if clean_result.unwrap().is_done(clean_input) { + if clean_result.unwrap().done { break } } diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index fdc56ebd86..04b36b59cc 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -72,8 +72,7 @@ impl NodeState { pipeline_position, pipeline_total, stage_id, - result: ExecOutput { checkpoint }, - done, + result: ExecOutput { checkpoint, done }, } => { self.current_checkpoint = checkpoint; diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index 1b18674624..642fa525a6 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -76,11 +76,16 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; - exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input); + exec_output = exec_stage + .execute( + &mut tx, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) + .await? + .done; } tx.drop()?; diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index 128b1299d4..6529541be8 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -73,11 +73,16 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; - exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input); + exec_output = exec_stage + .execute( + &mut tx, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) + .await? + .done; } tx.drop()?; diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 17fec33cd4..385afd3a2a 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -116,17 +116,19 @@ async fn dry_run( let mut tx = Transaction::new(&output_db)?; let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; exec_output = MerkleStage::Execution { - // Forces updating the root instead of calculating from scratch - clean_threshold: u64::MAX, + clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from + * scratch */ } - .execute(&mut tx, exec_input) + .execute( + &mut tx, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) .await? - .is_done(exec_input); + .done; } tx.drop()?; diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index c5dd4eb276..9f42918305 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -20,7 +20,7 @@ use reth_stages::{ IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage, }, - ExecInput, Stage, UnwindInput, + ExecInput, ExecOutput, Stage, UnwindInput, }; use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc}; use tracing::*; @@ -236,13 +236,10 @@ impl Command { checkpoint: Some(checkpoint.with_block_number(self.from)), }; - loop { - let result = exec_stage.execute(&mut tx, input).await?; - if result.is_done(input) { - break - } - - input.checkpoint = Some(result.checkpoint); + while let ExecOutput { checkpoint: stage_progress, done: false } = + exec_stage.execute(&mut tx, input).await? + { + input.checkpoint = Some(stage_progress); if self.commit { tx.commit()?; diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 795c637b13..54d9f992bd 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1370,7 +1370,6 @@ mod tests { chain_spec: Arc, pipeline_exec_outputs: VecDeque>, executor_results: Vec, - max_block: Option, ) -> (TestBeaconConsensusEngine, TestEnv>>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); @@ -1382,13 +1381,10 @@ mod tests { // Setup pipeline let (tip_tx, tip_rx) = watch::channel(H256::default()); - let mut pipeline_builder = Pipeline::builder() + let pipeline = Pipeline::builder() .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx); - if let Some(max_block) = max_block { - pipeline_builder = pipeline_builder.with_max_block(max_block); - } - let pipeline = pipeline_builder.build(db.clone()); + .with_tip_sender(tip_tx) + .build(db.clone()); // Setup blockchain tree let externals = @@ -1408,7 +1404,7 @@ mod tests { blockchain_provider, Box::::default(), Box::::default(), - max_block, + None, false, payload_builder, None, @@ -1443,7 +1439,6 @@ mod tests { chain_spec, VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), - Some(1), ); let res = spawn_consensus_engine(consensus_engine); @@ -1473,7 +1468,6 @@ mod tests { chain_spec, VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), - Some(1), ); let mut rx = spawn_consensus_engine(consensus_engine); @@ -1513,11 +1507,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), Err(StageError::ChannelClosed), ]), Vec::default(), - Some(2), ); let rx = spawn_consensus_engine(consensus_engine); @@ -1530,9 +1523,7 @@ mod tests { assert_matches!( rx.await, - Ok( - Err(BeaconConsensusEngineError::Pipeline(n)) - ) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -1546,12 +1537,15 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( + let (mut consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]), + VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(max_block), + done: true, + })]), Vec::default(), - Some(max_block), ); + consensus_engine.sync.set_max_block(max_block); let rx = spawn_consensus_engine(consensus_engine); let _ = env @@ -1588,9 +1582,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1617,9 +1613,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1664,11 +1662,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1713,9 +1710,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1749,11 +1748,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1799,11 +1797,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec, VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1846,9 +1843,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1877,9 +1876,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1921,9 +1922,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1976,9 +1979,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec, - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::from([exec_result2]), - None, ); insert_blocks(env.db.as_ref(), [&data.genesis, &block1].into_iter()); diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index a5097c4c93..a093b57bae 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -83,6 +83,12 @@ where self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); } + /// Sets the max block value for testing + #[cfg(test)] + pub(crate) fn set_max_block(&mut self, block: BlockNumber) { + self.max_block = Some(block); + } + /// Cancels all full block requests that are in progress. pub(crate) fn clear_full_block_requests(&mut self) { self.inflight_full_block_requests.clear(); diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 6133d89fa1..2230c4075e 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -31,8 +31,6 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of executing the stage. result: ExecOutput, - /// Stage completed executing the whole block range - done: bool, }, /// Emitted when a stage is about to be unwound. Unwinding { @@ -47,8 +45,6 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of unwinding the stage. result: UnwindOutput, - /// Stage completed unwinding the whole block range - done: bool, }, /// Emitted when a stage encounters an error either during execution or unwinding. Error { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index aabc987615..2fd3611bd3 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -259,10 +259,8 @@ where continue } - let mut done = UnwindInput { checkpoint, unwind_to: to, bad_block }.target_reached(); - debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind"); - while !done { + while checkpoint.block_number > to { let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); @@ -270,7 +268,6 @@ where match output { Ok(unwind_output) => { checkpoint = unwind_output.checkpoint; - done = unwind_output.is_done(input); info!( target: "sync::pipeline", stage = %stage_id, @@ -287,11 +284,8 @@ where ); tx.save_stage_checkpoint(stage_id, checkpoint)?; - self.listeners.notify(PipelineEvent::Unwound { - stage_id, - result: unwind_output, - done, - }); + self.listeners + .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); tx.commit()?; } @@ -349,17 +343,8 @@ where checkpoint: prev_checkpoint, }); - let input = ExecInput { target, checkpoint: prev_checkpoint }; - let result = if input.target_reached() { - Ok(ExecOutput { checkpoint: input.checkpoint() }) - } else { - stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await - }; - - match result { - Ok(out @ ExecOutput { checkpoint }) => { - let done = out.is_done(input); - + match stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await { + Ok(out @ ExecOutput { checkpoint, done }) => { made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; info!( @@ -378,7 +363,6 @@ where pipeline_total: total_stages, stage_id, result: out.clone(), - done, }); // TODO: Make the commit interval configurable @@ -520,15 +504,11 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), ) .add_stage( TestStage::new(StageId::Other("B")) - .with_checkpoint(Some(StageCheckpoint::new(10)), &db), - ) - .add_stage( - TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) .build(db); @@ -545,30 +525,27 @@ mod tests { vec![ PipelineEvent::Running { pipeline_position: 1, - pipeline_total: 3, + pipeline_total: 2, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { pipeline_position: 1, - pipeline_total: 3, + pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, - done: true, + result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Skipped { stage_id: StageId::Other("B") }, PipelineEvent::Running { - pipeline_position: 3, - pipeline_total: 3, - stage_id: StageId::Other("C"), + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 3, - pipeline_total: 3, - stage_id: StageId::Other("C"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true, + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] ); @@ -582,17 +559,17 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .with_max_block(10) @@ -623,8 +600,7 @@ mod tests { pipeline_position: 1, pipeline_total: 3, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -636,8 +612,7 @@ mod tests { pipeline_position: 2, pipeline_total: 3, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 3, @@ -649,8 +624,7 @@ mod tests { pipeline_position: 3, pipeline_total: 3, stage_id: StageId::Other("C"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, // Unwinding PipelineEvent::Unwinding { @@ -664,7 +638,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("B"), @@ -677,7 +650,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("A"), @@ -690,7 +662,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, ] ); @@ -704,12 +675,12 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) .build(db); @@ -739,8 +710,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -752,8 +722,7 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, // Unwinding // Nothing to unwind in stage "B" @@ -769,7 +738,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(50) }, - done: true }, ] ); @@ -794,9 +762,9 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .add_stage( TestStage::new(StageId::Other("B")) @@ -805,7 +773,7 @@ mod tests { error: consensus::ConsensusError::BaseFeeMissing, })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) .build(db); @@ -830,8 +798,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -851,7 +818,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, - done: true }, PipelineEvent::Running { pipeline_position: 1, @@ -863,8 +829,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -876,8 +841,7 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] ); @@ -887,17 +851,17 @@ mod tests { #[tokio::test] async fn pipeline_error_handling() { // Non-fatal - // let db = test_utils::create_test_db::(EnvKind::RW); - // let mut pipeline = Pipeline::builder() - // .add_stage( - // TestStage::new(StageId::Other("NonFatal")) - // .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) - // .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), - // ) - // .with_max_block(10) - // .build(db); - // let result = pipeline.run().await; - // assert_matches!(result, Ok(())); + let db = test_utils::create_test_db::(EnvKind::RW); + let mut pipeline = Pipeline::builder() + .add_stage( + TestStage::new(StageId::Other("NonFatal")) + .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + ) + .with_max_block(10) + .build(db); + let result = pipeline.run().await; + assert_matches!(result, Ok(())); // Fatal let db = test_utils::create_test_db::(EnvKind::RW); @@ -905,7 +869,6 @@ mod tests { .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) - .with_max_block(1) .build(db); let result = pipeline.run().await; assert_matches!( diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index a1e7f10a7d..72b4da1fc5 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -10,7 +10,6 @@ use std::{ cmp::{max, min}, ops::RangeInclusive, }; -use tracing::warn; /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] @@ -36,7 +35,7 @@ impl ExecInput { /// Returns `true` if the target block number has already been reached. pub fn target_reached(&self) -> bool { - ExecOutput { checkpoint: self.checkpoint.unwrap_or_default() }.is_done(*self) + self.checkpoint().block_number >= self.target() } /// Return the target block number or default. @@ -46,7 +45,8 @@ impl ExecInput { /// Return next block range that needs to be executed. pub fn next_block_range(&self) -> RangeInclusive { - self.next_block_range_with_threshold(u64::MAX) + let (range, _) = self.next_block_range_with_threshold(u64::MAX); + range } /// Return true if this is the first block range to execute. @@ -55,15 +55,19 @@ impl ExecInput { } /// Return the next block range to execute. - /// Return pair of the block range. - pub fn next_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive { + /// Return pair of the block range and if this is final block range. + pub fn next_block_range_with_threshold( + &self, + threshold: u64, + ) -> (RangeInclusive, bool) { let current_block = self.checkpoint(); let start = current_block.block_number + 1; let target = self.target(); let end = min(target, current_block.block_number.saturating_add(threshold)); - start..=end + let is_final_range = end == target; + (start..=end, is_final_range) } /// Return the next block range determined the number of transactions within it. @@ -73,7 +77,7 @@ impl ExecInput { &self, tx: &Transaction<'_, DB>, tx_threshold: u64, - ) -> Result<(RangeInclusive, RangeInclusive), StageError> { + ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { let start_block = self.next_block(); let start_block_body = tx .get::(start_block)? @@ -94,7 +98,8 @@ impl ExecInput { break } } - Ok((first_tx_number..=last_tx_number, start_block..=end_block_number)) + let is_final_range = end_block_number >= target_block; + Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range)) } } @@ -110,11 +115,6 @@ pub struct UnwindInput { } impl UnwindInput { - /// Returns `true` if the target block number has already been reached. - pub fn target_reached(&self) -> bool { - UnwindOutput { checkpoint: self.checkpoint }.is_done(*self) - } - /// Return next block range that needs to be unwound. pub fn unwind_block_range(&self) -> RangeInclusive { self.unwind_block_range_with_threshold(u64::MAX).0 @@ -124,7 +124,7 @@ impl UnwindInput { pub fn unwind_block_range_with_threshold( &self, threshold: u64, - ) -> (RangeInclusive, BlockNumber) { + ) -> (RangeInclusive, BlockNumber, bool) { // +1 is to skip the block we're unwinding to let mut start = self.unwind_to + 1; let end = self.checkpoint; @@ -133,7 +133,8 @@ impl UnwindInput { let unwind_to = start - 1; - (start..=end.block_number, unwind_to) + let is_final_range = unwind_to == self.unwind_to; + (start..=end.block_number, unwind_to, is_final_range) } } @@ -142,16 +143,14 @@ impl UnwindInput { pub struct ExecOutput { /// How far the stage got. pub checkpoint: StageCheckpoint, + /// Whether or not the stage is done. + pub done: bool, } impl ExecOutput { - /// Returns `true` if the target block number has already been reached, - /// i.e. `checkpoint.block_number >= target`. - pub fn is_done(&self, input: ExecInput) -> bool { - if self.checkpoint.block_number > input.target() { - warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the execution target"); - } - self.checkpoint.block_number >= input.target() + /// Mark the stage as done, checkpointing at the given place. + pub fn done(checkpoint: StageCheckpoint) -> Self { + Self { checkpoint, done: true } } } @@ -162,17 +161,6 @@ pub struct UnwindOutput { pub checkpoint: StageCheckpoint, } -impl UnwindOutput { - /// Returns `true` if the target block number has already been reached, - /// i.e. `checkpoint.block_number <= unwind_to`. - pub fn is_done(&self, input: UnwindInput) -> bool { - if self.checkpoint.block_number < input.unwind_to { - warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the unwind target"); - } - self.checkpoint.block_number <= input.unwind_to - } -} - /// A stage is a segmented part of the syncing process of the node. /// /// Each stage takes care of a well-defined task, such as downloading headers or executing diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index fdf07e6151..7ce5504956 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -70,6 +70,10 @@ impl Stage for BodyStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let range = input.next_block_range(); // Update the header range on the downloader self.downloader.set_download_range(range.clone())?; @@ -147,9 +151,11 @@ impl Stage for BodyStage { // The stage is "done" if: // - We got fewer blocks than our target // - We reached our target and the target was not limited by the batch size of the stage + let done = highest_block == to_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done, }) } @@ -224,11 +230,15 @@ fn stage_checkpoint( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner}; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, + }; use assert_matches::assert_matches; use reth_primitives::stage::StageUnitCheckpoint; use test_utils::*; + stage_test_suite_ext!(BodyTestRunner, body); + /// Checks that the stage downloads at most `batch_size` blocks. #[tokio::test] async fn partial_body_download() { @@ -261,7 +271,7 @@ mod tests { processed, // 1 seeded block body + batch size total // seeded headers })) - }}) if block_number < 200 && + }, done: false }) if block_number < 200 && processed == 1 + batch_size && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -298,7 +308,8 @@ mod tests { processed, total })) - } + }, + done: true }) if processed == total && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -333,7 +344,7 @@ mod tests { processed, total })) - }}) if block_number >= 10 && + }, done: false }) if block_number >= 10 && processed == 1 + batch_size && total == previous_stage ); let first_run_checkpoint = first_run.unwrap().checkpoint; @@ -353,7 +364,7 @@ mod tests { processed, total })) - }}) if block_number > first_run_checkpoint.block_number && + }, done: true }) if block_number > first_run_checkpoint.block_number && processed == total && total == previous_stage ); assert_matches!( @@ -393,7 +404,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && + }, done: true }) if block_number == previous_stage && processed == total && total == previous_stage ); let checkpoint = output.unwrap().checkpoint; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index a4d32fb65d..0a9d08f9b6 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -138,6 +138,10 @@ impl ExecutionStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let start_block = input.next_block(); let max_block = input.target(); @@ -189,9 +193,11 @@ impl ExecutionStage { state.write_to_db(&**tx)?; trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state"); + let done = stage_progress == max_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress) .with_execution_stage_checkpoint(stage_checkpoint), + done, }) } } @@ -332,7 +338,7 @@ impl Stage for ExecutionStage { let mut account_changeset = tx.cursor_dup_write::()?; let mut storage_changeset = tx.cursor_dup_write::()?; - let (range, unwind_to) = + let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX)); if range.is_empty() { @@ -649,7 +655,8 @@ mod tests { total } })) - } + }, + done: true } if processed == total && total == block.gas_used); let tx = tx.deref_mut(); // check post state diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index d49f3e988e..e5f4f9218c 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,7 +21,7 @@ impl Stage for FinishStage { _tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } async fn unwind( @@ -37,12 +37,14 @@ impl Stage for FinishStage { mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use reth_interfaces::test_utils::generators::{random_header, random_header_range}; use reth_primitives::SealedHeader; + stage_test_suite_ext!(FinishTestRunner, finish); + #[derive(Default)] struct FinishTestRunner { tx: TestTransaction, @@ -87,7 +89,7 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { if let Some(output) = output { - assert!(output.is_done(input), "stage should always be done"); + assert!(output.done, "stage should always be done"); assert_eq!( output.checkpoint.block_number, input.target(), diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 7ec413e17b..0ac7725022 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -135,6 +135,10 @@ impl Stage for AccountHashingStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let (from_block, to_block) = input.next_block_range().into_inner(); // if there are more blocks then threshold it is faster to go over Plain state and hash all @@ -231,7 +235,7 @@ impl Stage for AccountHashingStage { }, ); - return Ok(ExecOutput { checkpoint }) + return Ok(ExecOutput { checkpoint, done: false }) } } else { // Aggregate all transition changesets and make a list of accounts that have been @@ -253,7 +257,7 @@ impl Stage for AccountHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint }) + Ok(ExecOutput { checkpoint, done: true }) } /// Unwind the stage. @@ -262,7 +266,7 @@ impl Stage for AccountHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Aggregate all transition changesets and make a list of accounts that have been changed. @@ -292,11 +296,15 @@ fn stage_checkpoint_progress( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner}; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner, + }; use assert_matches::assert_matches; use reth_primitives::{stage::StageUnitCheckpoint, Account, U256}; use test_utils::*; + stage_test_suite_ext!(AccountHashingTestRunner, account_hashing); + #[tokio::test] async fn execute_clean_account_hashing() { let (previous_stage, stage_progress) = (20, 10); @@ -326,7 +334,8 @@ mod tests { }, .. })), - } + }, + done: true, }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 @@ -383,7 +392,8 @@ mod tests { progress: EntitiesCheckpoint { processed: 5, total } } )) - } + }, + done: false }) if address == fifth_address && total == runner.tx.table::().unwrap().len() as u64 ); @@ -409,7 +419,8 @@ mod tests { progress: EntitiesCheckpoint { processed, total } } )) - } + }, + done: true }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index b8328387f3..d8dfc00541 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -57,6 +57,10 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let (from_block, to_block) = input.next_block_range().into_inner(); // if there are more blocks then threshold it is faster to go over Plain state and hash all @@ -161,7 +165,7 @@ impl Stage for StorageHashingStage { }, ); - return Ok(ExecOutput { checkpoint }) + return Ok(ExecOutput { checkpoint, done: false }) } } else { // Aggregate all changesets and and make list of storages that have been @@ -182,7 +186,7 @@ impl Stage for StorageHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint }) + Ok(ExecOutput { checkpoint, done: true }) } /// Unwind the stage. @@ -191,7 +195,7 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_storage_hashing(BlockNumberAddress::range(range))?; @@ -221,8 +225,8 @@ fn stage_checkpoint_progress( mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -237,6 +241,8 @@ mod tests { stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256, }; + stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing); + /// Execute with low clean threshold so as to hash whole storage #[tokio::test] async fn execute_clean_storage_hashing() { @@ -260,8 +266,10 @@ mod tests { runner.seed_execution(input).expect("failed to seed execution"); loop { - if let Ok(result @ ExecOutput { checkpoint }) = runner.execute(input).await.unwrap() { - if !result.is_done(input) { + if let Ok(result @ ExecOutput { checkpoint, done }) = + runner.execute(input).await.unwrap() + { + if !done { let previous_checkpoint = input .checkpoint .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()) @@ -351,7 +359,8 @@ mod tests { total } })) - } + }, + done: false }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -396,7 +405,8 @@ mod tests { } } )) - } + }, + done: false }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -427,7 +437,8 @@ mod tests { } } )) - } + }, + done: true }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index dac5418844..faf8bdc522 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -208,7 +208,7 @@ where // Nothing to sync if gap.is_closed() { info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached"); - return Ok(ExecOutput { checkpoint: current_checkpoint }) + return Ok(ExecOutput::done(current_checkpoint)) } debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync"); @@ -311,10 +311,12 @@ where Ok(ExecOutput { checkpoint: StageCheckpoint::new(checkpoint) .with_headers_stage_checkpoint(stage_checkpoint), + done: true, }) } else { Ok(ExecOutput { checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint), + done: false, }) } } @@ -585,7 +587,7 @@ mod tests { total, } })) - }}) if block_number == tip.number && + }, done: true }) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); @@ -679,7 +681,7 @@ mod tests { total, } })) - }}) if block_number == checkpoint && + }, done: false }) if block_number == checkpoint && from == checkpoint && to == previous_stage && processed == checkpoint + 500 && total == tip.number); @@ -702,7 +704,7 @@ mod tests { total, } })) - }}) if block_number == tip.number && + }, done: true }) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 41949c4f1b..108481eb81 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -41,7 +41,11 @@ impl Stage for IndexAccountHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let range = input.next_block_range_with_threshold(self.commit_threshold); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( tx, @@ -62,6 +66,7 @@ impl Stage for IndexAccountHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, }) } @@ -71,7 +76,7 @@ impl Stage for IndexAccountHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = tx.unwind_account_history_indices(range)?; @@ -217,9 +222,9 @@ mod tests { progress: EntitiesCheckpoint { processed: 2, total: 2 } } ), + done: true } ); - assert!(out.is_done(input)); tx.commit().unwrap(); } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 470c924f30..bc2a426181 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -44,7 +44,11 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let range = input.next_block_range_with_threshold(self.commit_threshold); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( tx, @@ -64,6 +68,7 @@ impl Stage for IndexStorageHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, }) } @@ -73,7 +78,7 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?; @@ -228,10 +233,10 @@ mod tests { block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } - ) + ), + done: true } ); - assert!(out.is_done(input)); tx.commit().unwrap(); } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 5de9ea1f30..4eb9f5033c 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -149,7 +149,7 @@ impl Stage for MerkleStage { let threshold = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); - return Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) + return Ok(ExecOutput::done(StageCheckpoint::new(input.target()))) } MerkleStage::Execution { clean_threshold } => *clean_threshold, #[cfg(any(test, feature = "test-utils"))] @@ -227,6 +227,7 @@ impl Stage for MerkleStage { checkpoint: input .checkpoint() .with_entities_stage_checkpoint(entities_checkpoint), + done: false, }) } StateRootProgress::Complete(root, hashed_entries_walked, updates) => { @@ -266,6 +267,7 @@ impl Stage for MerkleStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block) .with_entities_stage_checkpoint(entities_checkpoint), + done: true, }) } @@ -326,8 +328,8 @@ impl Stage for MerkleStage { mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -344,6 +346,8 @@ mod tests { use reth_trie::test_utils::{state_root, state_root_prehashed}; use std::collections::BTreeMap; + stage_test_suite_ext!(MerkleTestRunner, merkle); + /// Execute from genesis so as to merkelize whole state #[tokio::test] async fn execute_clean_merkle() { @@ -372,7 +376,8 @@ mod tests { processed, total })) - } + }, + done: true }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + @@ -411,7 +416,8 @@ mod tests { processed, total })) - } + }, + done: true }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 23ce876ea5..ab863a8d72 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -59,7 +59,11 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let (tx_range, block_range) = + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (tx_range, block_range, is_final_range) = input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; let end_block = *block_range.end(); @@ -69,6 +73,7 @@ impl Stage for SenderRecoveryStage { return Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, }) } @@ -146,6 +151,7 @@ impl Stage for SenderRecoveryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, }) } @@ -155,7 +161,7 @@ impl Stage for SenderRecoveryStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Lookup latest tx id that we should unwind to let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num(); @@ -223,10 +229,12 @@ mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; + stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); + /// Execute a block range with a single transaction #[tokio::test] async fn execute_single_transaction() { @@ -260,7 +268,7 @@ mod tests { processed: 1, total: 1 })) - }}) if block_number == previous_stage + }, done: true }) if block_number == previous_stage ); // Validate the stage execution @@ -299,17 +307,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.as_ref().unwrap(), - &ExecOutput { + result.unwrap(), + ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_transactions } - ) + ), + done: false } ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -324,7 +332,8 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_transactions, total: total_transactions } - ) + ), + done: true } ); diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index a02f6a36b2..a9b0de7626 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -54,7 +54,11 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let range = input.next_block_range_with_threshold(self.commit_threshold); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let (start_block, end_block) = range.clone().into_inner(); debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync"); @@ -86,6 +90,7 @@ impl Stage for TotalDifficultyStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, }) } @@ -95,7 +100,7 @@ impl Stage for TotalDifficultyStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); tx.unwind_table_by_num::(unwind_to)?; @@ -127,10 +132,12 @@ mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; + stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty); + #[tokio::test] async fn execute_with_intermediate_commit() { let threshold = 50; @@ -158,10 +165,9 @@ mod tests { processed, total })) - }}) if block_number == expected_progress && processed == 1 + threshold && + }, done: false }) if block_number == expected_progress && processed == 1 + threshold && total == runner.tx.table::().unwrap().len() as u64 ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time let second_input = ExecInput { @@ -177,7 +183,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && processed == total && + }, done: true }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 14b86810a6..56ea803abd 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -55,7 +55,11 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let (tx_range, block_range) = + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (tx_range, block_range, is_final_range) = input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; let end_block = *block_range.end(); @@ -135,6 +139,7 @@ impl Stage for TransactionLookupStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, }) } @@ -144,7 +149,7 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let (range, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; @@ -187,13 +192,16 @@ fn stage_checkpoint( mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; + // Implement stage test suite. + stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); + #[tokio::test] async fn execute_single_transaction_lookup() { let (previous_stage, stage_progress) = (500, 100); @@ -226,7 +234,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && processed == total && + }, done: true }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); @@ -265,17 +273,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.as_ref().unwrap(), - &ExecOutput { + result.unwrap(), + ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_txs } - ) + ), + done: false } ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -290,7 +298,8 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_txs, total: total_txs } - ) + ), + done: true } ); diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 533d658475..f691d13711 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -42,8 +42,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ref output @ ExecOutput { checkpoint }) - if output.is_done(input) && checkpoint.block_number == previous_stage + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == previous_stage ); // Validate the stage execution @@ -94,8 +94,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ref output @ ExecOutput { checkpoint }) - if output.is_done(execute_input) && checkpoint.block_number == previous_stage + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == previous_stage ); assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation"); @@ -113,8 +113,7 @@ macro_rules! stage_test_suite { // Assert the successful unwind result assert_matches::assert_matches!( rx, - Ok(output @ UnwindOutput { checkpoint }) - if output.is_done(unwind_input) && checkpoint.block_number == unwind_input.unwind_to + Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to ); // Validate the stage unwind @@ -124,4 +123,46 @@ macro_rules! stage_test_suite { }; } +// `execute_already_reached_target` is not suitable for the headers stage thus +// included in the test suite extension +macro_rules! stage_test_suite_ext { + ($runner:ident, $name:ident) => { + crate::test_utils::stage_test_suite!($runner, $name); + + paste::item! { + /// Check that the execution is short-circuited if the target was already reached. + #[tokio::test] + async fn [< execute_already_reached_target_ $name>] () { + let stage_progress = 1000; + + // Set up the runner + let mut runner = $runner::default(); + let input = crate::stage::ExecInput { + target: Some(stage_progress), + checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), + }; + let seed = runner.seed_execution(input).expect("failed to seed"); + + // Run stage execution + let rx = runner.execute(input); + + // Run `after_execution` hook + runner.after_execution(seed).await.expect("failed to run after execution hook"); + + // Assert the successful result + let result = rx.await.unwrap(); + assert_matches::assert_matches!( + result, + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == stage_progress + ); + + // Validate the stage execution + assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation"); + } + } + }; +} + pub(crate) use stage_test_suite; +pub(crate) use stage_test_suite_ext; diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages/src/test_utils/stage.rs index 87dd786f46..81056a5e14 100644 --- a/crates/stages/src/test_utils/stage.rs +++ b/crates/stages/src/test_utils/stage.rs @@ -1,46 +1,19 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::stage::StageId; use reth_provider::Transaction; use std::collections::VecDeque; #[derive(Debug)] pub struct TestStage { id: StageId, - checkpoint: Option, exec_outputs: VecDeque>, unwind_outputs: VecDeque>, } impl TestStage { pub fn new(id: StageId) -> Self { - Self { - id, - checkpoint: None, - exec_outputs: VecDeque::new(), - unwind_outputs: VecDeque::new(), - } - } - - pub fn with_checkpoint( - mut self, - checkpoint: Option, - db: DB, - ) -> Self { - let mut tx = Transaction::new(&db).expect("initialize transaction"); - - if let Some(checkpoint) = checkpoint { - tx.save_stage_checkpoint(self.id, checkpoint) - .unwrap_or_else(|_| panic!("save stage {} checkpoint", self.id)) - } else { - tx.delete_stage_checkpoint(self.id) - .unwrap_or_else(|_| panic!("delete stage {} checkpoint", self.id)) - } - - tx.commit().expect("commit transaction"); - - self.checkpoint = checkpoint; - self + Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } } pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index ed3f2e4949..954e615a50 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -1323,12 +1323,6 @@ where Ok(()) } - /// Delete stage checkpoint. - pub fn delete_stage_checkpoint(&self, id: StageId) -> Result<(), DbError> { - self.delete::(id.to_string(), None)?; - Ok(()) - } - /// Return full table as Vec pub fn table(&self) -> Result>, DbError> where