diff --git a/bin/reth/src/args/stage_args.rs b/bin/reth/src/args/stage_args.rs
index 00886b7183..f955a57810 100644
--- a/bin/reth/src/args/stage_args.rs
+++ b/bin/reth/src/args/stage_args.rs
@@ -17,4 +17,5 @@ pub enum StageEnum {
     History,
     AccountHistory,
     StorageHistory,
+    TotalDifficulty,
 }
diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs
index 8fd960f21d..482f17c3c2 100644
--- a/bin/reth/src/stage/drop.rs
+++ b/bin/reth/src/stage/drop.rs
@@ -12,7 +12,7 @@ use reth_db::{
     transaction::DbTxMut,
 };
 use reth_primitives::{stage::StageId, ChainSpec};
-use reth_staged_sync::utils::init::insert_genesis_state;
+use reth_staged_sync::utils::init::{insert_genesis_header, insert_genesis_state};
 use std::sync::Arc;
 use tracing::info;
@@ -63,6 +63,15 @@ impl Command {
         tool.db.update(|tx| {
             match &self.stage {
+                StageEnum::Bodies => {
+                    tx.clear::<tables::BlockBodyIndices>()?;
+                    tx.clear::<tables::Transactions>()?;
+                    tx.clear::<tables::TransactionBlock>()?;
+                    tx.clear::<tables::BlockOmmers>()?;
+                    tx.clear::<tables::BlockWithdrawals>()?;
+                    tx.put::<tables::SyncStage>(StageId::Bodies.to_string(), Default::default())?;
+                    insert_genesis_header::<Env<WriteMap>>(tx, self.chain)?;
+                }
                 StageEnum::Senders => {
                     tx.clear::<tables::TxSenders>()?;
                     tx.put::<tables::SyncStage>(
@@ -140,6 +149,14 @@ impl Command {
                         Default::default(),
                     )?;
                 }
+                StageEnum::TotalDifficulty => {
+                    tx.clear::<tables::HeaderTD>()?;
+                    tx.put::<tables::SyncStage>(
+                        StageId::TotalDifficulty.to_string(),
+                        Default::default(),
+                    )?;
+                    insert_genesis_header::<Env<WriteMap>>(tx, self.chain)?;
+                }
                 _ => {
                     info!("Nothing to do for stage {:?}", self.stage);
                     return Ok(())
diff --git a/crates/primitives/src/chain/spec.rs b/crates/primitives/src/chain/spec.rs
index 5b0e90770b..1c264990dd 100644
--- a/crates/primitives/src/chain/spec.rs
+++ b/crates/primitives/src/chain/spec.rs
@@ -4,7 +4,7 @@ use crate::{
     header::Head,
     proofs::genesis_state_root,
     BlockNumber, Chain, ForkFilter, ForkHash, ForkId, Genesis, GenesisAccount, Hardfork, Header,
-    H160, H256, U256,
+    SealedHeader, H160, H256, U256,
 };
 use ethers_core::utils::Genesis as EthersGenesis;
 use hex_literal::hex;
@@ -193,6 +193,11 @@ impl ChainSpec {
         }
     }
 
+    /// Get the sealed header for the genesis block.
+    pub fn sealed_genesis_header(&self) -> SealedHeader {
+        SealedHeader { header: self.genesis_header(), hash: self.genesis_hash() }
+    }
+
     /// Get the initial base fee of the genesis block.
     pub fn initial_base_fee(&self) -> Option<u64> {
         // If London is activated at genesis, we set the initial base fee as per EIP-1559.
diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs
index 782c554a1e..aaeb836400 100644
--- a/crates/staged-sync/src/utils/init.rs
+++ b/crates/staged-sync/src/utils/init.rs
@@ -48,8 +48,7 @@ pub fn init_genesis<DB: Database>(
 ) -> Result<H256, InitDatabaseError> {
     let genesis = chain.genesis();
 
-    let header = chain.genesis_header();
-    let hash = header.hash_slow();
+    let hash = chain.genesis_hash();
 
     let tx = db.tx()?;
     if let Some((_, db_hash)) = tx.cursor_read::<tables::CanonicalHeaders>()?.first()? {
@@ -71,11 +70,7 @@ pub fn init_genesis<DB: Database>(
     // Insert header
     let tx = db.tx_mut()?;
-    tx.put::<tables::CanonicalHeaders>(0, hash)?;
-    tx.put::<tables::HeaderNumbers>(hash, 0)?;
-    tx.put::<tables::BlockBodyIndices>(0, Default::default())?;
-    tx.put::<tables::HeaderTD>(0, header.difficulty.into())?;
-    tx.put::<tables::Headers>(0, header)?;
+    insert_genesis_header::<DB>(&tx, chain.clone())?;
 
     insert_genesis_state::<DB>(&tx, genesis)?;
@@ -141,6 +136,22 @@ pub fn insert_genesis_hashes<DB: Database>(
     Ok(())
 }
 
+/// Inserts header for the genesis state.
+pub fn insert_genesis_header<DB: Database>(
+    tx: &<DB as DatabaseGAT<'_>>::TXMut,
+    chain: Arc<ChainSpec>,
+) -> Result<(), InitDatabaseError> {
+    let header = chain.sealed_genesis_header();
+
+    tx.put::<tables::CanonicalHeaders>(0, header.hash)?;
+    tx.put::<tables::HeaderNumbers>(header.hash, 0)?;
+    tx.put::<tables::BlockBodyIndices>(0, Default::default())?;
+    tx.put::<tables::HeaderTD>(0, header.difficulty.into())?;
+    tx.put::<tables::Headers>(0, header.header)?;
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::{init_genesis, InitDatabaseError};
diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs
index ed22ca6a2f..ffcde2274b 100644
--- a/crates/stages/src/stages/bodies.rs
+++ b/crates/stages/src/stages/bodies.rs
@@ -5,15 +5,16 @@ use reth_db::{
     database::Database,
     models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
     tables,
-    transaction::DbTxMut,
+    transaction::{DbTx, DbTxMut},
+    DatabaseError,
 };
 use reth_interfaces::{
     consensus::Consensus,
     p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
 };
-use reth_primitives::stage::{StageCheckpoint, StageId};
+use reth_primitives::stage::{EntitiesCheckpoint, StageCheckpoint, StageId};
 use reth_provider::Transaction;
-use std::sync::Arc;
+use std::{ops::Deref, sync::Arc};
 use tracing::*;
 
 // TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
@@ -154,7 +155,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
         //  - We reached our target and the target was not limited by the batch size of the stage
         let done = highest_block == to_block;
         info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished");
-        Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block), done })
+        Ok(ExecOutput {
+            checkpoint: StageCheckpoint::new(highest_block)
+                .with_entities_stage_checkpoint(stage_checkpoint(tx)?),
+            done,
+        })
     }
 
     /// Unwind the stage.
@@ -207,10 +212,25 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
         }
 
         info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
-        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
+        Ok(UnwindOutput {
+            checkpoint: StageCheckpoint::new(input.unwind_to)
+                .with_entities_stage_checkpoint(stage_checkpoint(tx)?),
+        })
     }
 }
 
+// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
+// beforehand how many bytes we need to download. So the good solution would be to measure the
+// progress in gas as a proxy to size. Execution stage uses a similar approach.
+fn stage_checkpoint<DB: Database>(
+    tx: &Transaction<'_, DB>,
+) -> Result<EntitiesCheckpoint, DatabaseError> {
+    Ok(EntitiesCheckpoint {
+        processed: tx.deref().entries::<tables::BlockBodyIndices>()? as u64,
+        total: tx.deref().entries::<tables::Headers>()? as u64,
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -219,6 +239,7 @@ mod tests {
         PREV_STAGE_ID,
     };
     use assert_matches::assert_matches;
+    use reth_primitives::stage::StageUnitCheckpoint;
     use test_utils::*;
 
     stage_test_suite_ext!(BodyTestRunner, body);
@@ -238,7 +259,8 @@ mod tests {
 
         // Set the batch size (max we sync per stage execution) to less than the number of blocks
         // the previous stage synced (10 vs 20)
-        runner.set_batch_size(10);
+        let batch_size = 10;
+        runner.set_batch_size(batch_size);
 
         // Run the stage
         let rx = runner.execute(input);
@@ -248,7 +270,14 @@ mod tests {
         let output = rx.await.unwrap();
         assert_matches!(
             output,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number < 200
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed, // 1 seeded block body + batch size
+                    total // seeded headers
+                }))
+            }, done: false }) if block_number < 200 &&
+                processed == 1 + batch_size && total == previous_stage
         );
         assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
     }
@@ -278,9 +307,15 @@ mod tests {
         assert_matches!(
             output,
             Ok(ExecOutput {
-                checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
+                checkpoint: StageCheckpoint {
+                    block_number: 20,
+                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                        processed,
+                        total
+                    }))
+                },
                 done: true
-            })
+            }) if processed == total && total == previous_stage
         );
         assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
     }
@@ -298,7 +333,8 @@ mod tests {
         };
         runner.seed_execution(input).expect("failed to seed execution");
 
-        runner.set_batch_size(10);
+        let batch_size = 10;
+        runner.set_batch_size(batch_size);
 
         // Run the stage
         let rx = runner.execute(input);
@@ -307,7 +343,14 @@ mod tests {
         let first_run = rx.await.unwrap();
         assert_matches!(
             first_run,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number >= 10
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed,
+                    total
+                }))
+            }, done: false }) if block_number >= 10 &&
+                processed == 1 + batch_size && total == previous_stage
         );
         let first_run_checkpoint = first_run.unwrap().checkpoint;
 
@@ -322,7 +365,14 @@ mod tests {
         let output = rx.await.unwrap();
         assert_matches!(
             output,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number > first_run_checkpoint.block_number
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed,
+                    total
+                }))
+            }, done: true }) if block_number > first_run_checkpoint.block_number &&
+                processed == total && total == previous_stage
         );
         assert_matches!(
             runner.validate_execution(input, output.ok()),
@@ -355,7 +405,14 @@ mod tests {
         let output = rx.await.unwrap();
         assert_matches!(
             output,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number == previous_stage
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed,
+                    total
+                }))
+            }, done: true }) if block_number == previous_stage &&
+                processed == total && total == previous_stage
         );
         let checkpoint = output.unwrap().checkpoint;
         runner
@@ -379,7 +436,13 @@ mod tests {
         let res = runner.unwind(input).await;
         assert_matches!(
             res,
-            Ok(UnwindOutput { checkpoint: StageCheckpoint { block_number: 1, .. } })
+            Ok(UnwindOutput { checkpoint: StageCheckpoint {
+                block_number: 1,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed: 1,
+                    total
+                }))
+            }}) if total == previous_stage
         );
 
         assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs
index ea1daf50b9..5821551828 100644
--- a/crates/stages/src/stages/total_difficulty.rs
+++ b/crates/stages/src/stages/total_difficulty.rs
@@ -4,14 +4,15 @@ use reth_db::{
     database::Database,
     tables,
     transaction::{DbTx, DbTxMut},
+    DatabaseError,
 };
 use reth_interfaces::{consensus::Consensus, provider::ProviderError};
 use reth_primitives::{
-    stage::{StageCheckpoint, StageId},
+    stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
     U256,
 };
 use reth_provider::Transaction;
-use std::sync::Arc;
+use std::{ops::Deref, sync::Arc};
 use tracing::*;
 
 /// The total difficulty stage.
@@ -81,8 +82,13 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
                 .map_err(|error| StageError::Validation { block: header.seal_slow(), error })?;
             cursor_td.append(block_number, td.into())?;
         }
+
         info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");
-        Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
+        Ok(ExecOutput {
+            checkpoint: StageCheckpoint::new(end_block)
+                .with_entities_stage_checkpoint(stage_checkpoint(tx)?),
+            done: is_final_range,
+        })
     }
 
     /// Unwind the stage.
@@ -97,18 +103,31 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
         tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
 
         info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
-        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
+        Ok(UnwindOutput {
+            checkpoint: StageCheckpoint::new(unwind_to)
+                .with_entities_stage_checkpoint(stage_checkpoint(tx)?),
+        })
     }
 }
 
+fn stage_checkpoint<DB: Database>(
+    tx: &Transaction<'_, DB>,
+) -> Result<EntitiesCheckpoint, DatabaseError> {
+    Ok(EntitiesCheckpoint {
+        processed: tx.deref().entries::<tables::HeaderTD>()? as u64,
+        total: tx.deref().entries::<tables::Headers>()? as u64,
+    })
+}
+
 #[cfg(test)]
 mod tests {
+    use assert_matches::assert_matches;
     use reth_db::transaction::DbTx;
     use reth_interfaces::test_utils::{
         generators::{random_header, random_header_range},
         TestConsensus,
     };
-    use reth_primitives::{BlockNumber, SealedHeader};
+    use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader};
     use super::*;
     use crate::test_utils::{
@@ -137,11 +156,17 @@ mod tests {
         // Execute first time
         let result = runner.execute(first_input).await.unwrap();
         let expected_progress = stage_progress + threshold;
-        assert!(matches!(
+        assert_matches!(
             result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false })
-            if block_number == expected_progress
-        ));
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed,
+                    total
+                }))
+            }, done: false }) if block_number == expected_progress && processed == 1 + threshold &&
+                total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
+        );
 
         // Execute second time
         let second_input = ExecInput {
@@ -149,11 +174,17 @@ mod tests {
             previous_stage: Some((PREV_STAGE_ID, previous_stage)),
             checkpoint: Some(StageCheckpoint::new(expected_progress)),
         };
         let result = runner.execute(second_input).await.unwrap();
-        assert!(matches!(
+        assert_matches!(
             result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true })
-            if block_number == previous_stage
-        ));
+            Ok(ExecOutput { checkpoint: StageCheckpoint {
+                block_number,
+                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
+                    processed,
+                    total
+                }))
+            }, done: true }) if block_number == previous_stage && processed == total &&
+                total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
+        );
 
         assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
     }
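
Note: the checkpoints introduced above attach an `EntitiesCheckpoint { processed, total }` pair to the stage checkpoint, which is what enables a percentage-style progress read-out for the Bodies and TotalDifficulty stages. The standalone sketch below only illustrates that idea; the `Entities` struct and `percent` helper are hypothetical stand-ins for `reth_primitives::stage::EntitiesCheckpoint`, and only the `processed`/`total` field shape is taken from the diff.

// Minimal sketch (assumed names, not part of the patch): turning an
// entities-style checkpoint into a human-readable progress figure.
#[derive(Clone, Copy, Debug)]
struct Entities {
    processed: u64,
    total: u64,
}

impl Entities {
    // Percentage of processed entities, guarding against an empty table
    // (e.g. right after only the genesis header has been inserted).
    fn percent(&self) -> f64 {
        if self.total == 0 {
            return 0.0
        }
        self.processed as f64 * 100.0 / self.total as f64
    }
}

fn main() {
    // Values in the spirit of the Bodies test above: 1 genesis body plus a
    // 10-block batch processed, out of 20 seeded headers in total.
    let checkpoint = Entities { processed: 11, total: 20 };
    println!("Bodies stage: {}/{} ({:.1}%)", checkpoint.processed, checkpoint.total, checkpoint.percent());
}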