diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 909203e02b..c05fc6c54b 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -120,22 +120,30 @@ impl Command { let mut account_hashing_done = false; while !account_hashing_done { - let input = ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }; - let output = account_hashing_stage.execute(&mut provider_rw, input).await?; - account_hashing_done = output.is_done(input); + let output = account_hashing_stage + .execute( + &mut provider_rw, + ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }, + ) + .await?; + account_hashing_done = output.done; } let mut storage_hashing_done = false; while !storage_hashing_done { - let input = ExecInput { - target: Some(block), - checkpoint: progress.map(StageCheckpoint::new), - }; - let output = storage_hashing_stage.execute(&mut provider_rw, input).await?; - storage_hashing_done = output.is_done(input); + let output = storage_hashing_stage + .execute( + &mut provider_rw, + ExecInput { + target: Some(block), + checkpoint: progress.map(StageCheckpoint::new), + }, + ) + .await?; + storage_hashing_done = output.done; } let incremental_result = merkle_stage @@ -165,7 +173,7 @@ impl Command { loop { let clean_result = merkle_stage.execute(&mut provider_rw, clean_input).await; assert!(clean_result.is_ok(), "Clean state root calculation failed"); - if clean_result.unwrap().is_done(clean_input) { + if clean_result.unwrap().done { break } } diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index fdc56ebd86..04b36b59cc 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -72,8 +72,7 @@ impl NodeState { pipeline_position, pipeline_total, stage_id, - result: ExecOutput { checkpoint }, - done, + result: ExecOutput { checkpoint, done }, } => { self.current_checkpoint = checkpoint; diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index 690c1f40e6..d63a14cc82 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -77,11 +77,16 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; - exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input); + exec_output = exec_stage + .execute( + &mut provider, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) + .await? + .done; } info!(target: "reth::cli", "Success."); diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index a022ef30dd..6e717544c7 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -76,11 +76,16 @@ async fn dry_run( let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; - exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input); + exec_output = exec_stage + .execute( + &mut provider, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) + .await? + .done; } info!(target: "reth::cli", "Success."); diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 1d2e05005f..3eb38283be 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -119,17 +119,20 @@ async fn dry_run( let mut provider = shareable_db.provider_rw()?; let mut exec_output = false; while !exec_output { - let exec_input = reth_stages::ExecInput { - target: Some(to), - checkpoint: Some(StageCheckpoint::new(from)), - }; exec_output = MerkleStage::Execution { - // Forces updating the root instead of calculating from scratch - clean_threshold: u64::MAX, + clean_threshold: u64::MAX, /* Forces updating the root instead of calculating + * from + * scratch */ } - .execute(&mut provider, exec_input) + .execute( + &mut provider, + reth_stages::ExecInput { + target: Some(to), + checkpoint: Some(StageCheckpoint::new(from)), + }, + ) .await? - .is_done(exec_input); + .done; } info!(target: "reth::cli", "Success."); diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 9f3ade81c4..cc3cb3eb3b 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -20,7 +20,7 @@ use reth_stages::{ IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage, }, - ExecInput, PipelineError, Stage, UnwindInput, + ExecInput, ExecOutput, PipelineError, Stage, UnwindInput, }; use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; @@ -243,13 +243,10 @@ impl Command { checkpoint: Some(checkpoint.with_block_number(self.from)), }; - loop { - let result = exec_stage.execute(&mut provider_rw, input).await?; - if result.is_done(input) { - break - } - - input.checkpoint = Some(result.checkpoint); + while let ExecOutput { checkpoint: stage_progress, done: false } = + exec_stage.execute(&mut provider_rw, input).await? + { + input.checkpoint = Some(stage_progress); if self.commit { provider_rw.commit()?; diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 0875c3f7fb..70f5555552 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1369,7 +1369,6 @@ mod tests { chain_spec: Arc, pipeline_exec_outputs: VecDeque>, executor_results: Vec, - max_block: Option, ) -> (TestBeaconConsensusEngine, TestEnv>>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); @@ -1381,13 +1380,10 @@ mod tests { // Setup pipeline let (tip_tx, tip_rx) = watch::channel(H256::default()); - let mut pipeline_builder = Pipeline::builder() + let pipeline = Pipeline::builder() .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx); - if let Some(max_block) = max_block { - pipeline_builder = pipeline_builder.with_max_block(max_block); - } - let pipeline = pipeline_builder.build(db.clone(), chain_spec.clone()); + .with_tip_sender(tip_tx) + .build(db.clone(), chain_spec.clone()); // Setup blockchain tree let externals = @@ -1407,7 +1403,7 @@ mod tests { blockchain_provider, Box::::default(), Box::::default(), - max_block, + None, false, payload_builder, None, @@ -1442,7 +1438,6 @@ mod tests { chain_spec.clone(), VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), - Some(1), ); let res = spawn_consensus_engine(consensus_engine); @@ -1472,7 +1467,6 @@ mod tests { chain_spec.clone(), VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), - Some(1), ); let mut rx = spawn_consensus_engine(consensus_engine); @@ -1512,11 +1506,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), Err(StageError::ChannelClosed), ]), Vec::default(), - Some(2), ); let rx = spawn_consensus_engine(consensus_engine); @@ -1529,9 +1522,7 @@ mod tests { assert_matches!( rx.await, - Ok( - Err(BeaconConsensusEngineError::Pipeline(n)) - ) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -1545,12 +1536,15 @@ mod tests { .paris_activated() .build(), ); - let (consensus_engine, env) = setup_consensus_engine( + let (mut consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]), + VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(max_block), + done: true, + })]), Vec::default(), - Some(max_block), ); + consensus_engine.sync.set_max_block(max_block); let rx = spawn_consensus_engine(consensus_engine); let _ = env @@ -1590,9 +1584,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1619,9 +1615,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1666,11 +1664,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1715,9 +1712,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1751,11 +1750,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1805,11 +1803,10 @@ mod tests { let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), + Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }), ]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1852,9 +1849,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -1883,9 +1882,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1931,9 +1932,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::default(), - None, ); let genesis = random_block(0, None, None, Some(0)); @@ -1986,9 +1989,11 @@ mod tests { ); let (consensus_engine, env) = setup_consensus_engine( chain_spec.clone(), - VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]), + VecDeque::from([Ok(ExecOutput { + done: true, + checkpoint: StageCheckpoint::new(0), + })]), Vec::from([exec_result2]), - None, ); insert_blocks( diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index a5097c4c93..a093b57bae 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -83,6 +83,12 @@ where self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); } + /// Sets the max block value for testing + #[cfg(test)] + pub(crate) fn set_max_block(&mut self, block: BlockNumber) { + self.max_block = Some(block); + } + /// Cancels all full block requests that are in progress. pub(crate) fn clear_full_block_requests(&mut self) { self.inflight_full_block_requests.clear(); diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 6133d89fa1..2230c4075e 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -31,8 +31,6 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of executing the stage. result: ExecOutput, - /// Stage completed executing the whole block range - done: bool, }, /// Emitted when a stage is about to be unwound. Unwinding { @@ -47,8 +45,6 @@ pub enum PipelineEvent { stage_id: StageId, /// The result of unwinding the stage. result: UnwindOutput, - /// Stage completed unwinding the whole block range - done: bool, }, /// Emitted when a stage encounters an error either during execution or unwinding. Error { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index a4b6c681e0..6586365e84 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -262,10 +262,8 @@ where continue } - let mut done = UnwindInput { checkpoint, unwind_to: to, bad_block }.target_reached(); - debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind"); - while !done { + while checkpoint.block_number > to { let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); @@ -273,7 +271,6 @@ where match output { Ok(unwind_output) => { checkpoint = unwind_output.checkpoint; - done = unwind_output.is_done(input); info!( target: "sync::pipeline", stage = %stage_id, @@ -290,11 +287,8 @@ where ); provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; - self.listeners.notify(PipelineEvent::Unwound { - stage_id, - result: unwind_output, - done, - }); + self.listeners + .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); provider_rw.commit()?; provider_rw = @@ -355,18 +349,11 @@ where checkpoint: prev_checkpoint, }); - let input = ExecInput { target, checkpoint: prev_checkpoint }; - let result = if input.target_reached() { - Ok(ExecOutput { checkpoint: input.checkpoint() }) - } else { - stage - .execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint }) - .await - }; - - match result { - Ok(out @ ExecOutput { checkpoint }) => { - let done = out.is_done(input); + match stage + .execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint }) + .await + { + Ok(out @ ExecOutput { checkpoint, done }) => { made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; info!( @@ -385,7 +372,6 @@ where pipeline_total: total_stages, stage_id, result: out.clone(), - done, }); // TODO: Make the commit interval configurable @@ -484,10 +470,7 @@ impl std::fmt::Debug for Pipeline { #[cfg(test)] mod tests { use super::*; - use crate::{ - test_utils::{TestStage, TestTransaction}, - UnwindOutput, - }; + use crate::{test_utils::TestStage, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, EnvKind}; use reth_interfaces::{ @@ -526,23 +509,19 @@ mod tests { /// Runs a simple pipeline. #[tokio::test] async fn run_pipeline() { - let tx = TestTransaction::default(); + let db = test_utils::create_test_db::(EnvKind::RW); let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), ) .add_stage( TestStage::new(StageId::Other("B")) - .with_checkpoint(Some(StageCheckpoint::new(10)), tx.inner()), - ) - .add_stage( - TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) - .build(tx.inner_raw(), MAINNET.clone()); + .build(db, MAINNET.clone()); let events = pipeline.events(); // Run pipeline @@ -556,30 +535,27 @@ mod tests { vec![ PipelineEvent::Running { pipeline_position: 1, - pipeline_total: 3, + pipeline_total: 2, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { pipeline_position: 1, - pipeline_total: 3, + pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, - done: true, + result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Skipped { stage_id: StageId::Other("B") }, PipelineEvent::Running { - pipeline_position: 3, - pipeline_total: 3, - stage_id: StageId::Other("C"), + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 3, - pipeline_total: 3, - stage_id: StageId::Other("C"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true, + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] ); @@ -593,17 +569,17 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .with_max_block(10) @@ -634,8 +610,7 @@ mod tests { pipeline_position: 1, pipeline_total: 3, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -647,8 +622,7 @@ mod tests { pipeline_position: 2, pipeline_total: 3, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 3, @@ -660,8 +634,7 @@ mod tests { pipeline_position: 3, pipeline_total: 3, stage_id: StageId::Other("C"), - result: ExecOutput { checkpoint: StageCheckpoint::new(20) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, // Unwinding PipelineEvent::Unwinding { @@ -675,7 +648,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("B"), @@ -688,7 +660,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, PipelineEvent::Unwinding { stage_id: StageId::Other("A"), @@ -701,7 +672,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - done: true }, ] ); @@ -715,12 +685,12 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })), ) .add_stage( TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) .build(db, MAINNET.clone()); @@ -750,8 +720,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(100) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -763,8 +732,7 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, // Unwinding // Nothing to unwind in stage "B" @@ -780,7 +748,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(50) }, - done: true }, ] ); @@ -805,9 +772,9 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .add_stage( TestStage::new(StageId::Other("B")) @@ -816,7 +783,7 @@ mod tests { error: consensus::ConsensusError::BaseFeeMissing, })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) .build(db, MAINNET.clone()); @@ -841,8 +808,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -862,7 +828,6 @@ mod tests { PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, - done: true }, PipelineEvent::Running { pipeline_position: 1, @@ -874,8 +839,7 @@ mod tests { pipeline_position: 1, pipeline_total: 2, stage_id: StageId::Other("A"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { pipeline_position: 2, @@ -887,8 +851,7 @@ mod tests { pipeline_position: 2, pipeline_total: 2, stage_id: StageId::Other("B"), - result: ExecOutput { checkpoint: StageCheckpoint::new(10) }, - done: true + result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] ); @@ -898,17 +861,17 @@ mod tests { #[tokio::test] async fn pipeline_error_handling() { // Non-fatal - // let db = test_utils::create_test_db::(EnvKind::RW); - // let mut pipeline = Pipeline::builder() - // .add_stage( - // TestStage::new(StageId::Other("NonFatal")) - // .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) - // .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })), - // ) - // .with_max_block(10) - // .build(db, MAINNET.clone()); - // let result = pipeline.run().await; - // assert_matches!(result, Ok(())); + let db = test_utils::create_test_db::(EnvKind::RW); + let mut pipeline = Pipeline::builder() + .add_stage( + TestStage::new(StageId::Other("NonFatal")) + .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), + ) + .with_max_block(10) + .build(db, MAINNET.clone()); + let result = pipeline.run().await; + assert_matches!(result, Ok(())); // Fatal let db = test_utils::create_test_db::(EnvKind::RW); @@ -916,7 +879,6 @@ mod tests { .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) - .with_max_block(1) .build(db, MAINNET.clone()); let result = pipeline.run().await; assert_matches!( diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index b5f1311baf..a7de484994 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -10,7 +10,6 @@ use std::{ cmp::{max, min}, ops::RangeInclusive, }; -use tracing::warn; /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] @@ -36,7 +35,7 @@ impl ExecInput { /// Returns `true` if the target block number has already been reached. pub fn target_reached(&self) -> bool { - ExecOutput { checkpoint: self.checkpoint.unwrap_or_default() }.is_done(*self) + self.checkpoint().block_number >= self.target() } /// Return the target block number or default. @@ -46,7 +45,8 @@ impl ExecInput { /// Return next block range that needs to be executed. pub fn next_block_range(&self) -> RangeInclusive { - self.next_block_range_with_threshold(u64::MAX) + let (range, _) = self.next_block_range_with_threshold(u64::MAX); + range } /// Return true if this is the first block range to execute. @@ -55,15 +55,19 @@ impl ExecInput { } /// Return the next block range to execute. - /// Return pair of the block range. - pub fn next_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive { + /// Return pair of the block range and if this is final block range. + pub fn next_block_range_with_threshold( + &self, + threshold: u64, + ) -> (RangeInclusive, bool) { let current_block = self.checkpoint(); let start = current_block.block_number + 1; let target = self.target(); let end = min(target, current_block.block_number.saturating_add(threshold)); - start..=end + let is_final_range = end == target; + (start..=end, is_final_range) } /// Return the next block range determined the number of transactions within it. @@ -73,7 +77,7 @@ impl ExecInput { &self, provider: &DatabaseProviderRW<'_, DB>, tx_threshold: u64, - ) -> Result<(RangeInclusive, RangeInclusive), StageError> { + ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { let start_block = self.next_block(); let start_block_body = provider .tx_ref() @@ -96,7 +100,8 @@ impl ExecInput { break } } - Ok((first_tx_number..=last_tx_number, start_block..=end_block_number)) + let is_final_range = end_block_number >= target_block; + Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range)) } } @@ -112,11 +117,6 @@ pub struct UnwindInput { } impl UnwindInput { - /// Returns `true` if the target block number has already been reached. - pub fn target_reached(&self) -> bool { - UnwindOutput { checkpoint: self.checkpoint }.is_done(*self) - } - /// Return next block range that needs to be unwound. pub fn unwind_block_range(&self) -> RangeInclusive { self.unwind_block_range_with_threshold(u64::MAX).0 @@ -126,7 +126,7 @@ impl UnwindInput { pub fn unwind_block_range_with_threshold( &self, threshold: u64, - ) -> (RangeInclusive, BlockNumber) { + ) -> (RangeInclusive, BlockNumber, bool) { // +1 is to skip the block we're unwinding to let mut start = self.unwind_to + 1; let end = self.checkpoint; @@ -135,7 +135,8 @@ impl UnwindInput { let unwind_to = start - 1; - (start..=end.block_number, unwind_to) + let is_final_range = unwind_to == self.unwind_to; + (start..=end.block_number, unwind_to, is_final_range) } } @@ -144,16 +145,14 @@ impl UnwindInput { pub struct ExecOutput { /// How far the stage got. pub checkpoint: StageCheckpoint, + /// Whether or not the stage is done. + pub done: bool, } impl ExecOutput { - /// Returns `true` if the target block number has already been reached, - /// i.e. `checkpoint.block_number >= target`. - pub fn is_done(&self, input: ExecInput) -> bool { - if self.checkpoint.block_number > input.target() { - warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the execution target"); - } - self.checkpoint.block_number >= input.target() + /// Mark the stage as done, checkpointing at the given place. + pub fn done(checkpoint: StageCheckpoint) -> Self { + Self { checkpoint, done: true } } } @@ -164,17 +163,6 @@ pub struct UnwindOutput { pub checkpoint: StageCheckpoint, } -impl UnwindOutput { - /// Returns `true` if the target block number has already been reached, - /// i.e. `checkpoint.block_number <= unwind_to`. - pub fn is_done(&self, input: UnwindInput) -> bool { - if self.checkpoint.block_number < input.unwind_to { - warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the unwind target"); - } - self.checkpoint.block_number <= input.unwind_to - } -} - /// A stage is a segmented part of the syncing process of the node. /// /// Each stage takes care of a well-defined task, such as downloading headers or executing diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index cdeca70e0b..0108f78280 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -70,6 +70,10 @@ impl Stage for BodyStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let range = input.next_block_range(); // Update the header range on the downloader self.downloader.set_download_range(range.clone())?; @@ -148,9 +152,11 @@ impl Stage for BodyStage { // The stage is "done" if: // - We got fewer blocks than our target // - We reached our target and the target was not limited by the batch size of the stage + let done = highest_block == to_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done, }) } @@ -226,11 +232,15 @@ fn stage_checkpoint( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner}; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, + }; use assert_matches::assert_matches; use reth_primitives::stage::StageUnitCheckpoint; use test_utils::*; + stage_test_suite_ext!(BodyTestRunner, body); + /// Checks that the stage downloads at most `batch_size` blocks. #[tokio::test] async fn partial_body_download() { @@ -263,7 +273,7 @@ mod tests { processed, // 1 seeded block body + batch size total // seeded headers })) - }}) if block_number < 200 && + }, done: false }) if block_number < 200 && processed == 1 + batch_size && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -300,7 +310,8 @@ mod tests { processed, total })) - } + }, + done: true }) if processed == total && total == previous_stage ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); @@ -335,7 +346,7 @@ mod tests { processed, total })) - }}) if block_number >= 10 && + }, done: false }) if block_number >= 10 && processed == 1 + batch_size && total == previous_stage ); let first_run_checkpoint = first_run.unwrap().checkpoint; @@ -355,7 +366,7 @@ mod tests { processed, total })) - }}) if block_number > first_run_checkpoint.block_number && + }, done: true }) if block_number > first_run_checkpoint.block_number && processed == total && total == previous_stage ); assert_matches!( @@ -395,7 +406,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && + }, done: true }) if block_number == previous_stage && processed == total && total == previous_stage ); let checkpoint = output.unwrap().checkpoint; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index b58dd56887..f6b40ee05a 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -143,6 +143,10 @@ impl ExecutionStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let start_block = input.next_block(); let max_block = input.target(); @@ -195,9 +199,11 @@ impl ExecutionStage { state.write_to_db(provider.tx_ref())?; trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state"); + let done = stage_progress == max_block; Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress) .with_execution_stage_checkpoint(stage_checkpoint), + done, }) } } @@ -339,7 +345,7 @@ impl Stage for ExecutionStage { let mut account_changeset = tx.cursor_dup_write::()?; let mut storage_changeset = tx.cursor_dup_write::()?; - let (range, unwind_to) = + let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX)); if range.is_empty() { @@ -663,7 +669,8 @@ mod tests { total } })) - } + }, + done: true } if processed == total && total == block.gas_used); let mut provider = db.provider_rw().unwrap(); let tx = provider.tx_mut(); diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index 4955565e0b..bae21c8c76 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,7 +21,7 @@ impl Stage for FinishStage { _provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } async fn unwind( @@ -37,12 +37,14 @@ impl Stage for FinishStage { mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use reth_interfaces::test_utils::generators::{random_header, random_header_range}; use reth_primitives::SealedHeader; + stage_test_suite_ext!(FinishTestRunner, finish); + #[derive(Default)] struct FinishTestRunner { tx: TestTransaction, @@ -87,7 +89,7 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { if let Some(output) = output { - assert!(output.is_done(input), "stage should always be done"); + assert!(output.done, "stage should always be done"); assert_eq!( output.checkpoint.block_number, input.target(), diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 75df5e9193..ab56a33984 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -135,6 +135,10 @@ impl Stage for AccountHashingStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let (from_block, to_block) = input.next_block_range().into_inner(); // if there are more blocks then threshold it is faster to go over Plain state and hash all @@ -232,7 +236,7 @@ impl Stage for AccountHashingStage { }, ); - return Ok(ExecOutput { checkpoint }) + return Ok(ExecOutput { checkpoint, done: false }) } } else { // Aggregate all transition changesets and make a list of accounts that have been @@ -254,7 +258,7 @@ impl Stage for AccountHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint }) + Ok(ExecOutput { checkpoint, done: true }) } /// Unwind the stage. @@ -263,7 +267,7 @@ impl Stage for AccountHashingStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Aggregate all transition changesets and make a list of accounts that have been changed. @@ -293,11 +297,15 @@ fn stage_checkpoint_progress( #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner}; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner, + }; use assert_matches::assert_matches; use reth_primitives::{stage::StageUnitCheckpoint, Account, U256}; use test_utils::*; + stage_test_suite_ext!(AccountHashingTestRunner, account_hashing); + #[tokio::test] async fn execute_clean_account_hashing() { let (previous_stage, stage_progress) = (20, 10); @@ -327,7 +335,8 @@ mod tests { }, .. })), - } + }, + done: true, }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 @@ -384,7 +393,8 @@ mod tests { progress: EntitiesCheckpoint { processed: 5, total } } )) - } + }, + done: false }) if address == fifth_address && total == runner.tx.table::().unwrap().len() as u64 ); @@ -410,7 +420,8 @@ mod tests { progress: EntitiesCheckpoint { processed, total } } )) - } + }, + done: true }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 3218fdfcf9..acb109b0e9 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -58,6 +58,9 @@ impl Stage for StorageHashingStage { input: ExecInput, ) -> Result { let tx = provider.tx_ref(); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } let (from_block, to_block) = input.next_block_range().into_inner(); @@ -163,7 +166,7 @@ impl Stage for StorageHashingStage { }, ); - return Ok(ExecOutput { checkpoint }) + return Ok(ExecOutput { checkpoint, done: false }) } } else { // Aggregate all changesets and and make list of storages that have been @@ -185,7 +188,7 @@ impl Stage for StorageHashingStage { ..Default::default() }); - Ok(ExecOutput { checkpoint }) + Ok(ExecOutput { checkpoint, done: true }) } /// Unwind the stage. @@ -194,7 +197,7 @@ impl Stage for StorageHashingStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); provider.unwind_storage_hashing(BlockNumberAddress::range(range))?; @@ -224,8 +227,8 @@ fn stage_checkpoint_progress( mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -240,6 +243,8 @@ mod tests { stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256, }; + stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing); + /// Execute with low clean threshold so as to hash whole storage #[tokio::test] async fn execute_clean_storage_hashing() { @@ -263,8 +268,10 @@ mod tests { runner.seed_execution(input).expect("failed to seed execution"); loop { - if let Ok(result @ ExecOutput { checkpoint }) = runner.execute(input).await.unwrap() { - if !result.is_done(input) { + if let Ok(result @ ExecOutput { checkpoint, done }) = + runner.execute(input).await.unwrap() + { + if !done { let previous_checkpoint = input .checkpoint .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()) @@ -354,7 +361,8 @@ mod tests { total } })) - } + }, + done: false }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -399,7 +407,8 @@ mod tests { } } )) - } + }, + done: false }) if address == progress_address && storage == progress_key && total == runner.tx.table::().unwrap().len() as u64 ); @@ -430,7 +439,8 @@ mod tests { } } )) - } + }, + done: true }) if processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 21560ae0b4..ad857d6351 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -210,7 +210,7 @@ where // Nothing to sync if gap.is_closed() { info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached"); - return Ok(ExecOutput { checkpoint: current_checkpoint }) + return Ok(ExecOutput::done(current_checkpoint)) } debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync"); @@ -313,10 +313,12 @@ where Ok(ExecOutput { checkpoint: StageCheckpoint::new(checkpoint) .with_headers_stage_checkpoint(stage_checkpoint), + done: true, }) } else { Ok(ExecOutput { checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint), + done: false, }) } } @@ -589,7 +591,7 @@ mod tests { total, } })) - }}) if block_number == tip.number && + }, done: true }) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); @@ -685,7 +687,7 @@ mod tests { total, } })) - }}) if block_number == checkpoint && + }, done: false }) if block_number == checkpoint && from == checkpoint && to == previous_stage && processed == checkpoint + 500 && total == tip.number); @@ -708,7 +710,7 @@ mod tests { total, } })) - }}) if block_number == tip.number && + }, done: true }) if block_number == tip.number && from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index acbce16bd0..f965009092 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -38,7 +38,11 @@ impl Stage for IndexAccountHistoryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - let range = input.next_block_range_with_threshold(self.commit_threshold); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( provider, @@ -59,6 +63,7 @@ impl Stage for IndexAccountHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, }) } @@ -68,7 +73,7 @@ impl Stage for IndexAccountHistoryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = provider.unwind_account_history_indices(range)?; @@ -217,9 +222,9 @@ mod tests { progress: EntitiesCheckpoint { processed: 2, total: 2 } } ), + done: true } ); - assert!(out.is_done(input)); provider.commit().unwrap(); } @@ -457,10 +462,10 @@ mod tests { block_range: CheckpointBlockRange { from: 1, to: 5 }, progress: EntitiesCheckpoint { processed: 1, total: 2 } } - ) + ), + done: false } ); - assert!(!out.is_done(input)); input.checkpoint = Some(out.checkpoint); let out = stage.execute(&mut provider, input).await.unwrap(); @@ -472,10 +477,10 @@ mod tests { block_range: CheckpointBlockRange { from: 5, to: 5 }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } - ) + ), + done: true } ); - assert!(out.is_done(input)); provider.commit().unwrap(); } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 1b4db0bb48..cc354a4daf 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -41,7 +41,11 @@ impl Stage for IndexStorageHistoryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - let range = input.next_block_range_with_threshold(self.commit_threshold); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let mut stage_checkpoint = stage_checkpoint( provider, @@ -61,6 +65,7 @@ impl Stage for IndexStorageHistoryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()) .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, }) } @@ -70,7 +75,7 @@ impl Stage for IndexStorageHistoryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (range, unwind_progress) = + let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); let changesets = @@ -229,10 +234,10 @@ mod tests { block_range: CheckpointBlockRange { from: input.next_block(), to: run_to }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } - ) + ), + done: true } ); - assert!(out.is_done(input)); provider.commit().unwrap(); } @@ -473,10 +478,10 @@ mod tests { block_range: CheckpointBlockRange { from: 1, to: 5 }, progress: EntitiesCheckpoint { processed: 1, total: 2 } } - ) + ), + done: false } ); - assert!(!out.is_done(input)); input.checkpoint = Some(out.checkpoint); let out = stage.execute(&mut provider, input).await.unwrap(); @@ -488,10 +493,10 @@ mod tests { block_range: CheckpointBlockRange { from: 5, to: 5 }, progress: EntitiesCheckpoint { processed: 2, total: 2 } } - ) + ), + done: true } ); - assert!(out.is_done(input)); provider.commit().unwrap(); } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 8e7ff43908..bfb0344e43 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -144,7 +144,7 @@ impl Stage for MerkleStage { let threshold = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); - return Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) }) + return Ok(ExecOutput::done(StageCheckpoint::new(input.target()))) } MerkleStage::Execution { clean_threshold } => *clean_threshold, #[cfg(any(test, feature = "test-utils"))] @@ -226,6 +226,7 @@ impl Stage for MerkleStage { checkpoint: input .checkpoint() .with_entities_stage_checkpoint(entities_checkpoint), + done: false, }) } StateRootProgress::Complete(root, hashed_entries_walked, updates) => { @@ -266,6 +267,7 @@ impl Stage for MerkleStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block) .with_entities_stage_checkpoint(entities_checkpoint), + done: true, }) } @@ -328,8 +330,8 @@ impl Stage for MerkleStage { mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -346,6 +348,8 @@ mod tests { use reth_trie::test_utils::{state_root, state_root_prehashed}; use std::collections::BTreeMap; + stage_test_suite_ext!(MerkleTestRunner, merkle); + /// Execute from genesis so as to merkelize whole state #[tokio::test] async fn execute_clean_merkle() { @@ -374,7 +378,8 @@ mod tests { processed, total })) - } + }, + done: true }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + @@ -413,7 +418,8 @@ mod tests { processed, total })) - } + }, + done: true }) if block_number == previous_stage && processed == total && total == ( runner.tx.table::().unwrap().len() + diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 0b24372662..bafa6938f2 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -59,7 +59,11 @@ impl Stage for SenderRecoveryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - let (tx_range, block_range) = + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (tx_range, block_range, is_final_range) = input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; let end_block = *block_range.end(); @@ -69,6 +73,7 @@ impl Stage for SenderRecoveryStage { return Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done: is_final_range, }) } @@ -150,6 +155,7 @@ impl Stage for SenderRecoveryStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done: is_final_range, }) } @@ -159,7 +165,7 @@ impl Stage for SenderRecoveryStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Lookup latest tx id that we should unwind to let latest_tx_id = provider.block_body_indices(unwind_to)?.last_tx_num(); @@ -227,10 +233,12 @@ mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; + stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); + /// Execute a block range with a single transaction #[tokio::test] async fn execute_single_transaction() { @@ -264,7 +272,7 @@ mod tests { processed: 1, total: 1 })) - }}) if block_number == previous_stage + }, done: true }) if block_number == previous_stage ); // Validate the stage execution @@ -303,17 +311,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.as_ref().unwrap(), - &ExecOutput { + result.unwrap(), + ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_transactions } - ) + ), + done: false } ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -328,7 +336,8 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_transactions, total: total_transactions } - ) + ), + done: true } ); diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 9562016136..41afa82130 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -55,8 +55,11 @@ impl Stage for TotalDifficultyStage { input: ExecInput, ) -> Result { let tx = provider.tx_ref(); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } - let range = input.next_block_range_with_threshold(self.commit_threshold); + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); let (start_block, end_block) = range.clone().into_inner(); debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync"); @@ -88,6 +91,7 @@ impl Stage for TotalDifficultyStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done: is_final_range, }) } @@ -97,7 +101,7 @@ impl Stage for TotalDifficultyStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); provider.unwind_table_by_num::(unwind_to)?; @@ -129,10 +133,12 @@ mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; + stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty); + #[tokio::test] async fn execute_with_intermediate_commit() { let threshold = 50; @@ -160,10 +166,9 @@ mod tests { processed, total })) - }}) if block_number == expected_progress && processed == 1 + threshold && + }, done: false }) if block_number == expected_progress && processed == 1 + threshold && total == runner.tx.table::().unwrap().len() as u64 ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time let second_input = ExecInput { @@ -179,7 +184,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && processed == total && + }, done: true }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 3d77fc6af6..11757dc40d 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -54,7 +54,10 @@ impl Stage for TransactionLookupStage { provider: &mut DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - let (tx_range, block_range) = + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + let (tx_range, block_range, is_final_range) = input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; let end_block = *block_range.end(); @@ -135,6 +138,7 @@ impl Stage for TransactionLookupStage { Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done: is_final_range, }) } @@ -145,7 +149,7 @@ impl Stage for TransactionLookupStage { input: UnwindInput, ) -> Result { let tx = provider.tx_ref(); - let (range, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; @@ -188,13 +192,16 @@ fn stage_checkpoint( mod tests { use super::*; use crate::test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, - UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; + // Implement stage test suite. + stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); + #[tokio::test] async fn execute_single_transaction_lookup() { let (previous_stage, stage_progress) = (500, 100); @@ -227,7 +234,7 @@ mod tests { processed, total })) - }}) if block_number == previous_stage && processed == total && + }, done: true }) if block_number == previous_stage && processed == total && total == runner.tx.table::().unwrap().len() as u64 ); @@ -266,17 +273,17 @@ mod tests { .unwrap_or(previous_stage); assert_matches!(result, Ok(_)); assert_eq!( - result.as_ref().unwrap(), - &ExecOutput { + result.unwrap(), + ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: runner.tx.table::().unwrap().len() as u64, total: total_txs } - ) + ), + done: false } ); - assert!(!result.unwrap().is_done(first_input)); // Execute second time to completion runner.set_threshold(u64::MAX); @@ -291,7 +298,8 @@ mod tests { &ExecOutput { checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( EntitiesCheckpoint { processed: total_txs, total: total_txs } - ) + ), + done: true } ); diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 533d658475..f691d13711 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -42,8 +42,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ref output @ ExecOutput { checkpoint }) - if output.is_done(input) && checkpoint.block_number == previous_stage + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == previous_stage ); // Validate the stage execution @@ -94,8 +94,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ref output @ ExecOutput { checkpoint }) - if output.is_done(execute_input) && checkpoint.block_number == previous_stage + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == previous_stage ); assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation"); @@ -113,8 +113,7 @@ macro_rules! stage_test_suite { // Assert the successful unwind result assert_matches::assert_matches!( rx, - Ok(output @ UnwindOutput { checkpoint }) - if output.is_done(unwind_input) && checkpoint.block_number == unwind_input.unwind_to + Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to ); // Validate the stage unwind @@ -124,4 +123,46 @@ macro_rules! stage_test_suite { }; } +// `execute_already_reached_target` is not suitable for the headers stage thus +// included in the test suite extension +macro_rules! stage_test_suite_ext { + ($runner:ident, $name:ident) => { + crate::test_utils::stage_test_suite!($runner, $name); + + paste::item! { + /// Check that the execution is short-circuited if the target was already reached. + #[tokio::test] + async fn [< execute_already_reached_target_ $name>] () { + let stage_progress = 1000; + + // Set up the runner + let mut runner = $runner::default(); + let input = crate::stage::ExecInput { + target: Some(stage_progress), + checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), + }; + let seed = runner.seed_execution(input).expect("failed to seed"); + + // Run stage execution + let rx = runner.execute(input); + + // Run `after_execution` hook + runner.after_execution(seed).await.expect("failed to run after execution hook"); + + // Assert the successful result + let result = rx.await.unwrap(); + assert_matches::assert_matches!( + result, + Ok(ExecOutput { done, checkpoint }) + if done && checkpoint.block_number == stage_progress + ); + + // Validate the stage execution + assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation"); + } + } + }; +} + pub(crate) use stage_test_suite; +pub(crate) use stage_test_suite_ext; diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages/src/test_utils/stage.rs index 231ce3880e..028b74218f 100644 --- a/crates/stages/src/test_utils/stage.rs +++ b/crates/stages/src/test_utils/stage.rs @@ -1,49 +1,19 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - MAINNET, -}; -use reth_provider::{DatabaseProviderRW, ShareableDatabase}; +use reth_primitives::stage::StageId; +use reth_provider::DatabaseProviderRW; use std::collections::VecDeque; #[derive(Debug)] pub struct TestStage { id: StageId, - checkpoint: Option, exec_outputs: VecDeque>, unwind_outputs: VecDeque>, } impl TestStage { pub fn new(id: StageId) -> Self { - Self { - id, - checkpoint: None, - exec_outputs: VecDeque::new(), - unwind_outputs: VecDeque::new(), - } - } - - pub fn with_checkpoint( - mut self, - checkpoint: Option, - provider: DatabaseProviderRW<'_, DB>, - ) -> Self { - if let Some(checkpoint) = checkpoint { - provider - .save_stage_checkpoint(self.id, checkpoint) - .unwrap_or_else(|_| panic!("save stage {} checkpoint", self.id)) - } else { - provider - .delete_stage_checkpoint(self.id) - .unwrap_or_else(|_| panic!("delete stage {} checkpoint", self.id)) - } - - provider.commit().expect("provider commit"); - - self.checkpoint = checkpoint; - self + Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } } pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index ffe173763a..a355a09e63 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1108,12 +1108,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Delete stage checkpoint. - pub fn delete_stage_checkpoint(&self, id: StageId) -> std::result::Result<(), DatabaseError> { - self.tx.delete::(id.to_string(), None)?; - Ok(()) - } - /// Get stage checkpoint progress. pub fn get_stage_checkpoint_progress( &self,