smol touchups

This commit is contained in:
Matthias Seitz
2023-10-03 12:44:59 +02:00
parent ce93096993
commit 00aec1a36e

View File

@@ -328,6 +328,7 @@ impl Discv4 {
let cmd = Discv4Command::Ban(node_id, ip);
self.send_to_service(cmd);
}
/// Adds the ip to the ban list.
///
/// This will prevent any future inclusion in the table
@@ -389,11 +390,10 @@ impl Discv4 {
self.to_service.send(cmd)?;
Ok(rx.await?)
}
/// Terminates the Discv4Service.
/// Terminates the spawned [Discv4Service].
pub fn terminate(&self) {
let cmd = Discv4Command::Terminated;
self.send_to_service(cmd);
self.send_to_service(Discv4Command::Terminated);
}
}
@@ -671,6 +671,7 @@ impl Discv4Service {
while let Some(event) = self.next().await {
trace!(target : "discv4", ?event, "processed");
}
trace!(target : "discv4", "service terminated");
})
}
@@ -717,8 +718,8 @@ impl Discv4Service {
self.kbuckets
.closest_values(&target_key)
.filter(|node| {
node.value.has_endpoint_proof
&& !self.pending_find_nodes.contains_key(&node.key.preimage().0)
node.value.has_endpoint_proof &&
!self.pending_find_nodes.contains_key(&node.key.preimage().0)
})
.take(MAX_NODES_PER_BUCKET)
.map(|n| (target_key.distance(&n.key), n.value.record)),
@@ -734,7 +735,7 @@ impl Discv4Service {
// (e.g. connectivity problems over a long period of time, or issues during initial
// bootstrapping) so we attempt to bootstrap again
self.bootstrap();
return;
return
}
trace!(target : "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
@@ -812,7 +813,7 @@ impl Discv4Service {
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
if timestamp.elapsed() < self.config.bond_expiration {
return true;
return true
}
}
false
@@ -824,7 +825,7 @@ impl Discv4Service {
/// followup request to retrieve the updated ENR
fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
if record.id == self.local_node_record.id {
return;
return
}
// If EIP868 extension is disabled then we want to ignore this
@@ -858,7 +859,7 @@ impl Discv4Service {
/// Callback invoked when we receive a pong from the peer.
fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
if record.id == *self.local_peer_id() {
return;
return
}
// If EIP868 extension is disabled then we want to ignore this
@@ -965,7 +966,7 @@ impl Discv4Service {
fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
if self.is_expired(ping.expire) {
// ping's expiration timestamp is in the past
return;
return
}
// create the record
@@ -1063,17 +1064,17 @@ impl Discv4Service {
fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
if node.id == *self.local_peer_id() {
// don't ping ourselves
return;
return
}
if self.pending_pings.contains_key(&node.id)
|| self.pending_find_nodes.contains_key(&node.id)
if self.pending_pings.contains_key(&node.id) ||
self.pending_find_nodes.contains_key(&node.id)
{
return;
return
}
if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
return;
return
}
if self.pending_pings.len() < MAX_NODES_PING {
@@ -1108,7 +1109,7 @@ impl Discv4Service {
/// Returns the echo hash of the ping message.
pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
if !self.config.enable_eip868 {
return;
return
}
let remote_addr = node.udp_addr();
let enr_request = EnrRequest { expire: self.enr_request_expiration() };
@@ -1123,7 +1124,7 @@ impl Discv4Service {
/// Message handler for an incoming `Pong`.
fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
if self.is_expired(pong.expire) {
return;
return
}
let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
@@ -1132,7 +1133,7 @@ impl Discv4Service {
let request = entry.get();
if request.echo_hash != pong.echo {
debug!( target : "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
return;
return
}
}
entry.remove()
@@ -1170,11 +1171,11 @@ impl Discv4Service {
fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
if self.is_expired(msg.expire) {
// ping's expiration timestamp is in the past
return;
return
}
if node_id == *self.local_peer_id() {
// ignore find node requests to ourselves
return;
return
}
if self.has_bond(node_id, remote_addr.ip()) {
@@ -1222,7 +1223,7 @@ impl Discv4Service {
request_hash: B256,
) {
if !self.config.enable_eip868 || self.is_expired(msg.expire) {
return;
return
}
if self.has_bond(id, remote_addr.ip()) {
@@ -1241,7 +1242,7 @@ impl Discv4Service {
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
if self.is_expired(msg.expire) {
// response is expired
return;
return
}
// check if this request was expected
let ctx = match self.pending_find_nodes.entry(node_id) {
@@ -1257,7 +1258,7 @@ impl Discv4Service {
request.response_count = total;
} else {
debug!(target : "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
return;
return
}
};
@@ -1273,7 +1274,7 @@ impl Discv4Service {
Entry::Vacant(_) => {
// received neighbours response without requesting it
debug!( target : "discv4", from=?remote_addr, "Received unsolicited Neighbours");
return;
return
}
};
@@ -1283,7 +1284,7 @@ impl Discv4Service {
// prevent banned peers from being added to the context
if self.config.ban_list.is_banned(&node.id, &node.address) {
trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
continue;
continue
}
ctx.add_node(node);
@@ -1352,7 +1353,7 @@ impl Discv4Service {
self.pending_pings.retain(|node_id, ping_request| {
if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
failed_pings.push(*node_id);
return false;
return false
}
true
});
@@ -1377,7 +1378,7 @@ impl Discv4Service {
// treat this as an hard error since it responded.
failed_neighbours.push(*node_id);
}
return false;
return false
}
true
});
@@ -1405,7 +1406,7 @@ impl Discv4Service {
if let Some(bucket) = self.kbuckets.get_bucket(&key) {
if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
// skip half empty bucket
continue;
continue
}
}
self.remove_node(node_id);
@@ -1452,7 +1453,7 @@ impl Discv4Service {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if self.config.enforce_expiration_timestamps && timestamp < now {
debug!(target: "discv4", "Expired packet");
return Err(());
return Err(())
}
Ok(())
}
@@ -1496,7 +1497,7 @@ impl Discv4Service {
loop {
// drain buffered events first
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event);
return Poll::Ready(event)
}
// trigger self lookup
@@ -1562,8 +1563,7 @@ impl Discv4Service {
}
Discv4Command::Terminated => {
//self.terminate();
//todos!
// terminate the service
self.queued_events.push_back(Discv4Event::Terminated);
}
}
@@ -1624,7 +1624,7 @@ impl Discv4Service {
}
if self.queued_events.is_empty() {
return Poll::Pending;
return Poll::Pending
}
}
}
@@ -1636,13 +1636,11 @@ impl Stream for Discv4Service {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Poll the internal poll method
let event = ready!(self.get_mut().poll(cx));
match event {
// If the event is Terminated, return Poll::Ready(None)
match ready!(self.get_mut().poll(cx)) {
// if the service is terminated, return None to terminate the stream
Discv4Event::Terminated => Poll::Ready(None),
// For any other event, return Poll::Ready(Some(event))
_ => Poll::Ready(Some(event)),
ev => Poll::Ready(Some(ev)),
}
}
}
@@ -1710,7 +1708,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
if packet.node_id == local_id {
// received our own message
debug!(target : "discv4", ?remote_addr, "Received own packet.");
continue;
continue
}
send(IngressEvent::Packet(remote_addr, packet)).await;
}
@@ -1795,7 +1793,7 @@ impl LookupTargetRotator {
self.counter += 1;
self.counter %= self.interval;
if self.counter == 0 {
return *local;
return *local
}
PeerId::random()
}