net: restructure hostlist removal logic

Previously we would remove a host from the hostlist in two scenarios:

1. When a connected host disconnects
2. When we cannot establish a connection.

However, due to the refinery nodes frequently connect and disconnect
from eachother. This behavior (n.1 above) shouldn't result in entries
being removed from the hostlist.

Further, immediately removing a host when we cannot establish a connection
(n.2) is a more aggressive approach than the previous net code. Previously
we had a quaratine mechanism that allowed us to re-attempt N times before
the host was considered `rejected`.

Following this commit, the hostlist removal mechanism is as follows:

1. Remove a host if the version exchange in register_channel() fails
2. Re-introduce quarantine for outbound::try_connect. This removes a
   host from the hostlist, and then allow multiple subsequent connection
   attempts before marking the host as `rejected`.
This commit is contained in:
draoi
2024-02-02 13:06:06 +01:00
parent af7ac9edb3
commit ba8edefb59
4 changed files with 46 additions and 11 deletions

View File

@@ -63,6 +63,13 @@ pub struct Hosts {
/// Nodes to which we have already been able to establish a connection.
pub anchorlist: RwLock<Vec<(Url, u64)>>,
/// Set of stored addresses that are quarantined.
/// We quarantine peers we've been unable to connect to, but we keep them
/// around so we can potentially try them again, up to n tries. This should
/// be helpful in order to self-heal the p2p connections in case we have an
/// Internet interrupt (goblins unplugging cables)
quarantine: RwLock<HashMap<Url, usize>>,
/// Peers we reject from connecting to
rejected: RwLock<HashSet<String>>,
@@ -83,6 +90,7 @@ impl Hosts {
greylist: RwLock::new(Vec::new()),
whitelist: RwLock::new(Vec::new()),
anchorlist: RwLock::new(Vec::new()),
quarantine: RwLock::new(HashMap::new()),
rejected: RwLock::new(HashSet::new()),
migrating: RwLock::new(HashSet::new()),
store_subscriber: Subscriber::new(),
@@ -642,6 +650,28 @@ impl Hosts {
ret
}
/// Quarantine a peer.
/// If they've been quarantined for more than a configured limit, forget them.
pub async fn quarantine(&self, url: &Url) {
debug!(target: "store::remove()", "Quarantining peer {}", url);
// Remove from the entire hosts set
self.remove_host(url).await;
let mut q = self.quarantine.write().await;
if let Some(retries) = q.get_mut(url) {
*retries += 1;
debug!(target: "net::hosts::quarantine()", "Peer {} quarantined {} times", url, retries);
if *retries == self.settings.hosts_quarantine_limit {
debug!(target: "net::hosts::quarantine()", "Banning peer {}", url);
q.remove(url);
self.mark_rejected(url).await;
}
} else {
debug!(target: "net::hosts::remove()", "Added peer {} to quarantine", url);
q.insert(url.clone(), 0);
}
}
/// Check if a given peer (URL) is in the set of rejected hosts
pub async fn is_rejected(&self, peer: &Url) -> bool {
// Skip lookup for UNIX sockets and localhost connections

View File

@@ -146,9 +146,6 @@ impl ManualSession {
"[P2P] Manual outbound disconnected [{}]", url,
);
// Remove this host from the hostlist.
self.p2p().hosts().remove_host(&addr).await;
// DEV NOTE: Here we can choose to attempt reconnection again
return Ok(())
}

View File

@@ -102,10 +102,21 @@ pub trait Session: Sync {
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// Switch on the channel
channel.start(executor.clone());
channel.clone().start(executor.clone());
// Wait for handshake to finish.
handshake_task.await?;
// If the handshake returns an error, remove this host from all hostlists.
match handshake_task.await {
Ok(()) => {
debug!(target: "net::session::register_channel()",
"Handshake successful {}", channel.clone().address());
}
Err(e) => {
debug!(target: "net::session::register_channel()",
"Handshake error {}. Removing {} from hostlists", e, channel.clone().address());
p2p.hosts().remove_host(channel.address()).await;
}
}
// Now the channel is ready
debug!(target: "net::session::register_channel()", "Session handshake complete");

View File

@@ -323,9 +323,6 @@ impl Slot {
err: err.to_string()
});
// Remove this host from the hostlist.
hosts.remove_host(&host).await;
self.channel_id.store(0, Ordering::Relaxed);
continue
}
@@ -371,9 +368,6 @@ impl Slot {
// Wait for channel to close
stop_sub.receive().await;
self.channel_id.store(0, Ordering::Relaxed);
// Remove this host from the hostlist.
hosts.remove_host(&addr).await;
}
}
@@ -398,6 +392,9 @@ impl Slot {
self.slot, addr, e
);
// At this point we failed to connect. We'll quarantine this peer now.
self.p2p().hosts().quarantine(&addr).await;
// Remove connection from pending
self.p2p().remove_pending(&addr).await;