feat(cli): p2p header & body download (#698)

* p2p cli scaffolding

* refactor to fetch client

* body & header download

* p2p trusted opts

* add retries

* notify on retry

* clippy

* display err
This commit is contained in:
Roman Krasiuk
2023-01-04 21:33:08 +02:00
committed by GitHub
parent 28f67b371d
commit 973ea48e03
7 changed files with 220 additions and 4 deletions

2
Cargo.lock generated
View File

@@ -3471,6 +3471,7 @@ dependencies = [
name = "reth"
version = "0.1.0"
dependencies = [
"backon",
"clap 4.0.32",
"confy",
"dirs-next",
@@ -3494,6 +3495,7 @@ dependencies = [
"serde",
"serde_json",
"shellexpand",
"tempfile",
"thiserror",
"tokio",
"tracing",

View File

@@ -47,3 +47,5 @@ clap = { version = "4.0", features = ["derive", "cargo"] }
thiserror = "1.0"
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
futures = "0.3.25"
tempfile = { version = "3.3.0" }
backon = "0.2.0"

View File

@@ -4,7 +4,7 @@ use clap::{ArgAction, Parser, Subcommand};
use tracing_subscriber::util::SubscriberInitExt;
use crate::{
db, node, test_eth_chain,
db, node, p2p, test_eth_chain,
util::reth_tracing::{self, TracingMode},
};
@@ -22,6 +22,7 @@ pub async fn run() -> eyre::Result<()> {
Commands::Node(command) => command.execute().await,
Commands::TestEthChain(command) => command.execute().await,
Commands::Db(command) => command.execute().await,
Commands::P2P(command) => command.execute().await,
}
}
@@ -37,6 +38,9 @@ pub enum Commands {
/// DB Debugging utilities
#[command(name = "db")]
Db(db::Command),
/// P2P Debugging utilities
#[command(name = "p2p")]
P2P(p2p::Command),
}
#[derive(Parser)]

View File

@@ -11,6 +11,7 @@ pub mod config;
pub mod db;
pub mod dirs;
pub mod node;
pub mod p2p;
pub mod prometheus_exporter;
pub mod test_eth_chain;
pub mod util;

View File

@@ -1,9 +1,7 @@
use tracing::error;
#[tokio::main]
async fn main() {
if let Err(err) = reth::cli::run().await {
error!("Error: {:?}", err);
eprintln!("Error: {err:?}");
std::process::exit(1);
}
}

199
bin/reth/src/p2p/mod.rs Normal file
View File

@@ -0,0 +1,199 @@
//! P2P Debugging tool
use backon::{ConstantBackoff, Retryable};
use clap::{Parser, Subcommand};
use reth_db::mdbx::{Env, EnvKind, WriteMap};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, Header, NodeRecord, SealedHeader};
use std::sync::Arc;
use crate::{
config::Config,
dirs::ConfigPath,
util::{
chainspec::{chain_spec_value_parser, ChainSpecification},
hash_or_num_value_parser,
},
};
/// `reth p2p` command
#[derive(Debug, Parser)]
pub struct Command {
/// The path to the configuration file to use.
#[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)]
config: ConfigPath,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
///
/// Built-in chains:
/// - mainnet
/// - goerli
/// - sepolia
#[arg(
long,
value_name = "CHAIN_OR_PATH",
verbatim_doc_comment,
default_value = "mainnet",
value_parser = chain_spec_value_parser
)]
chain: ChainSpecification,
/// Disable the discovery service.
#[arg(short, long)]
disable_discovery: bool,
/// Target trusted peer
#[arg(long)]
trusted_peer: Option<NodeRecord>,
/// Connect only to trusted peers
#[arg(long)]
trusted_only: bool,
/// The number of retries per request
#[arg(long, default_value = "5")]
retries: usize,
#[clap(subcommand)]
command: Subcommands,
}
#[derive(Subcommand, Debug)]
/// `reth p2p` subcommands
pub enum Subcommands {
/// Download block header
Header {
/// The header number or hash
#[arg(value_parser = hash_or_num_value_parser)]
id: BlockHashOrNumber,
},
/// Download block body
Body {
/// The block number or hash
#[arg(value_parser = hash_or_num_value_parser)]
id: BlockHashOrNumber,
},
}
impl Command {
/// Execute `p2p` command
pub async fn execute(&self) -> eyre::Result<()> {
let tempdir = tempfile::TempDir::new()?;
let noop_db = Arc::new(Env::<WriteMap>::open(&tempdir.into_path(), EnvKind::RW)?);
let mut config: Config = confy::load_path(&self.config).unwrap_or_default();
let chain_id = self.chain.consensus.chain_id;
let genesis: Header = self.chain.genesis.clone().into();
let genesis_hash = genesis.hash_slow();
if let Some(peer) = self.trusted_peer {
config.peers.trusted_nodes.insert(peer);
}
if config.peers.trusted_nodes.is_empty() && self.trusted_only {
eyre::bail!("No trusted nodes. Set trusted peer with `--trusted-peer <enode record>` or set `--trusted-only` to `false`")
}
config.peers.connect_trusted_nodes_only = self.trusted_only;
let network = config
.network_config(noop_db, chain_id, genesis_hash, self.disable_discovery)
.start_network()
.await?;
let fetch_client = Arc::new(network.fetch_client().await?);
let retries = self.retries.max(1);
let backoff = ConstantBackoff::default().with_max_times(retries);
match self.command {
Subcommands::Header { id } => {
let header = (move || self.get_single_header(fetch_client.clone(), id))
.retry(backoff)
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
.await?;
println!("Successfully downloaded header: {header:?}");
}
Subcommands::Body { id } => {
let hash = match id {
BlockHashOrNumber::Hash(hash) => hash,
BlockHashOrNumber::Number(number) => {
println!("Block number provided. Downloading header first...");
let client = fetch_client.clone();
let header = (move || {
self.get_single_header(
client.clone(),
BlockHashOrNumber::Number(number),
)
})
.retry(backoff.clone())
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
.await?;
header.hash()
}
};
let (_, result) = (move || {
let client = fetch_client.clone();
async move { client.get_block_bodies(vec![hash]).await }
})
.retry(backoff)
.notify(|err, _| println!("Error requesting block: {err}. Retrying..."))
.await?
.split();
if result.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
result.len()
)
}
let body = result.into_iter().next().unwrap();
println!("Successfully downloaded body: {body:?}")
}
}
Ok(())
}
/// Get a single header from network
pub async fn get_single_header(
&self,
client: Arc<FetchClient>,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
let request = HeadersRequest {
direction: reth_primitives::HeadersDirection::Rising,
limit: 1,
start: id,
};
let (_, response) = client.get_headers(request).await?.split();
if response.0.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
response.0.len()
)
}
let header = response.0.into_iter().next().unwrap().seal();
let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,
BlockHashOrNumber::Number(number) => header.number == number,
};
if !valid {
eyre::bail!(
"Received invalid header. Received: {:?}. Expected: {:?}",
header.num_hash(),
id
);
}
Ok(header)
}
}

View File

@@ -1,7 +1,9 @@
//! Utility functions.
use reth_primitives::{BlockHashOrNumber, H256};
use std::{
env::VarError,
path::{Path, PathBuf},
str::FromStr,
};
use walkdir::{DirEntry, WalkDir};
@@ -24,6 +26,14 @@ pub(crate) fn parse_path(value: &str) -> Result<PathBuf, shellexpand::LookupErro
shellexpand::full(value).map(|path| PathBuf::from(path.into_owned()))
}
/// Parse [BlockHashOrNumber]
pub(crate) fn hash_or_num_value_parser(value: &str) -> Result<BlockHashOrNumber, eyre::Error> {
match H256::from_str(value) {
Ok(hash) => Ok(BlockHashOrNumber::Hash(hash)),
Err(_) => Ok(BlockHashOrNumber::Number(value.parse()?)),
}
}
/// Tracing utility
pub mod reth_tracing {
use tracing::Subscriber;