From 973ea48e03f30055d2550c524ddf0cdd38bdf346 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Wed, 4 Jan 2023 21:33:08 +0200 Subject: [PATCH] feat(cli): p2p header & body download (#698) * p2p cli scaffolding * refactor to fetch client * body & header download * p2p trusted opts * add retries * notify on retry * clippy * display err --- Cargo.lock | 2 + bin/reth/Cargo.toml | 2 + bin/reth/src/cli.rs | 6 +- bin/reth/src/lib.rs | 1 + bin/reth/src/main.rs | 4 +- bin/reth/src/p2p/mod.rs | 199 +++++++++++++++++++++++++++++++++++++++ bin/reth/src/util/mod.rs | 10 ++ 7 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 bin/reth/src/p2p/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5291e49c36..cda3bb62f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3471,6 +3471,7 @@ dependencies = [ name = "reth" version = "0.1.0" dependencies = [ + "backon", "clap 4.0.32", "confy", "dirs-next", @@ -3494,6 +3495,7 @@ dependencies = [ "serde", "serde_json", "shellexpand", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 4e62c292d7..b50d075d93 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -47,3 +47,5 @@ clap = { version = "4.0", features = ["derive", "cargo"] } thiserror = "1.0" tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } futures = "0.3.25" +tempfile = { version = "3.3.0" } +backon = "0.2.0" diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index b0be7f0dfe..ec210bf83d 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -4,7 +4,7 @@ use clap::{ArgAction, Parser, Subcommand}; use tracing_subscriber::util::SubscriberInitExt; use crate::{ - db, node, test_eth_chain, + db, node, p2p, test_eth_chain, util::reth_tracing::{self, TracingMode}, }; @@ -22,6 +22,7 @@ pub async fn run() -> eyre::Result<()> { Commands::Node(command) => command.execute().await, Commands::TestEthChain(command) => command.execute().await, Commands::Db(command) => command.execute().await, + Commands::P2P(command) => command.execute().await, } } @@ -37,6 +38,9 @@ pub enum Commands { /// DB Debugging utilities #[command(name = "db")] Db(db::Command), + /// P2P Debugging utilities + #[command(name = "p2p")] + P2P(p2p::Command), } #[derive(Parser)] diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 1c58970b0c..3261009fed 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -11,6 +11,7 @@ pub mod config; pub mod db; pub mod dirs; pub mod node; +pub mod p2p; pub mod prometheus_exporter; pub mod test_eth_chain; pub mod util; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 079f392583..11ac01de69 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -1,9 +1,7 @@ -use tracing::error; - #[tokio::main] async fn main() { if let Err(err) = reth::cli::run().await { - error!("Error: {:?}", err); + eprintln!("Error: {err:?}"); std::process::exit(1); } } diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs new file mode 100644 index 0000000000..792fc89519 --- /dev/null +++ b/bin/reth/src/p2p/mod.rs @@ -0,0 +1,199 @@ +//! P2P Debugging tool +use backon::{ConstantBackoff, Retryable}; +use clap::{Parser, Subcommand}; +use reth_db::mdbx::{Env, EnvKind, WriteMap}; +use reth_interfaces::p2p::{ + bodies::client::BodiesClient, + headers::client::{HeadersClient, HeadersRequest}, +}; +use reth_network::FetchClient; +use reth_primitives::{BlockHashOrNumber, Header, NodeRecord, SealedHeader}; +use std::sync::Arc; + +use crate::{ + config::Config, + dirs::ConfigPath, + util::{ + chainspec::{chain_spec_value_parser, ChainSpecification}, + hash_or_num_value_parser, + }, +}; + +/// `reth p2p` command +#[derive(Debug, Parser)] +pub struct Command { + /// The path to the configuration file to use. + #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)] + config: ConfigPath, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file. + /// + /// Built-in chains: + /// - mainnet + /// - goerli + /// - sepolia + #[arg( + long, + value_name = "CHAIN_OR_PATH", + verbatim_doc_comment, + default_value = "mainnet", + value_parser = chain_spec_value_parser + )] + chain: ChainSpecification, + + /// Disable the discovery service. + #[arg(short, long)] + disable_discovery: bool, + + /// Target trusted peer + #[arg(long)] + trusted_peer: Option, + + /// Connect only to trusted peers + #[arg(long)] + trusted_only: bool, + + /// The number of retries per request + #[arg(long, default_value = "5")] + retries: usize, + + #[clap(subcommand)] + command: Subcommands, +} + +#[derive(Subcommand, Debug)] +/// `reth p2p` subcommands +pub enum Subcommands { + /// Download block header + Header { + /// The header number or hash + #[arg(value_parser = hash_or_num_value_parser)] + id: BlockHashOrNumber, + }, + /// Download block body + Body { + /// The block number or hash + #[arg(value_parser = hash_or_num_value_parser)] + id: BlockHashOrNumber, + }, +} +impl Command { + /// Execute `p2p` command + pub async fn execute(&self) -> eyre::Result<()> { + let tempdir = tempfile::TempDir::new()?; + let noop_db = Arc::new(Env::::open(&tempdir.into_path(), EnvKind::RW)?); + + let mut config: Config = confy::load_path(&self.config).unwrap_or_default(); + + let chain_id = self.chain.consensus.chain_id; + let genesis: Header = self.chain.genesis.clone().into(); + let genesis_hash = genesis.hash_slow(); + + if let Some(peer) = self.trusted_peer { + config.peers.trusted_nodes.insert(peer); + } + + if config.peers.trusted_nodes.is_empty() && self.trusted_only { + eyre::bail!("No trusted nodes. Set trusted peer with `--trusted-peer ` or set `--trusted-only` to `false`") + } + + config.peers.connect_trusted_nodes_only = self.trusted_only; + + let network = config + .network_config(noop_db, chain_id, genesis_hash, self.disable_discovery) + .start_network() + .await?; + + let fetch_client = Arc::new(network.fetch_client().await?); + let retries = self.retries.max(1); + let backoff = ConstantBackoff::default().with_max_times(retries); + + match self.command { + Subcommands::Header { id } => { + let header = (move || self.get_single_header(fetch_client.clone(), id)) + .retry(backoff) + .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) + .await?; + println!("Successfully downloaded header: {header:?}"); + } + Subcommands::Body { id } => { + let hash = match id { + BlockHashOrNumber::Hash(hash) => hash, + BlockHashOrNumber::Number(number) => { + println!("Block number provided. Downloading header first..."); + let client = fetch_client.clone(); + let header = (move || { + self.get_single_header( + client.clone(), + BlockHashOrNumber::Number(number), + ) + }) + .retry(backoff.clone()) + .notify(|err, _| println!("Error requesting header: {err}. Retrying...")) + .await?; + header.hash() + } + }; + let (_, result) = (move || { + let client = fetch_client.clone(); + async move { client.get_block_bodies(vec![hash]).await } + }) + .retry(backoff) + .notify(|err, _| println!("Error requesting block: {err}. Retrying...")) + .await? + .split(); + if result.len() != 1 { + eyre::bail!( + "Invalid number of headers received. Expected: 1. Received: {}", + result.len() + ) + } + let body = result.into_iter().next().unwrap(); + println!("Successfully downloaded body: {body:?}") + } + } + + Ok(()) + } + + /// Get a single header from network + pub async fn get_single_header( + &self, + client: Arc, + id: BlockHashOrNumber, + ) -> eyre::Result { + let request = HeadersRequest { + direction: reth_primitives::HeadersDirection::Rising, + limit: 1, + start: id, + }; + + let (_, response) = client.get_headers(request).await?.split(); + + if response.0.len() != 1 { + eyre::bail!( + "Invalid number of headers received. Expected: 1. Received: {}", + response.0.len() + ) + } + + let header = response.0.into_iter().next().unwrap().seal(); + + let valid = match id { + BlockHashOrNumber::Hash(hash) => header.hash() == hash, + BlockHashOrNumber::Number(number) => header.number == number, + }; + + if !valid { + eyre::bail!( + "Received invalid header. Received: {:?}. Expected: {:?}", + header.num_hash(), + id + ); + } + + Ok(header) + } +} diff --git a/bin/reth/src/util/mod.rs b/bin/reth/src/util/mod.rs index 14f0861601..28cfb73a86 100644 --- a/bin/reth/src/util/mod.rs +++ b/bin/reth/src/util/mod.rs @@ -1,7 +1,9 @@ //! Utility functions. +use reth_primitives::{BlockHashOrNumber, H256}; use std::{ env::VarError, path::{Path, PathBuf}, + str::FromStr, }; use walkdir::{DirEntry, WalkDir}; @@ -24,6 +26,14 @@ pub(crate) fn parse_path(value: &str) -> Result Result { + match H256::from_str(value) { + Ok(hash) => Ok(BlockHashOrNumber::Hash(hash)), + Err(_) => Ok(BlockHashOrNumber::Number(value.parse()?)), + } +} + /// Tracing utility pub mod reth_tracing { use tracing::Subscriber;