feat: download blocks in merkle debug script (#4137)

This commit is contained in:
Dan Cline
2023-08-14 07:00:48 -04:00
committed by GitHub
parent 72236490ce
commit dfdfea8d72
2 changed files with 117 additions and 7 deletions

View File

@@ -1,16 +1,25 @@
//! Command for debugging merkle trie calculation.
use crate::{
args::{utils::genesis_value_parser, DatabaseArgs},
args::{get_secret_key, utils::genesis_value_parser, DatabaseArgs, NetworkArgs},
dirs::{DataDirPath, MaybePlatformPath},
runner::CliContext,
utils::get_single_header,
};
use backon::{ConstantBuilder, Retryable};
use clap::Parser;
use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx};
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx, DatabaseEnv};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_interfaces::{consensus::Consensus, p2p::full_block::FullBlockClient};
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::{
fs,
stage::{StageCheckpoint, StageId},
ChainSpec, PruneModes,
BlockHashOrNumber, ChainSpec, PruneModes,
};
use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_provider::{BlockWriter, ProviderFactory, StageCheckpointReader};
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
@@ -18,7 +27,13 @@ use reth_stages::{
},
ExecInput, PipelineError, Stage,
};
use std::sync::Arc;
use reth_tasks::TaskExecutor;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
sync::Arc,
};
use tracing::{debug, info, warn};
/// `reth merkle-debug` command
#[derive(Debug, Parser)]
@@ -53,6 +68,13 @@ pub struct Command {
#[clap(flatten)]
db: DatabaseArgs,
#[clap(flatten)]
network: NetworkArgs,
/// The number of retries per request
#[arg(long, default_value = "5")]
retries: usize,
/// The height to finish at
#[arg(long)]
to: u64,
@@ -63,21 +85,109 @@ pub struct Command {
}
impl Command {
async fn build_network(
&self,
config: &Config,
task_executor: TaskExecutor,
db: Arc<DatabaseEnv>,
network_secret_path: PathBuf,
default_peers_path: PathBuf,
) -> eyre::Result<NetworkHandle> {
let secret_key = get_secret_key(&network_secret_path)?;
let network = self
.network
.network_config(config, self.chain.clone(), secret_key, default_peers_path)
.with_task_executor(Box::new(task_executor))
.listener_addr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
self.network.port.unwrap_or(DEFAULT_DISCOVERY_PORT),
)))
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
self.network.discovery.port.unwrap_or(DEFAULT_DISCOVERY_PORT),
)))
.build(ProviderFactory::new(db, self.chain.clone()))
.start_network()
.await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
Ok(network)
}
/// Execute `merkle-debug` command
pub async fn execute(self) -> eyre::Result<()> {
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
let config = Config::default();
// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
fs::create_dir_all(&db_path)?;
// initialize the database
let db = Arc::new(init_db(db_path, self.db.log_level)?);
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
// Configure and build network
let network_secret_path =
self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path());
let network = self
.build_network(
&config,
ctx.task_executor.clone(),
db.clone(),
network_secret_path,
data_dir.known_peers_path(),
)
.await?;
// Initialize the fetch client
info!(target: "reth::cli", target_block_number=self.to, "Downloading tip of block range");
let fetch_client = network.fetch_client().await?;
// fetch the header at `self.to`
let retries = self.retries.max(1);
let backoff = ConstantBuilder::default().with_max_times(retries);
let client = fetch_client.clone();
let to_header = (move || {
get_single_header(client.clone(), BlockHashOrNumber::Number(self.to))
})
.retry(&backoff)
.notify(|err, _| warn!(target: "reth::cli", "Error requesting header: {err}. Retrying..."))
.await?;
info!(target: "reth::cli", target_block_number=self.to, "Finished downloading tip of block range");
// build the full block client
let consensus: Arc<dyn Consensus> = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain)));
let block_range_client = FullBlockClient::new(fetch_client, consensus);
// get the execution checkpoint
let execution_checkpoint_block =
provider_rw.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number;
assert!(execution_checkpoint_block < self.to, "Nothing to run");
// get the block range from the network
info!(target: "reth::cli", target_block_number=?self.to, "Downloading range of blocks");
let block_range = block_range_client
.get_full_block_range(to_header.hash_slow(), self.to - execution_checkpoint_block)
.await;
// recover senders
let blocks_with_senders =
block_range.into_iter().map(|block| block.try_seal_with_senders());
// insert the blocks
for senders_res in blocks_with_senders {
let sealed_block = match senders_res {
Ok(senders) => senders,
Err(err) => {
warn!(target: "reth::cli", "Error sealing block with senders: {err:?}. Skipping...");
continue
}
};
provider_rw.insert_block(sealed_block.block, Some(sealed_block.senders))?;
}
// Check if any of hashing or merkle stages aren't on the same block number as
// Execution stage or have any intermediate progress.
let should_reset_stages =

View File

@@ -30,7 +30,7 @@ impl Command {
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
match self.command {
Subcommands::Execution(command) => command.execute(ctx).await,
Subcommands::Merkle(command) => command.execute().await,
Subcommands::Merkle(command) => command.execute(ctx).await,
Subcommands::InMemoryMerkle(command) => command.execute(ctx).await,
}
}