net: downgrade host if they disconnect or we can't connect to them.

This commit introduces a new Session method called downgrade_host().

It gets called on two occasions:

* if we receive a stop signal on a channel (Inbound, Outbound, Manual sessions)
* if we cannot establish a connection (Outbound and Manual session)

This commit deprecates the "rejected" vector inside Outbound session
that prevented us from instantly reconnecting to an inactive host.
This commit is contained in:
lunar-mining
2024-01-17 11:01:36 +01:00
parent 4ded978f06
commit 3d5eabfe59
5 changed files with 79 additions and 24 deletions

View File

@@ -686,14 +686,22 @@ impl Hosts {
/// Get the entry for a given addr on the whitelist.
pub async fn get_whitelist_entry_at_addr(&self, addr: &Url) -> Option<(Url, u64)> {
let whitelist = self.whitelist.read().await;
whitelist.iter().find(|(url, _)| url == addr).map(|(url, time)| (url.clone(), *time))
self.whitelist
.read()
.await
.iter()
.find(|(url, _)| url == addr)
.map(|(url, time)| (url.clone(), *time))
}
/// Get the entry for a given addr on the anchorlist.
pub async fn get_anchorlist_entry_at_addr(&self, addr: &Url) -> Option<(Url, u64)> {
let anchorlist = self.anchorlist.read().await;
anchorlist.iter().find(|(url, _)| url == addr).map(|(url, time)| (url.clone(), *time))
self.anchorlist
.read()
.await
.iter()
.find(|(url, _)| url == addr)
.map(|(url, time)| (url.clone(), *time))
}
/// Return all known whitelisted hosts
@@ -1177,6 +1185,31 @@ mod tests {
});
}
#[test]
fn test_hostlist_get_entry() {
smol::block_on(async {
let settings = Settings {
localnet: false,
external_addrs: vec![
Url::parse("tcp://foo.bar:123").unwrap(),
Url::parse("tcp://lol.cat:321").unwrap(),
],
..Default::default()
};
let hosts = Hosts::new(Arc::new(settings.clone()));
let url = Url::parse("tcp://dark.renaissance:333").unwrap();
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts.whitelist_store(url.clone(), last_seen).await;
hosts.anchorlist_store(url.clone(), last_seen).await;
assert!(hosts.get_whitelist_entry_at_addr(&url).await.is_some());
assert!(hosts.get_anchorlist_entry_at_addr(&url).await.is_some());
});
}
#[test]
fn test_fetch_address() {
smol::block_on(async {

View File

@@ -171,6 +171,9 @@ impl InboundSession {
self.p2p().remove(channel.clone()).await;
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&channel.info.addr).await;
debug!(
target: "net::inbound_session::setup_channel()",
"Received stop_sub, channel removed from P2P",

View File

@@ -135,7 +135,7 @@ impl ManualSession {
self.p2p().remove_pending(&addr).await;
// Add this connection to the anchorlist, remove it from the [otherlist]
self.upgrade_connection(&addr).await;
self.upgrade_host(&addr).await;
// Notify that channel processing has finished
self.channel_subscriber.notify(Ok(channel)).await;
@@ -146,6 +146,9 @@ impl ManualSession {
target: "net::manual_session",
"[P2P] Manual outbound disconnected [{}]", url,
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&addr).await;
// DEV NOTE: Here we can choose to attempt reconnection again
return Ok(())
}
@@ -155,6 +158,9 @@ impl ManualSession {
"[P2P] Unable to connect to manual outbound [{}]: {}",
addr, e,
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.downgrade_host(&addr).await;
}
}

View File

@@ -65,13 +65,6 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
"Received stop event. Removing channel {}", channel.address(),
);
// Remove channel from anchorlist
if p2p.hosts().anchorlist_contains(channel.address()).await {
let index =
p2p.hosts().get_anchorlist_index_at_addr(channel.address().clone()).await.unwrap();
p2p.hosts().anchorlist_remove(channel.address(), index).await;
}
// Remove channel from p2p
p2p.remove(channel).await;
debug!(target: "net::session::remove_sub_on_stop()", "[END]");
@@ -157,7 +150,7 @@ pub trait Session: Sync {
/// Upgrade a connection to the anchorlist and remove it from the white or greylist.
/// Called after a connection has been successfully established in Outbound and Manual
/// sessions.
async fn upgrade_connection(&self, addr: &Url) {
async fn upgrade_host(&self, addr: &Url) {
let hosts = self.p2p().hosts();
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
@@ -174,6 +167,31 @@ pub trait Session: Sync {
}
}
/// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it
/// to the greylist. Called when we cannot establish a connection to a host or when a
/// pre-existing connection disconnects.
async fn downgrade_host(&self, addr: &Url) {
let hosts = self.p2p().hosts();
// Remove channel from anchorlist and add it to greylist
if hosts.anchorlist_contains(addr).await {
let (url, last_seen) = hosts.get_anchorlist_entry_at_addr(addr).await.unwrap();
hosts.greylist_store_or_update(&[(url, last_seen)]).await;
let index = hosts.get_anchorlist_index_at_addr(addr.clone()).await.unwrap();
hosts.anchorlist_remove(addr, index).await;
}
// Remove channel from whitelist and add to greylist
if hosts.whitelist_contains(addr).await {
let (url, last_seen) = hosts.get_whitelist_entry_at_addr(addr).await.unwrap();
hosts.greylist_store_or_update(&[(url, last_seen)]).await;
let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap();
hosts.whitelist_remove(addr, index).await;
}
}
/// Returns a pointer to the p2p network interface
fn p2p(&self) -> P2pPtr;

View File

@@ -236,7 +236,6 @@ impl Slot {
async fn run(self: Arc<Self>) {
let hosts = self.p2p().hosts();
let slot_count = self.p2p().settings().outbound_connections;
let mut rejected = vec![];
loop {
// Activate the slot
@@ -297,10 +296,6 @@ impl Slot {
addr: host.clone(),
});
if rejected.contains(&host) {
continue
}
let (addr, channel) = match self.try_connect(host.clone()).await {
Ok(connect_info) => connect_info,
Err(err) => {
@@ -315,11 +310,8 @@ impl Slot {
err: err.to_string()
});
// Add to the rejected list to avoid immediately reconnecting.
// TODO: Once it's been added to this list it will never be connected to
// in the lifespan of Outbound Session. Refactor this so that it gets put
// in a temporary quarantine and is freed up to try again later.
rejected.push(host.clone());
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.session().downgrade_host(&host).await;
self.channel_id.store(0, Ordering::Relaxed);
continue
@@ -361,11 +353,14 @@ impl Slot {
self.channel_id.store(channel.info.id, Ordering::Relaxed);
// Add this connection to the anchorlist, remove it from the [otherlist]
self.session().upgrade_connection(&addr).await;
self.session().upgrade_host(&addr).await;
// Wait for channel to close
stop_sub.receive().await;
self.channel_id.store(0, Ordering::Relaxed);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.session().downgrade_host(&addr).await;
}
}