From 3d5eabfe5929ef88b6b763f1174f48211bf50180 Mon Sep 17 00:00:00 2001 From: lunar-mining Date: Wed, 17 Jan 2024 11:01:36 +0100 Subject: [PATCH] net: downgrade host if they disconnect or we can't connect to them. This commit introduces a new Session method called downgrade_host(). It gets called on two occasions: * if we receive a stop signal on a channel (Inbound, Outbound, Manual sessions) * if we cannot establish a connection (Outbound and Manual session) This commit deprecates the "rejected" vector inside Outbound session that prevented us from instantly reconnecting to an inactive host. --- src/net/hosts/store.rs | 41 ++++++++++++++++++++++++++--- src/net/session/inbound_session.rs | 3 +++ src/net/session/manual_session.rs | 8 +++++- src/net/session/mod.rs | 34 ++++++++++++++++++------ src/net/session/outbound_session.rs | 17 +++++------- 5 files changed, 79 insertions(+), 24 deletions(-) diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index 5000d7b0b..c4c386925 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -686,14 +686,22 @@ impl Hosts { /// Get the entry for a given addr on the whitelist. pub async fn get_whitelist_entry_at_addr(&self, addr: &Url) -> Option<(Url, u64)> { - let whitelist = self.whitelist.read().await; - whitelist.iter().find(|(url, _)| url == addr).map(|(url, time)| (url.clone(), *time)) + self.whitelist + .read() + .await + .iter() + .find(|(url, _)| url == addr) + .map(|(url, time)| (url.clone(), *time)) } /// Get the entry for a given addr on the anchorlist. pub async fn get_anchorlist_entry_at_addr(&self, addr: &Url) -> Option<(Url, u64)> { - let anchorlist = self.anchorlist.read().await; - anchorlist.iter().find(|(url, _)| url == addr).map(|(url, time)| (url.clone(), *time)) + self.anchorlist + .read() + .await + .iter() + .find(|(url, _)| url == addr) + .map(|(url, time)| (url.clone(), *time)) } /// Return all known whitelisted hosts @@ -1177,6 +1185,31 @@ mod tests { }); } + #[test] + fn test_hostlist_get_entry() { + smol::block_on(async { + let settings = Settings { + localnet: false, + external_addrs: vec![ + Url::parse("tcp://foo.bar:123").unwrap(), + Url::parse("tcp://lol.cat:321").unwrap(), + ], + ..Default::default() + }; + + let hosts = Hosts::new(Arc::new(settings.clone())); + + let url = Url::parse("tcp://dark.renaissance:333").unwrap(); + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + + hosts.whitelist_store(url.clone(), last_seen).await; + hosts.anchorlist_store(url.clone(), last_seen).await; + + assert!(hosts.get_whitelist_entry_at_addr(&url).await.is_some()); + assert!(hosts.get_anchorlist_entry_at_addr(&url).await.is_some()); + }); + } + #[test] fn test_fetch_address() { smol::block_on(async { diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index ff0f092e9..8655a00ef 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -171,6 +171,9 @@ impl InboundSession { self.p2p().remove(channel.clone()).await; + // Downgrade this host to greylist if it's on the whitelist or anchorlist. + self.downgrade_host(&channel.info.addr).await; + debug!( target: "net::inbound_session::setup_channel()", "Received stop_sub, channel removed from P2P", diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 7e928242a..30599cc7d 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -135,7 +135,7 @@ impl ManualSession { self.p2p().remove_pending(&addr).await; // Add this connection to the anchorlist, remove it from the [otherlist] - self.upgrade_connection(&addr).await; + self.upgrade_host(&addr).await; // Notify that channel processing has finished self.channel_subscriber.notify(Ok(channel)).await; @@ -146,6 +146,9 @@ impl ManualSession { target: "net::manual_session", "[P2P] Manual outbound disconnected [{}]", url, ); + // Downgrade this host to greylist if it's on the whitelist or anchorlist. + self.downgrade_host(&addr).await; + // DEV NOTE: Here we can choose to attempt reconnection again return Ok(()) } @@ -155,6 +158,9 @@ impl ManualSession { "[P2P] Unable to connect to manual outbound [{}]: {}", addr, e, ); + + // Downgrade this host to greylist if it's on the whitelist or anchorlist. + self.downgrade_host(&addr).await; } } diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index e32367626..3df935aa1 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -65,13 +65,6 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { "Received stop event. Removing channel {}", channel.address(), ); - // Remove channel from anchorlist - if p2p.hosts().anchorlist_contains(channel.address()).await { - let index = - p2p.hosts().get_anchorlist_index_at_addr(channel.address().clone()).await.unwrap(); - p2p.hosts().anchorlist_remove(channel.address(), index).await; - } - // Remove channel from p2p p2p.remove(channel).await; debug!(target: "net::session::remove_sub_on_stop()", "[END]"); @@ -157,7 +150,7 @@ pub trait Session: Sync { /// Upgrade a connection to the anchorlist and remove it from the white or greylist. /// Called after a connection has been successfully established in Outbound and Manual /// sessions. - async fn upgrade_connection(&self, addr: &Url) { + async fn upgrade_host(&self, addr: &Url) { let hosts = self.p2p().hosts(); let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); @@ -174,6 +167,31 @@ pub trait Session: Sync { } } + /// Downgrade a connection. If it's on the anchorlist or the whitelist, remove it and add it + /// to the greylist. Called when we cannot establish a connection to a host or when a + /// pre-existing connection disconnects. + async fn downgrade_host(&self, addr: &Url) { + let hosts = self.p2p().hosts(); + + // Remove channel from anchorlist and add it to greylist + if hosts.anchorlist_contains(addr).await { + let (url, last_seen) = hosts.get_anchorlist_entry_at_addr(addr).await.unwrap(); + hosts.greylist_store_or_update(&[(url, last_seen)]).await; + + let index = hosts.get_anchorlist_index_at_addr(addr.clone()).await.unwrap(); + hosts.anchorlist_remove(addr, index).await; + } + + // Remove channel from whitelist and add to greylist + if hosts.whitelist_contains(addr).await { + let (url, last_seen) = hosts.get_whitelist_entry_at_addr(addr).await.unwrap(); + hosts.greylist_store_or_update(&[(url, last_seen)]).await; + + let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap(); + hosts.whitelist_remove(addr, index).await; + } + } + /// Returns a pointer to the p2p network interface fn p2p(&self) -> P2pPtr; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index e23980531..3ea2eaa17 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -236,7 +236,6 @@ impl Slot { async fn run(self: Arc) { let hosts = self.p2p().hosts(); let slot_count = self.p2p().settings().outbound_connections; - let mut rejected = vec![]; loop { // Activate the slot @@ -297,10 +296,6 @@ impl Slot { addr: host.clone(), }); - if rejected.contains(&host) { - continue - } - let (addr, channel) = match self.try_connect(host.clone()).await { Ok(connect_info) => connect_info, Err(err) => { @@ -315,11 +310,8 @@ impl Slot { err: err.to_string() }); - // Add to the rejected list to avoid immediately reconnecting. - // TODO: Once it's been added to this list it will never be connected to - // in the lifespan of Outbound Session. Refactor this so that it gets put - // in a temporary quarantine and is freed up to try again later. - rejected.push(host.clone()); + // Downgrade this host to greylist if it's on the whitelist or anchorlist. + self.session().downgrade_host(&host).await; self.channel_id.store(0, Ordering::Relaxed); continue @@ -361,11 +353,14 @@ impl Slot { self.channel_id.store(channel.info.id, Ordering::Relaxed); // Add this connection to the anchorlist, remove it from the [otherlist] - self.session().upgrade_connection(&addr).await; + self.session().upgrade_host(&addr).await; // Wait for channel to close stop_sub.receive().await; self.channel_id.store(0, Ordering::Relaxed); + + // Downgrade this host to greylist if it's on the whitelist or anchorlist. + self.session().downgrade_host(&addr).await; } }