mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf(discv4): trigger immediate lookup on first bootnode pong (#22551)
This commit is contained in:
@@ -489,6 +489,8 @@ pub struct Discv4Service {
|
||||
lookup_interval: Interval,
|
||||
/// Used to rotate targets to lookup
|
||||
lookup_rotator: LookupTargetRotator,
|
||||
/// Whether we still need to reset the lookup interval on the first bootnode pong.
|
||||
pending_lookup_reset: bool,
|
||||
/// Interval when to recheck active requests
|
||||
evict_expired_requests_interval: Interval,
|
||||
/// Interval when to resend pings.
|
||||
@@ -598,6 +600,7 @@ impl Discv4Service {
|
||||
ping_interval,
|
||||
evict_expired_requests_interval,
|
||||
lookup_rotator,
|
||||
pending_lookup_reset: config.enable_lookup,
|
||||
resolve_external_ip_interval: config.resolve_external_ip_interval(),
|
||||
config,
|
||||
queued_events: Default::default(),
|
||||
@@ -1291,9 +1294,15 @@ impl Discv4Service {
|
||||
match reason {
|
||||
PingReason::InitialInsert => {
|
||||
self.update_on_pong(node, pong.enr_sq);
|
||||
// Reset the lookup interval so the next poll_tick fires immediately,
|
||||
// rather than waiting the full ~20s for the first lookup.
|
||||
if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
|
||||
self.pending_lookup_reset = false;
|
||||
self.lookup_interval.reset();
|
||||
}
|
||||
}
|
||||
PingReason::EstablishBond => {
|
||||
// same as `InitialInsert` which renews the bond if the peer is in the table
|
||||
// no initial lookup needed here since the node was already in the table.
|
||||
self.update_on_pong(node, pong.enr_sq);
|
||||
}
|
||||
PingReason::RePing => {
|
||||
@@ -2955,27 +2964,21 @@ mod tests {
|
||||
let event = poll_fn(|cx| service_1.poll(cx)).await;
|
||||
assert_eq!(event, Discv4Event::Ping);
|
||||
|
||||
// we now wait for PONG
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
|
||||
match event {
|
||||
Discv4Event::EnrRequest => {
|
||||
// since we support enr in the ping it may also request the enr
|
||||
// Drain events from service_2 until we see the Pong. Intermediate EnrRequest and
|
||||
// FindNode events are expected: ENR requests come from the ping handshake, and FindNode
|
||||
// arrives because service_1 resets its lookup interval on the first bootnode pong.
|
||||
tokio::time::timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
match event {
|
||||
Discv4Event::EnrRequest => {
|
||||
let event = poll_fn(|cx| service_2.poll(cx)).await;
|
||||
assert_eq!(event, Discv4Event::Pong);
|
||||
}
|
||||
Discv4Event::Pong => {}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
Discv4Event::Pong => break,
|
||||
Discv4Event::EnrRequest | Discv4Event::FindNode => {}
|
||||
ev => unreachable!("{ev:?}"),
|
||||
}
|
||||
}
|
||||
Discv4Event::Pong => {}
|
||||
ev => unreachable!("{ev:?}"),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("timed out waiting for Pong from service_2");
|
||||
|
||||
// endpoint is proven
|
||||
match service_2.kbuckets.entry(&key1) {
|
||||
@@ -3058,4 +3061,109 @@ mod tests {
|
||||
// Assert bootnode did not appear in update stream
|
||||
assert!(bootnode_appeared, "Bootnode should appear in update stream");
|
||||
}
|
||||
|
||||
fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
|
||||
let key = kad_key(record.id);
|
||||
let _ = service.kbuckets.insert_or_update(
|
||||
&key,
|
||||
NodeEntry::new_proven(record),
|
||||
NodeStatus {
|
||||
direction: ConnectionDirection::Incoming,
|
||||
state: ConnectionState::Connected,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
|
||||
let echo_hash = B256::random();
|
||||
service.pending_pings.insert(
|
||||
record.id,
|
||||
PingRequest {
|
||||
sent_at: Instant::now(),
|
||||
node: record,
|
||||
echo_hash,
|
||||
reason: PingReason::InitialInsert,
|
||||
},
|
||||
);
|
||||
echo_hash
|
||||
}
|
||||
|
||||
fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
|
||||
Pong {
|
||||
to: rng_endpoint(&mut rand_08::thread_rng()),
|
||||
echo: echo_hash,
|
||||
expire: service.ping_expiration(),
|
||||
enr_sq: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_on_first_bootnode_pong() {
|
||||
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
let config = Discv4Config::builder().add_boot_node(record).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
// 1. initial state
|
||||
assert!(service.pending_lookup_reset);
|
||||
|
||||
// 2. setup: proven bootnode + pending InitialInsert ping
|
||||
insert_proven_node(&mut service, record);
|
||||
let echo_hash = insert_initial_ping(&mut service, record);
|
||||
|
||||
// 3. input: pong arrives
|
||||
service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
|
||||
|
||||
// 4. flag should be consumed — interval was reset
|
||||
assert!(!service.pending_lookup_reset, "flag should be consumed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_fires_only_once() {
|
||||
let records: Vec<_> = (0..2)
|
||||
.map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
|
||||
.collect();
|
||||
let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
// 1. setup: two proven bootnodes with pending InitialInsert pings
|
||||
for &r in &records {
|
||||
insert_proven_node(&mut service, r);
|
||||
}
|
||||
let hashes: Vec<_> =
|
||||
records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
|
||||
|
||||
// 2. first pong -> consumes the flag (resets the interval)
|
||||
service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
|
||||
assert!(!service.pending_lookup_reset);
|
||||
|
||||
// 3. second pong -> flag already consumed, no second reset
|
||||
service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
|
||||
assert!(!service.pending_lookup_reset);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_not_triggered_by_non_bootnode() {
|
||||
let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
let config = Discv4Config::builder().add_boot_node(bootnode).build();
|
||||
let (_discv4, mut service) = create_discv4_with_config(config).await;
|
||||
|
||||
assert!(service.pending_lookup_reset);
|
||||
|
||||
// a non-bootnode pong should not consume the flag
|
||||
let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
|
||||
insert_proven_node(&mut service, stranger);
|
||||
let echo_hash = insert_initial_ping(&mut service, stranger);
|
||||
service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
|
||||
|
||||
assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_reset_disabled_when_lookup_disabled() {
|
||||
let config = Discv4Config::builder().enable_lookup(false).build();
|
||||
let (_discv4, service) = create_discv4_with_config(config).await;
|
||||
|
||||
// flag should be false when lookups are disabled
|
||||
assert!(!service.pending_lookup_reset);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user