From 5de04430fd34c68452ef044dd747cd5d3a49acaf Mon Sep 17 00:00:00 2001 From: prames <134806363+0xprames@users.noreply.github.com> Date: Sat, 7 Oct 2023 17:29:32 +0530 Subject: [PATCH] fix(disc) Send find_node request only after verifiying our endpoint proof (#4909) --- crates/net/discv4/src/lib.rs | 132 ++++++++++++++++++++++++++++++----- 1 file changed, 113 insertions(+), 19 deletions(-) diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index 17bfe2d7e6..d889b900cf 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -432,6 +432,11 @@ pub struct Discv4Service { queued_pings: VecDeque<(NodeRecord, PingReason)>, /// Currently active pings to specific nodes. pending_pings: HashMap, + /// Currently active endpoint proof verification lookups to specific nodes. + /// + /// Entries here mean we've proven the peer's endpoint but haven't completed our end of the + /// endpoint proof + pending_lookup: HashMap, /// Currently active FindNode requests pending_find_nodes: HashMap, /// Currently active ENR requests @@ -546,6 +551,7 @@ impl Discv4Service { egress: egress_tx, queued_pings: Default::default(), pending_pings: Default::default(), + pending_lookup: Default::default(), pending_find_nodes: Default::default(), pending_enr_requests: Default::default(), commands_rx, @@ -988,10 +994,17 @@ impl Discv4Service { // the ping interval let mut is_new_insert = false; let mut needs_bond = false; + let mut is_proven = false; let old_enr = match self.kbuckets.entry(&key) { - kbucket::Entry::Present(mut entry, _) => entry.value_mut().update_with_enr(ping.enr_sq), - kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(ping.enr_sq), + kbucket::Entry::Present(mut entry, _) => { + is_proven = entry.value().has_endpoint_proof; + entry.value_mut().update_with_enr(ping.enr_sq) + } + kbucket::Entry::Pending(mut entry, _) => { + is_proven = entry.value().has_endpoint_proof; + entry.value().update_with_enr(ping.enr_sq) + } 
kbucket::Entry::Absent(entry) => { let mut node = NodeEntry::new(record); node.last_enr_seq = ping.enr_sq; @@ -1044,6 +1057,19 @@ impl Discv4Service { self.try_ping(record, PingReason::InitialInsert); } else if needs_bond { self.try_ping(record, PingReason::EstablishBond); + } else if is_proven { + // if node has been proven, this means we've received a pong and verified its endpoint + // proof. We've also sent a pong above to verify our endpoint proof, so we can now + // send our find_nodes request if PingReason::Lookup + if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) { + if self.pending_find_nodes.contains_key(&record.id) { + // there's already another pending request, unmark it so the next round can + // try to send it + ctx.unmark_queried(record.id); + } else { + self.find_node(&record, ctx); + } + } } else { // Request ENR if included in the ping match (ping.enr_sq, old_enr) { @@ -1156,13 +1182,11 @@ impl Discv4Service { } PingReason::Lookup(node, ctx) => { self.update_on_pong(node, pong.enr_sq); - if self.pending_find_nodes.contains_key(&node.id) { - // there's already another pending request, unmark it so the next round can try - // to send it - ctx.unmark_queried(node.id); - } else { - self.find_node(&node, ctx); - } + // insert node and assoc. lookup_context into the pending_lookup table to complete + // our side of the endpoint proof verification. + // Start the lookup timer here - and evict accordingly. Note that this is a separate + // timer from the ping_request timer. + self.pending_lookup.insert(node.id, (Instant::now(), ctx)); } } } @@ -1279,7 +1303,7 @@ impl Discv4Service { }; // This is the recursive lookup step where we initiate new FindNode requests for new nodes - // that where discovered. + // that were discovered. 
for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) { // prevent banned peers from being added to the context if self.config.ban_list.is_banned(&node.id, &node.address) { @@ -1299,7 +1323,8 @@ impl Discv4Service { match self.kbuckets.entry(&key) { BucketEntry::Absent(entry) => { // the node's endpoint is not proven yet, so we need to ping it first, on - // success, it will initiate a `FindNode` request. + // success, we will add the node to the pending_lookup table, and wait to send + // back a Pong before initiating a FindNode request. // In order to prevent that this node is selected again on subsequent responses, // while the ping is still active, we always mark it as queried. ctx.mark_queried(closest.id); @@ -1365,6 +1390,21 @@ impl Discv4Service { self.remove_node(node_id); } + let mut failed_lookups = Vec::new(); + self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| { + if now.duration_since(*lookup_sent_at) > self.config.ping_expiration { + failed_lookups.push(*node_id); + return false + } + true + }); + debug!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup"); + + // remove nodes that failed the e2e lookup process, so we can restart it + for node_id in failed_lookups { + self.remove_node(node_id); + } + self.evict_failed_neighbours(now); } @@ -1802,7 +1842,7 @@ impl LookupTargetRotator { /// Tracks lookups across multiple `FindNode` requests. /// /// If this type is dropped by all -#[derive(Clone)] +#[derive(Clone, Debug)] struct LookupContext { inner: Rc, } @@ -1905,7 +1945,7 @@ impl LookupContext { // guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`] clones of // [`LookupContext`]. unsafe impl Send for LookupContext {} - +#[derive(Debug)] struct LookupContextInner { /// The target to lookup. 
target: discv5::Key, @@ -2272,7 +2312,7 @@ mod tests { reth_tracing::init_test_tracing(); let config = Discv4Config::builder().build(); - let (_discv4, mut service) = create_discv4_with_config(config).await; + let (_discv4, mut service) = create_discv4_with_config(config.clone()).await; let id = PeerId::random(); let key = kad_key(id); @@ -2296,9 +2336,65 @@ mod tests { Poll::Ready(()) }) - .await + .await; } + #[tokio::test] + async fn test_on_neighbours_recursive_lookup() { + reth_tracing::init_test_tracing(); + + let config = Discv4Config::builder().build(); + let (_discv4, mut service) = create_discv4_with_config(config.clone()).await; + let (_discv4, mut service2) = create_discv4_with_config(config).await; + + let id = PeerId::random(); + let key = kad_key(id); + let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id); + + let _ = service.kbuckets.insert_or_update( + &key, + NodeEntry::new_proven(record), + NodeStatus { + direction: ConnectionDirection::Incoming, + state: ConnectionState::Connected, + }, + ); + // Needed in this test to populate self.pending_find_nodes as a prereq to a valid + // on_neighbours request + service.lookup_self(); + assert_eq!(service.pending_find_nodes.len(), 1); + + poll_fn(|cx| { + let _ = service.poll(cx); + assert_eq!(service.pending_find_nodes.len(), 1); + + Poll::Ready(()) + }) + .await; + + let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() + + 10000000000000; + let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry }; + service.on_neighbours(msg, record.tcp_addr(), id); + // wait for the processed ping + let event = poll_fn(|cx| service2.poll(cx)).await; + assert_eq!(event, Discv4Event::Ping); + // assert that no find_node req has been added here on top of the initial one, since both + // sides of the endpoint proof are not yet completed here + assert_eq!(service.pending_find_nodes.len(), 1); + // we now wait for PONG + let event = poll_fn(|cx| 
service.poll(cx)).await; + assert_eq!(event, Discv4Event::Pong); + // Ideally we want to assert against service.pending_lookup.len() here - but because the + // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets + // drained almost immediately - and there is no way to grab the handle to its intermediary state here + // :( + let event = poll_fn(|cx| service.poll(cx)).await; + assert_eq!(event, Discv4Event::Ping); + // assert that we've added the find_node req here after both sides of the endpoint proof are + // done + assert_eq!(service.pending_find_nodes.len(), 2); + } #[tokio::test] async fn test_no_local_in_closest() { reth_tracing::init_test_tracing(); @@ -2463,14 +2559,12 @@ mod tests { // we now wait for PONG let event = poll_fn(|cx| service_2.poll(cx)).await; - // Since the endpoint was already proven from 1 POV it can already send a FindNode so the - // next event is either the PONG or Find Node match event { - Discv4Event::FindNode | Discv4Event::EnrRequest => { + Discv4Event::EnrRequest => { // since we support enr in the ping it may also request the enr let event = poll_fn(|cx| service_2.poll(cx)).await; match event { - Discv4Event::FindNode | Discv4Event::EnrRequest => { + Discv4Event::EnrRequest => { let event = poll_fn(|cx| service_2.poll(cx)).await; assert_eq!(event, Discv4Event::Pong); }