diff --git a/Cargo.lock b/Cargo.lock index a1ee82ef3f..05a2e8cfd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4472,6 +4472,7 @@ dependencies = [ "reth-transaction-pool", "secp256k1 0.24.3", "serde", + "serde_json", "serial_test", "tempfile", "thiserror", diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index 53757d3256..bac618cbc9 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -68,6 +68,19 @@ impl XdgPath for ConfigPath { } } +/// Returns the path to the default reth known peers file. +/// +/// Refer to [dirs_next::config_dir] for cross-platform behavior. +#[derive(Default, Debug, Clone)] +#[non_exhaustive] +pub struct KnownPeersPath; + +impl XdgPath for KnownPeersPath { + fn resolve() -> Option { + database_path().map(|p| p.join("known-peers.json")) + } +} + /// Returns the path to the reth logs directory. /// /// Refer to [dirs_next::cache_dir] for cross-platform behavior. diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 1bfaf37412..fbf0e3567b 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -15,6 +15,7 @@ pub mod prometheus_exporter; pub mod stage; pub mod test_eth_chain; pub mod test_vectors; +use dirs::{KnownPeersPath, PlatformPath}; pub use reth_staged_sync::utils; use clap::Args; @@ -42,4 +43,14 @@ struct NetworkOpts { /// Will fall back to a network-specific default if not specified. #[arg(long, value_delimiter = ',')] bootnodes: Option>, + + /// The path to the known peers file. Connected peers are + /// dumped to this file on node shutdown, and read on startup. + /// Cannot be used with --no-persist-peers + #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)] + peers_file: PlatformPath, + + /// Do not persist peers. Cannot be used with --peers-file + #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] + no_persist_peers: bool, } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 2f15e709a3..aa30d776f2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -26,8 +26,7 @@ use reth_stages::{ prelude::*, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, }; -use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::select; +use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration}; use tracing::{debug, info, warn}; /// Start the node @@ -116,6 +115,7 @@ impl Command { info!(target: "reth::cli", "Connecting to P2P network"); let netconf = self.load_network_config(&config, &db); let network = netconf.start_network().await?; + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); // TODO(mattsse): cleanup, add cli args @@ -139,7 +139,14 @@ impl Command { // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); - pipeline.run(db.clone()).await?; + tokio::select! { + res = pipeline.run(db.clone()) => res?, + _ = tokio::signal::ctrl_c() => {}, + }; + + if !self.network.no_persist_peers { + dump_peers(self.network.peers_file.as_ref(), network).await?; + } info!(target: "reth::cli", "Finishing up"); Ok(()) @@ -194,12 +201,14 @@ impl Command { config: &Config, db: &Arc>, ) -> NetworkConfig>> { + let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file); config.network_config( db.clone(), self.chain.clone(), self.network.disable_discovery, self.network.bootnodes.clone(), self.nat, + peers_file.map(|f| f.as_ref().to_path_buf()), ) } @@ -285,6 +294,15 @@ impl Command { } } +/// Dumps peers to `file_path` for persistence. +async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> { + info!(target : "net::peers", file = %file_path.display(), "Saving current peers"); + let known_peers = network.peers_handle().all_peers().await; + + tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?; + Ok(()) +} + /// The current high-level state of the node. #[derive(Default)] struct NodeState { @@ -324,7 +342,7 @@ async fn handle_events(mut events: impl Stream + Unpin) { let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - select! { + tokio::select! { Some(event) = events.next() => { match event { NodeEvent::Network(NetworkEvent::SessionEstablished { peer_id, status, .. }) => { diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index 29dc040a94..1dedc3c134 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -98,7 +98,14 @@ impl Command { config.peers.connect_trusted_nodes_only = self.trusted_only; let network = config - .network_config(noop_db, self.chain.clone(), self.disable_discovery, None, self.nat) + .network_config( + noop_db, + self.chain.clone(), + self.disable_discovery, + None, + self.nat, + None, + ) .start_network() .await?; diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index 0ebd4cd087..465f71fa0f 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -143,6 +143,7 @@ impl Command { self.network.disable_discovery, None, self.nat, + None, ) .start_network() .await?; diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 91fd59cd7d..37b3d7ff3c 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -42,6 +42,7 @@ tokio-util = { version = "0.7", features = ["codec"] } # io serde = { version = "1.0", optional = true } humantime-serde = { version = "1.1", optional = true } +serde_json = { version = "1.0", optional = true } # metrics metrics = "0.20.1" @@ -96,5 +97,5 @@ serial_test = "0.10" [features] default = ["serde"] -serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde"] +serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"] test-utils = ["reth-provider/test-utils", "dep:enr", "dep:ethers-core", "dep:tempfile"] diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 40f35ad6cf..c254a47698 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -14,8 +14,9 @@ use reth_primitives::{ForkId, NodeRecord, PeerId}; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt::Display, - io, + io::{self, ErrorKind}, net::{IpAddr, SocketAddr}, + path::Path, task::{Context, Poll}, time::Duration, }; @@ -25,7 +26,7 @@ use tokio::{ time::{Instant, Interval}, }; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, trace}; +use tracing::{debug, info, trace}; /// A communication channel to the [`PeersManager`] to apply manual changes to the peer set. #[derive(Clone, Debug)] @@ -63,6 +64,14 @@ impl PeersHandle { rx.await.unwrap_or(None) } + + /// Returns all peers in the peerset. + pub async fn all_peers(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + self.send(PeerCommand::GetPeers(tx)); + + rx.await.unwrap_or_default() + } } /// Maintains the state of _all_ the peers known to the network. @@ -111,6 +120,7 @@ impl PeersManager { backoff_durations, trusted_nodes, connect_trusted_nodes_only, + basic_nodes, .. } = config; let (manager_tx, handle_rx) = mpsc::unbounded_channel(); @@ -119,12 +129,16 @@ impl PeersManager { // We use half of the interval to decrease the max duration to `150%` in worst case let unban_interval = ban_duration.min(backoff_durations.low) / 2; - let mut peers = HashMap::with_capacity(trusted_nodes.len()); + let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len()); for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes { peers.entry(id).or_insert_with(|| Peer::trusted(SocketAddr::from((address, tcp_port)))); } + for NodeRecord { address, tcp_port, udp_port: _, id } in basic_nodes { + peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port)))); + } + Self { peers, manager_tx, @@ -623,6 +637,11 @@ impl PeersManager { PeerCommand::GetPeer(peer, tx) => { let _ = tx.send(self.peers.get(&peer).cloned()); } + PeerCommand::GetPeers(tx) => { + let _ = tx.send( + self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(), + ); + } } } @@ -876,6 +895,8 @@ pub(crate) enum PeerCommand { ReputationChange(PeerId, ReputationChangeKind), /// Get information about a peer GetPeer(PeerId, oneshot::Sender>), + /// Get node information on all peers + GetPeers(oneshot::Sender>), } /// Actions the peer manager can trigger. @@ -921,6 +942,9 @@ pub struct PeersConfig { pub trusted_nodes: HashSet, /// Connect to trusted nodes only? pub connect_trusted_nodes_only: bool, + /// Basic nodes to connect to. + #[cfg_attr(feature = "serde", serde(skip))] + pub basic_nodes: HashSet, /// How long to ban bad peers. #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] pub ban_duration: Duration, @@ -948,6 +972,7 @@ impl Default for PeersConfig { backoff_durations: Default::default(), trusted_nodes: Default::default(), connect_trusted_nodes_only: false, + basic_nodes: Default::default(), } } } @@ -992,6 +1017,30 @@ impl PeersConfig { self.connect_trusted_nodes_only = trusted_only; self } + + /// Nodes available at launch. + pub fn with_basic_nodes(mut self, nodes: HashSet) -> Self { + self.basic_nodes = nodes; + self + } + + /// Read from file nodes available at launch. Ignored if None. + pub fn with_basic_nodes_from_file( + self, + optional_file: Option>, + ) -> Result { + let Some(file_path) = optional_file else { + return Ok(self) + }; + let reader = match std::fs::File::open(file_path.as_ref()) { + Ok(file) => std::io::BufReader::new(file), + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self), + Err(e) => Err(e)?, + }; + info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers"); + let nodes: HashSet = serde_json::from_reader(reader)?; + Ok(self.with_basic_nodes(nodes)) + } } /// The durations to use when a backoff should be applied to a peer. diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs index 698d643e1a..80f58b01b1 100644 --- a/crates/net/network/src/peers/mod.rs +++ b/crates/net/network/src/peers/mod.rs @@ -4,7 +4,7 @@ mod manager; mod reputation; pub(crate) use manager::{InboundConnectionError, PeerAction, PeersManager}; -pub use manager::{PeersConfig, PeersHandle}; +pub use manager::{Peer, PeersConfig, PeersHandle}; pub use reputation::ReputationChangeWeights; pub use reth_network_api::PeerKind; diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index 5d826f7d0e..c74d38a4de 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -1,5 +1,5 @@ //! Configuration files. -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use reth_db::database::Database; use reth_discv4::Discv4Config; @@ -31,10 +31,13 @@ impl Config { disable_discovery: bool, bootnodes: Option>, nat_resolution_method: reth_net_nat::NatResolver, + peers_file: Option, ) -> NetworkConfig> { - let peer_config = reth_network::PeersConfig::default() - .with_trusted_nodes(self.peers.trusted_nodes.clone()) - .with_connect_trusted_nodes_only(self.peers.connect_trusted_nodes_only); + let peer_config = self + .peers + .clone() + .with_basic_nodes_from_file(peers_file) + .unwrap_or_else(|_| self.peers.clone()); let discv4 = Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone(); NetworkConfigBuilder::new(rng_secret_key())