mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
perf(net): emit all discovred nodes (#1414)
This commit is contained in:
@@ -769,7 +769,7 @@ impl Discv4Service {
|
||||
entry.value_mut().update_with_enr(last_enr_seq);
|
||||
if !old_status.is_connected() {
|
||||
let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
|
||||
trace!(target : "discv4", ?record, "added after successful endpoint proof");
|
||||
debug!(target : "discv4", ?record, "added after successful endpoint proof");
|
||||
self.notify(DiscoveryUpdate::Added(record));
|
||||
|
||||
if has_enr_seq {
|
||||
@@ -783,7 +783,7 @@ impl Discv4Service {
|
||||
if !status.is_connected() {
|
||||
status.state = ConnectionState::Connected;
|
||||
let _ = entry.update(status);
|
||||
trace!(target : "discv4", ?record, "added after successful endpoint proof");
|
||||
debug!(target : "discv4", ?record, "added after successful endpoint proof");
|
||||
self.notify(DiscoveryUpdate::Added(record));
|
||||
|
||||
if has_enr_seq {
|
||||
@@ -884,7 +884,16 @@ impl Discv4Service {
|
||||
// mark as new insert if insert was successful
|
||||
is_new_insert = true;
|
||||
}
|
||||
_ => {
|
||||
BucketInsertResult::Full => {
|
||||
// we received a ping but the corresponding bucket for the peer is already
|
||||
// full, we can't add any additional peers to that bucket, but we still want
|
||||
// to emit an event that we discovered the node
|
||||
debug!(target : "discv4", ?record, "discovered new record but bucket is full");
|
||||
self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record))
|
||||
}
|
||||
BucketInsertResult::FailedFilter |
|
||||
BucketInsertResult::TooManyIncoming |
|
||||
BucketInsertResult::NodeExists => {
|
||||
// insert unsuccessful but we still want to send the pong
|
||||
}
|
||||
}
|
||||
@@ -896,14 +905,14 @@ impl Discv4Service {
|
||||
|
||||
// send the pong first, but the PONG and optionally PING don't need to be send in a
|
||||
// particular order
|
||||
let msg = Message::Pong(Pong {
|
||||
let pong = Message::Pong(Pong {
|
||||
// we use the actual address of the peer
|
||||
to: record.into(),
|
||||
echo: hash,
|
||||
expire: ping.expire,
|
||||
enr_sq: self.enr_seq(),
|
||||
});
|
||||
self.send_packet(msg, remote_addr);
|
||||
self.send_packet(pong, remote_addr);
|
||||
|
||||
// if node was absent also send a ping to establish the endpoint proof from our end
|
||||
if is_new_insert {
|
||||
@@ -1179,6 +1188,10 @@ impl Discv4Service {
|
||||
// only ping if the node was added to the table
|
||||
self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
|
||||
}
|
||||
BucketInsertResult::Full => {
|
||||
// new node but the node's bucket is already full
|
||||
self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1858,10 +1871,12 @@ enum PingReason {
|
||||
/// Represents node related updates state changes in the underlying node table
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DiscoveryUpdate {
|
||||
/// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
|
||||
EnrForkId(NodeRecord, ForkId),
|
||||
/// A new node was discovered _and_ added to the table.
|
||||
Added(NodeRecord),
|
||||
/// A new node was discovered but _not_ added to the table because it is currently full.
|
||||
DiscoveredAtCapacity(NodeRecord),
|
||||
/// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
|
||||
EnrForkId(NodeRecord, ForkId),
|
||||
/// Node that was removed from the table
|
||||
Removed(PeerId),
|
||||
/// A series of updates
|
||||
|
||||
@@ -156,6 +156,9 @@ impl Discovery {
|
||||
self.on_discv4_update(update);
|
||||
}
|
||||
}
|
||||
DiscoveryUpdate::DiscoveredAtCapacity(record) => {
|
||||
self.on_node_record_update(record, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user