refactor: introduce reth-staged-sync crate (#962)

This commit is contained in:
Georgios Konstantopoulos
2023-01-21 15:34:49 -08:00
committed by GitHub
parent 17ed0955df
commit acac82fcb3
16 changed files with 65 additions and 55 deletions

View File

@@ -12,6 +12,7 @@ reth-primitives = { path = "../../crates/primitives" }
reth-db = {path = "../../crates/storage/db", features = ["mdbx", "test-utils"] }
# TODO: Temporary use of the test-utils feature
reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
reth-staged-sync = { path = "../../crates/staged-sync" }
reth-stages = { path = "../../crates/stages"}
reth-interfaces = { path = "../../crates/interfaces", features = ["test-utils"] }
reth-transaction-pool = { path = "../../crates/transaction-pool" }
@@ -22,7 +23,6 @@ reth-rlp = { path = "../../crates/common/rlp" }
reth-network = {path = "../../crates/net/network", features = ["serde"] }
reth-network-api = {path = "../../crates/net/network-api" }
reth-downloaders = {path = "../../crates/net/downloaders" }
reth-cli-utils = { path = "../../crates/cli/utils" }
reth-tracing = { path = "../../crates/tracing" }
reth-net-nat = { path = "../../crates/net/nat" }
reth-discv4 = { path = "../../crates/net/discv4" }
@@ -35,7 +35,7 @@ fdlimit = "0.2.1"
walkdir = "2.3"
serde = "1.0"
serde_json = "1.0"
shellexpand = "2.1"
shellexpand = "3.0.0"
dirs-next = "2.0.0"
confy = "0.5"

View File

@@ -1,158 +0,0 @@
//! Configuration files.
use std::sync::Arc;
use reth_db::database::Database;
use reth_discv4::Discv4Config;
use reth_network::{
config::{mainnet_nodes, rng_secret_key},
NetworkConfig, NetworkConfigBuilder, PeersConfig,
};
use reth_primitives::{ChainSpec, NodeRecord};
use reth_provider::ProviderImpl;
use serde::{Deserialize, Serialize};
/// Configuration for the reth node.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Config {
/// Configuration for each stage in the pipeline.
// TODO(onbjerg): Can we make this easier to maintain when we add/remove stages?
pub stages: StageConfig,
/// Configuration for the discovery service.
pub peers: PeersConfig,
}
impl Config {
/// Initializes network config from read data
pub fn network_config<DB: Database>(
&self,
db: Arc<DB>,
chain_spec: ChainSpec,
disable_discovery: bool,
bootnodes: Option<Vec<NodeRecord>>,
nat_resolution_method: reth_net_nat::NatResolver,
) -> NetworkConfig<ProviderImpl<DB>> {
let peer_config = reth_network::PeersConfig::default()
.with_trusted_nodes(self.peers.trusted_nodes.clone())
.with_connect_trusted_nodes_only(self.peers.connect_trusted_nodes_only);
let discv4 =
Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone();
NetworkConfigBuilder::new(rng_secret_key())
.boot_nodes(bootnodes.unwrap_or_else(mainnet_nodes))
.peer_config(peer_config)
.discovery(discv4)
.chain_spec(chain_spec)
.set_discovery(disable_discovery)
.build(Arc::new(ProviderImpl::new(db)))
}
}
/// Configuration for each stage in the pipeline.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct StageConfig {
/// Header stage configuration.
pub headers: HeadersConfig,
/// Total difficulty stage configuration
pub total_difficulty: TotalDifficultyConfig,
/// Body stage configuration.
pub bodies: BodiesConfig,
/// Sender recovery stage configuration.
pub sender_recovery: SenderRecoveryConfig,
/// Execution stage configuration.
pub execution: ExecutionConfig,
}
/// Header stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HeadersConfig {
/// The maximum number of headers to download before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of headers to request from a peer at a time.
pub downloader_batch_size: u64,
/// The number of times to retry downloading a set of headers.
pub downloader_retries: usize,
}
impl Default for HeadersConfig {
fn default() -> Self {
Self { commit_threshold: 10_000, downloader_batch_size: 1000, downloader_retries: 5 }
}
}
/// Total difficulty stage configuration
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TotalDifficultyConfig {
/// The maximum number of total difficulty entries to sum up before committing progress to the
/// database.
pub commit_threshold: u64,
}
impl Default for TotalDifficultyConfig {
fn default() -> Self {
Self { commit_threshold: 100_000 }
}
}
/// Body stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BodiesConfig {
/// The maximum number of bodies to download before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of bodies to request from a peer at a time.
pub downloader_batch_size: usize,
/// The number of times to retry downloading a set of bodies.
pub downloader_retries: usize,
/// The maximum number of body requests to have in flight at a time.
///
/// The maximum number of bodies downloaded at the same time is `downloader_batch_size *
/// downloader_concurrency`.
pub downloader_concurrency: usize,
}
impl Default for BodiesConfig {
fn default() -> Self {
Self {
commit_threshold: 5_000,
downloader_batch_size: 100,
downloader_retries: 5,
downloader_concurrency: 10,
}
}
}
/// Sender recovery stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SenderRecoveryConfig {
/// The maximum number of blocks to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of transactions to recover senders for concurrently.
pub batch_size: usize,
}
impl Default for SenderRecoveryConfig {
fn default() -> Self {
Self { commit_threshold: 5_000, batch_size: 1000 }
}
}
/// Execution stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecutionConfig {
/// The maximum number of blocks to execution before committing progress to the database.
pub commit_threshold: u64,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self { commit_threshold: 5_000 }
}
}
#[cfg(test)]
mod tests {
use super::Config;
#[test]
fn can_serde_config() {
let _: Config = confy::load("test", None).unwrap();
}
}

View File

@@ -1,5 +1,5 @@
//! reth data directories.
use reth_cli_utils::parse_path;
use reth_staged_sync::utils::parse_path;
use std::{
env::VarError,
fmt::{Debug, Display, Formatter},

View File

@@ -7,7 +7,6 @@
//! Rust Ethereum (reth) binary executable.
pub mod cli;
pub mod config;
pub mod db;
pub mod dirs;
pub mod node;
@@ -15,7 +14,7 @@ pub mod p2p;
pub mod prometheus_exporter;
pub mod stage;
pub mod test_eth_chain;
pub use reth_cli_utils as utils;
pub use reth_staged_sync::utils;
use clap::Args;
use reth_primitives::NodeRecord;

View File

@@ -2,7 +2,6 @@
//!
//! Starts the client
use crate::{
config::Config,
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
utils::{chainspec::chain_spec_value_parser, init::init_db, parse_socket_address},
@@ -12,7 +11,6 @@ use clap::{crate_version, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{stream::select as stream_select, Stream, StreamExt};
use reth_cli_utils::init::init_genesis;
use reth_consensus::BeaconConsensus;
use reth_downloaders::{bodies, headers};
use reth_interfaces::consensus::ForkchoiceState;
@@ -20,6 +18,7 @@ use reth_net_nat::NatResolver;
use reth_network::NetworkEvent;
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_staged_sync::{utils::init::init_genesis, Config};
use reth_stages::{
metrics::HeaderMetrics,
stages::{

View File

@@ -1,6 +1,5 @@
//! P2P Debugging tool
use crate::{
config::Config,
dirs::{ConfigPath, PlatformPath},
utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser},
};
@@ -14,6 +13,7 @@ use reth_interfaces::p2p::{
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader};
use reth_staged_sync::Config;
use std::sync::Arc;
/// `reth p2p` command

View File

@@ -2,7 +2,6 @@
//!
//! Stage debugging tool
use crate::{
config::Config,
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
utils::{chainspec::chain_spec_value_parser, init::init_db},
@@ -13,6 +12,7 @@ use reth_downloaders::bodies::concurrent::ConcurrentDownloader;
use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
use reth_staged_sync::Config;
use reth_stages::{
stages::{bodies::BodyStage, execution::ExecutionStage, sender_recovery::SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,

View File

@@ -25,7 +25,7 @@ impl Command {
let mut futs: FuturesUnordered<_> = self
.path
.iter()
.flat_map(|item| reth_cli_utils::find_all_files_with_postfix(item, ".json"))
.flat_map(|item| reth_staged_sync::utils::find_all_files_with_postfix(item, ".json"))
.map(|file| async { (runner::run_test(file.clone()).await, file) })
.collect();