diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 1de670e3a8..47e37c3699 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -1,16 +1,25 @@ //! Command for debugging merkle trie calculation. use crate::{ - args::{utils::genesis_value_parser, DatabaseArgs}, + args::{get_secret_key, utils::genesis_value_parser, DatabaseArgs, NetworkArgs}, dirs::{DataDirPath, MaybePlatformPath}, + runner::CliContext, + utils::get_single_header, }; +use backon::{ConstantBuilder, Retryable}; use clap::Parser; -use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx}; +use reth_beacon_consensus::BeaconConsensus; +use reth_config::Config; +use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv}; +use reth_discv4::DEFAULT_DISCOVERY_PORT; +use reth_interfaces::{consensus::Consensus, p2p::full_block::FullBlockClient}; +use reth_network::NetworkHandle; +use reth_network_api::NetworkInfo; use reth_primitives::{ fs, stage::{StageCheckpoint, StageId}, - ChainSpec, PruneModes, + BlockHashOrNumber, ChainSpec, PruneModes, }; -use reth_provider::{ProviderFactory, StageCheckpointReader}; +use reth_provider::{BlockWriter, ProviderFactory, StageCheckpointReader}; use reth_stages::{ stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, @@ -18,7 +27,13 @@ use reth_stages::{ }, ExecInput, PipelineError, Stage, }; -use std::sync::Arc; +use reth_tasks::TaskExecutor; +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + path::PathBuf, + sync::Arc, +}; +use tracing::{debug, info, warn}; /// `reth merkle-debug` command #[derive(Debug, Parser)] @@ -53,6 +68,13 @@ pub struct Command { #[clap(flatten)] db: DatabaseArgs, + #[clap(flatten)] + network: NetworkArgs, + + /// The number of retries per request + #[arg(long, default_value = "5")] + retries: usize, + /// The height to finish at #[arg(long)] to: u64, @@ -63,21 +85,109 @@ pub struct Command { } impl Command { + async fn build_network( + &self, + config: &Config, + task_executor: TaskExecutor, + db: Arc, + network_secret_path: PathBuf, + default_peers_path: PathBuf, + ) -> eyre::Result { + let secret_key = get_secret_key(&network_secret_path)?; + let network = self + .network + .network_config(config, self.chain.clone(), secret_key, default_peers_path) + .with_task_executor(Box::new(task_executor)) + .listener_addr(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::UNSPECIFIED, + self.network.port.unwrap_or(DEFAULT_DISCOVERY_PORT), + ))) + .discovery_addr(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::UNSPECIFIED, + self.network.discovery.port.unwrap_or(DEFAULT_DISCOVERY_PORT), + ))) + .build(ProviderFactory::new(db, self.chain.clone())) + .start_network() + .await?; + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); + debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + Ok(network) + } + /// Execute `merkle-debug` command - pub async fn execute(self) -> eyre::Result<()> { + pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { + let config = Config::default(); + // add network name to data dir let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); fs::create_dir_all(&db_path)?; + // initialize the database let db = Arc::new(init_db(db_path, self.db.log_level)?); let factory = ProviderFactory::new(&db, self.chain.clone()); let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?; + // Configure and build network + let network_secret_path = + self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); + let network = self + .build_network( + &config, + ctx.task_executor.clone(), + db.clone(), + network_secret_path, + data_dir.known_peers_path(), + ) + .await?; + + // Initialize the fetch client + info!(target: "reth::cli", target_block_number=self.to, "Downloading tip of block range"); + let fetch_client = network.fetch_client().await?; + + // fetch the header at `self.to` + let retries = self.retries.max(1); + let backoff = ConstantBuilder::default().with_max_times(retries); + let client = fetch_client.clone(); + let to_header = (move || { + get_single_header(client.clone(), BlockHashOrNumber::Number(self.to)) + }) + .retry(&backoff) + .notify(|err, _| warn!(target: "reth::cli", "Error requesting header: {err}. Retrying...")) + .await?; + info!(target: "reth::cli", target_block_number=self.to, "Finished downloading tip of block range"); + + // build the full block client + let consensus: Arc = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))); + let block_range_client = FullBlockClient::new(fetch_client, consensus); + + // get the execution checkpoint let execution_checkpoint_block = provider_rw.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number; assert!(execution_checkpoint_block < self.to, "Nothing to run"); + // get the block range from the network + info!(target: "reth::cli", target_block_number=?self.to, "Downloading range of blocks"); + let block_range = block_range_client + .get_full_block_range(to_header.hash_slow(), self.to - execution_checkpoint_block) + .await; + + // recover senders + let blocks_with_senders = + block_range.into_iter().map(|block| block.try_seal_with_senders()); + + // insert the blocks + for senders_res in blocks_with_senders { + let sealed_block = match senders_res { + Ok(senders) => senders, + Err(err) => { + warn!(target: "reth::cli", "Error sealing block with senders: {err:?}. Skipping..."); + continue + } + }; + provider_rw.insert_block(sealed_block.block, Some(sealed_block.senders))?; + } + // Check if any of hashing or merkle stages aren't on the same block number as // Execution stage or have any intermediate progress. let should_reset_stages = diff --git a/bin/reth/src/debug_cmd/mod.rs b/bin/reth/src/debug_cmd/mod.rs index e624307f68..7dfa448db4 100644 --- a/bin/reth/src/debug_cmd/mod.rs +++ b/bin/reth/src/debug_cmd/mod.rs @@ -30,7 +30,7 @@ impl Command { pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { match self.command { Subcommands::Execution(command) => command.execute(ctx).await, - Subcommands::Merkle(command) => command.execute().await, + Subcommands::Merkle(command) => command.execute(ctx).await, Subcommands::InMemoryMerkle(command) => command.execute(ctx).await, } }