From a590ed7ce5af3bbf6feb3cd5a626334311e341d7 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 2 May 2024 22:34:13 +0200 Subject: [PATCH] chore(discv5): pub methods (#8057) --- crates/net/discv5/src/config.rs | 2 +- crates/net/discv5/src/filter.rs | 22 +- crates/net/discv5/src/lib.rs | 343 ++++++++++++++++---------------- 3 files changed, 182 insertions(+), 185 deletions(-) diff --git a/crates/net/discv5/src/config.rs b/crates/net/discv5/src/config.rs index 3a506902ed..05c2863c85 100644 --- a/crates/net/discv5/src/config.rs +++ b/crates/net/discv5/src/config.rs @@ -143,7 +143,7 @@ impl ConfigBuilder { } /// Sets the tcp port to advertise in the local [`Enr`](discv5::enr::Enr). - fn tcp_port(mut self, port: u16) -> Self { + pub fn tcp_port(mut self, port: u16) -> Self { self.tcp_port = port; self } diff --git a/crates/net/discv5/src/filter.rs b/crates/net/discv5/src/filter.rs index 2e20e2fbda..d62a7584a6 100644 --- a/crates/net/discv5/src/filter.rs +++ b/crates/net/discv5/src/filter.rs @@ -35,14 +35,12 @@ impl MustIncludeKey { /// Returns [`FilterOutcome::Ok`] if [`Enr`](discv5::Enr) contains the configured kv-pair key. pub fn filter(&self, enr: &discv5::Enr) -> FilterOutcome { if enr.get_raw_rlp(self.key).is_none() { - return FilterOutcome::Ignore { reason: self.ignore_reason() } + return FilterOutcome::Ignore { + reason: format!("{} fork required", String::from_utf8_lossy(self.key)), + } } FilterOutcome::Ok } - - fn ignore_reason(&self) -> String { - format!("{} fork required", String::from_utf8_lossy(self.key)) - } } /// Filter requiring that peers not advertise kv-pairs using certain keys, e.g. b"eth2". @@ -69,20 +67,18 @@ impl MustNotIncludeKeys { pub fn filter(&self, enr: &discv5::Enr) -> FilterOutcome { for key in self.keys.iter() { if matches!(key.filter(enr), FilterOutcome::Ok) { - return FilterOutcome::Ignore { reason: self.ignore_reason() } + return FilterOutcome::Ignore { + reason: format!( + "{} forks not allowed", + self.keys.iter().map(|key| String::from_utf8_lossy(key.key)).format(",") + ), + } } } FilterOutcome::Ok } - fn ignore_reason(&self) -> String { - format!( - "{} forks not allowed", - self.keys.iter().map(|key| String::from_utf8_lossy(key.key)).format(",") - ) - } - /// Adds a key that must not be present for any kv-pair in a node record. pub fn add_disallowed_keys(&mut self, keys: &[&'static [u8]]) { for key in keys { diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index b8b2eab242..8895f8a163 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -161,7 +161,7 @@ impl Discv5 { // // 1. make local enr from listen config // - let (enr, bc_enr, fork_key, ip_mode) = Self::build_local_enr(sk, &discv5_config); + let (enr, bc_enr, fork_key, ip_mode) = build_local_enr(sk, &discv5_config); trace!(target: "net::discv5", ?enr, @@ -197,14 +197,14 @@ impl Discv5 { // // 3. add boot nodes // - Self::bootstrap(bootstrap_nodes, &discv5).await?; + bootstrap(bootstrap_nodes, &discv5).await?; let metrics = Discv5Metrics::default(); // // 4. start bg kbuckets maintenance // - Self::spawn_populate_kbuckets_bg( + spawn_populate_kbuckets_bg( lookup_interval, bootstrap_lookup_interval, bootstrap_lookup_countdown, @@ -219,169 +219,6 @@ impl Discv5 { )) } - fn build_local_enr( - sk: &SecretKey, - config: &Config, - ) -> (Enr, NodeRecord, Option<&'static [u8]>, IpMode) { - let mut builder = discv5::enr::Enr::builder(); - - let Config { discv5_config, fork, tcp_port, other_enr_kv_pairs, .. } = config; - - let (ip_mode, socket) = match discv5_config.listen_config { - ListenConfig::Ipv4 { ip, port } => { - if ip != Ipv4Addr::UNSPECIFIED { - builder.ip4(ip); - } - builder.udp4(port); - builder.tcp4(*tcp_port); - - (IpMode::Ip4, (ip, port).into()) - } - ListenConfig::Ipv6 { ip, port } => { - if ip != Ipv6Addr::UNSPECIFIED { - builder.ip6(ip); - } - builder.udp6(port); - builder.tcp6(*tcp_port); - - (IpMode::Ip6, (ip, port).into()) - } - ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => { - if ipv4 != Ipv4Addr::UNSPECIFIED { - builder.ip4(ipv4); - } - builder.udp4(ipv4_port); - builder.tcp4(*tcp_port); - - if ipv6 != Ipv6Addr::UNSPECIFIED { - builder.ip6(ipv6); - } - builder.udp6(ipv6_port); - - (IpMode::DualStack, (ipv6, ipv6_port).into()) - } - }; - - // identifies which network node is on - let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| { - builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into()); - *network_stack_id - }); - - // add other data - for (key, value) in other_enr_kv_pairs { - builder.add_value_rlp(key, value.clone().into()); - } - - // enr v4 not to get confused with discv4, independent versioning enr and - // discovery - let enr = builder.build(sk).expect("should build enr v4"); - - // backwards compatible enr - let bc_enr = NodeRecord::from_secret_key(socket, sk); - - (enr, bc_enr, network_stack_id, ip_mode) - } - - /// Bootstraps underlying [`discv5::Discv5`] node with configured peers. - async fn bootstrap( - bootstrap_nodes: HashSet, - discv5: &Arc, - ) -> Result<(), Error> { - trace!(target: "net::discv5", - ?bootstrap_nodes, - "adding bootstrap nodes .." - ); - - let mut enr_requests = vec![]; - for node in bootstrap_nodes { - match node { - BootNode::Enr(node) => { - if let Err(err) = discv5.add_enr(node) { - return Err(Error::AddNodeFailed(err)) - } - } - BootNode::Enode(enode) => { - let discv5 = discv5.clone(); - enr_requests.push(async move { - if let Err(err) = discv5.request_enr(enode.to_string()).await { - debug!(target: "net::discv5", - ?enode, - %err, - "failed adding boot node" - ); - } - }) - } - } - } - - // If a session is established, the ENR is added straight away to discv5 kbuckets - Ok(_ = join_all(enr_requests).await) - } - - /// Backgrounds regular look up queries, in order to keep kbuckets populated. - fn spawn_populate_kbuckets_bg( - lookup_interval: u64, - bootstrap_lookup_interval: u64, - bootstrap_lookup_countdown: u64, - metrics: Discv5Metrics, - discv5: Arc, - ) { - task::spawn({ - let local_node_id = discv5.local_enr().node_id(); - let lookup_interval = Duration::from_secs(lookup_interval); - let metrics = metrics.discovered_peers; - let mut kbucket_index = MAX_KBUCKET_INDEX; - let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval); - // todo: graceful shutdown - - async move { - // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest - // log2distance from local node - for i in (0..bootstrap_lookup_countdown).rev() { - let target = discv5::enr::NodeId::random(); - - trace!(target: "net::discv5", - %target, - bootstrap_boost_runs_countdown=i, - lookup_interval=format!("{:#?}", pulse_lookup_interval), - "starting bootstrap boost lookup query" - ); - - lookup(target, &discv5, &metrics).await; - - tokio::time::sleep(pulse_lookup_interval).await; - } - - // initiate regular lookups to populate kbuckets - loop { - // make sure node is connected to each subtree in the network by target - // selection (ref kademlia) - let target = get_lookup_target(kbucket_index, local_node_id); - - trace!(target: "net::discv5", - %target, - lookup_interval=format!("{:#?}", lookup_interval), - "starting periodic lookup query" - ); - - lookup(target, &discv5, &metrics).await; - - if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX { - // try to populate bucket one step closer - kbucket_index -= 1 - } else { - // start over with bucket furthest away - kbucket_index = MAX_KBUCKET_INDEX - } - - tokio::time::sleep(lookup_interval).await; - } - } - }); - } - /// Process an event from the underlying [`discv5::Discv5`] node. pub fn on_discv5_update(&mut self, update: discv5::Event) -> Option { match update { @@ -416,7 +253,7 @@ impl Discv5 { } /// Processes a discovered peer. Returns `true` if peer is added to - fn on_discovered_peer( + pub fn on_discovered_peer( &mut self, enr: &discv5::Enr, socket: SocketAddr, @@ -467,7 +304,7 @@ impl Discv5 { /// /// Note: [`discv5::Discv5`] won't initiate a session with any peer with a malformed node /// record, that advertises a reserved IP address on a WAN network. - fn try_into_reachable( + pub fn try_into_reachable( &self, enr: &discv5::Enr, socket: SocketAddr, @@ -490,13 +327,13 @@ impl Discv5 { /// Applies filtering rules on an ENR. Returns [`Ok`](FilterOutcome::Ok) if peer should be /// passed up to app, and [`Ignore`](FilterOutcome::Ignore) if peer should instead be dropped. - fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome { + pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome { self.discovered_peer_filter.filter(enr) } /// Returns the [`ForkId`] of the given [`Enr`](discv5::Enr) w.r.t. the local node's network /// stack, if field is set. - fn get_fork_id( + pub fn get_fork_id( &self, enr: &discv5::enr::Enr, ) -> Result { @@ -551,6 +388,170 @@ pub struct DiscoveredPeer { pub fork_id: Option, } +/// Builds the local ENR with the supplied key. +pub fn build_local_enr( + sk: &SecretKey, + config: &Config, +) -> (Enr, NodeRecord, Option<&'static [u8]>, IpMode) { + let mut builder = discv5::enr::Enr::builder(); + + let Config { discv5_config, fork, tcp_port, other_enr_kv_pairs, .. } = config; + + let (ip_mode, socket) = match discv5_config.listen_config { + ListenConfig::Ipv4 { ip, port } => { + if ip != Ipv4Addr::UNSPECIFIED { + builder.ip4(ip); + } + builder.udp4(port); + builder.tcp4(*tcp_port); + + (IpMode::Ip4, (ip, port).into()) + } + ListenConfig::Ipv6 { ip, port } => { + if ip != Ipv6Addr::UNSPECIFIED { + builder.ip6(ip); + } + builder.udp6(port); + builder.tcp6(*tcp_port); + + (IpMode::Ip6, (ip, port).into()) + } + ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => { + if ipv4 != Ipv4Addr::UNSPECIFIED { + builder.ip4(ipv4); + } + builder.udp4(ipv4_port); + builder.tcp4(*tcp_port); + + if ipv6 != Ipv6Addr::UNSPECIFIED { + builder.ip6(ipv6); + } + builder.udp6(ipv6_port); + + (IpMode::DualStack, (ipv6, ipv6_port).into()) + } + }; + + // identifies which network node is on + let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| { + builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into()); + *network_stack_id + }); + + // add other data + for (key, value) in other_enr_kv_pairs { + builder.add_value_rlp(key, value.clone().into()); + } + + // enr v4 not to get confused with discv4, independent versioning enr and + // discovery + let enr = builder.build(sk).expect("should build enr v4"); + + // backwards compatible enr + let bc_enr = NodeRecord::from_secret_key(socket, sk); + + (enr, bc_enr, network_stack_id, ip_mode) +} + +/// Bootstraps underlying [`discv5::Discv5`] node with configured peers. +pub async fn bootstrap( + bootstrap_nodes: HashSet, + discv5: &Arc, +) -> Result<(), Error> { + trace!(target: "net::discv5", + ?bootstrap_nodes, + "adding bootstrap nodes .." + ); + + let mut enr_requests = vec![]; + for node in bootstrap_nodes { + match node { + BootNode::Enr(node) => { + if let Err(err) = discv5.add_enr(node) { + return Err(Error::AddNodeFailed(err)) + } + } + BootNode::Enode(enode) => { + let discv5 = discv5.clone(); + enr_requests.push(async move { + if let Err(err) = discv5.request_enr(enode.to_string()).await { + debug!(target: "net::discv5", + ?enode, + %err, + "failed adding boot node" + ); + } + }) + } + } + } + + // If a session is established, the ENR is added straight away to discv5 kbuckets + Ok(_ = join_all(enr_requests).await) +} + +/// Backgrounds regular look up queries, in order to keep kbuckets populated. +pub fn spawn_populate_kbuckets_bg( + lookup_interval: u64, + bootstrap_lookup_interval: u64, + bootstrap_lookup_countdown: u64, + metrics: Discv5Metrics, + discv5: Arc, +) { + task::spawn({ + let local_node_id = discv5.local_enr().node_id(); + let lookup_interval = Duration::from_secs(lookup_interval); + let metrics = metrics.discovered_peers; + let mut kbucket_index = MAX_KBUCKET_INDEX; + let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval); + // todo: graceful shutdown + + async move { + // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest + // log2distance from local node + for i in (0..bootstrap_lookup_countdown).rev() { + let target = discv5::enr::NodeId::random(); + + trace!(target: "net::discv5", + %target, + bootstrap_boost_runs_countdown=i, + lookup_interval=format!("{:#?}", pulse_lookup_interval), + "starting bootstrap boost lookup query" + ); + + lookup(target, &discv5, &metrics).await; + + tokio::time::sleep(pulse_lookup_interval).await; + } + + // initiate regular lookups to populate kbuckets + loop { + // make sure node is connected to each subtree in the network by target + // selection (ref kademlia) + let target = get_lookup_target(kbucket_index, local_node_id); + + trace!(target: "net::discv5", + %target, + lookup_interval=format!("{:#?}", lookup_interval), + "starting periodic lookup query" + ); + + lookup(target, &discv5, &metrics).await; + + if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX { + // try to populate bucket one step closer + kbucket_index -= 1 + } else { + // start over with bucket furthest away + kbucket_index = MAX_KBUCKET_INDEX + } + + tokio::time::sleep(lookup_interval).await; + } + } + }); +} + /// Gets the next lookup target, based on which bucket is currently being targeted. pub fn get_lookup_target( kbucket_index: usize, @@ -846,7 +847,7 @@ mod tests { let config = Config::builder(TCP_PORT).fork(NetworkStackId::ETH, fork_id).build(); let sk = SecretKey::new(&mut thread_rng()); - let (enr, _, _, _) = Discv5::build_local_enr(&sk, &config); + let (enr, _, _, _) = build_local_enr(&sk, &config); let decoded_fork_id = enr .get_decodable::(NetworkStackId::ETH)