net: handle try_register() errors

This commit is contained in:
epiphany
2025-10-15 16:20:48 +02:00
committed by skoupidi
parent 6b7e1cf7ca
commit 6a4326bad9
6 changed files with 45 additions and 23 deletions

View File

@@ -936,7 +936,7 @@ impl Hosts {
// Then ensure we aren't currently trying to add this peer to the hostlist.
for (i, (addr, last_seen)) in filtered_addrs.iter().enumerate() {
if let Err(e) = self.try_register(addr.clone(), HostState::Insert) {
debug!(target: "net::hosts::store_or_update", "Cannot insert addr={}, err={e}",
debug!(target: "net::hosts::insert()", "Cannot insert addr={}, err={e}",
addr.clone());
continue
@@ -948,7 +948,9 @@ impl Hosts {
self.container.sort_by_last_seen(color.clone() as usize);
self.container.resize(color.clone());
self.unregister(addr);
if let Err(e) = self.unregister(addr) {
warn!(target: "net::hosts::insert()", "Error while unregistering addr={addr}, err={e}");
}
}
self.store_publisher.notify(addrs_len).await;
@@ -1048,10 +1050,11 @@ impl Hosts {
}
/// Mark as host as Free which frees it up for most future operations.
pub(in crate::net) fn unregister(&self, addr: &Url) {
pub(in crate::net) fn unregister(&self, addr: &Url) -> Result<()> {
let age = UNIX_EPOCH.elapsed().unwrap().as_secs();
self.try_register(addr.clone(), HostState::Free(age)).unwrap();
self.try_register(addr.clone(), HostState::Free(age))?;
debug!(target: "net::hosts::unregister()", "Unregistered: {}", &addr);
Ok(())
}
/// Return the list of all connected channels, including seed and
@@ -1433,9 +1436,7 @@ impl Hosts {
self.move_host(addr, last_seen, HostColor::Grey).await?;
// Free up this addr for future operations.
self.unregister(addr);
Ok(())
self.unregister(addr)
}
pub async fn whitelist_host(&self, addr: &Url, last_seen: u64) -> Result<()> {
@@ -1443,9 +1444,7 @@ impl Hosts {
self.move_host(addr, last_seen, HostColor::White).await?;
// Free up this addr for future operations.
self.unregister(addr);
Ok(())
self.unregister(addr)
}
/// A single function for moving hosts between hostlists. Called on the following occasions:

View File

@@ -216,7 +216,9 @@ impl Slot {
);
// Free up this addr for future operations.
self.p2p().hosts().unregister(channel.address());
if let Err(e) = self.p2p().hosts().unregister(channel.address()) {
warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", channel.display_address());
}
}
}
}
@@ -227,7 +229,9 @@ impl Slot {
);
// Free up this addr for future operations.
self.p2p().hosts().unregister(&self.addr);
if let Err(e) = self.p2p().hosts().unregister(&self.addr) {
warn!(target: "net::manual_session", "[P2P] Error while unregistering addr={}, err={e}", self.addr);
}
}
}

View File

@@ -102,7 +102,9 @@ pub async fn remove_sub_on_stop(
// don't call this on refine sessions since the unregister() call
// happens in the refinery directly.
if type_id & SESSION_REFINE == 0 {
hosts.unregister(channel.address());
if let Err(e) = hosts.unregister(channel.address()) {
error!(target: "net::session::remove_sub_on_stop()", "Error while unregistering addr={}, err={e}", channel.display_address());
}
}
if !p2p.is_connected() {

View File

@@ -363,13 +363,22 @@ impl Slot {
);
// Peer disconnected during the registry process. We'll downgrade this peer now.
self.p2p().hosts().move_host(channel.address(), last_seen, HostColor::Grey).await?;
if let Err(e) = self
.p2p()
.hosts()
.move_host(channel.address(), last_seen, HostColor::Grey)
.await
{
warn!(target: "net::outbound_session", "Error while moving addr={} to greylist: {e}", channel.display_address());
continue
}
// Mark its state as Suspend, which sends this node to the Refinery for processing.
self.p2p()
.hosts()
.try_register(channel.address().clone(), HostState::Suspend)
.unwrap();
if let Err(e) =
self.p2p().hosts().try_register(channel.address().clone(), HostState::Suspend)
{
warn!(target: "net::outbound_session", "Error while suspending addr={}: {e}", channel.display_address());
}
continue
}
@@ -412,7 +421,9 @@ impl Slot {
self.p2p().hosts().move_host(&addr, last_seen, HostColor::Grey).await?;
// Mark its state as Suspend, which sends it to the Refinery for processing.
self.p2p().hosts().try_register(addr.clone(), HostState::Suspend).unwrap();
if let Err(e) = self.p2p().hosts().try_register(addr.clone(), HostState::Suspend) {
warn!(target: "net::outbound_session::try_connect()", "Error while suspending addr={addr}: {e}");
}
// Notify that channel processing failed
self.p2p().hosts().channel_publisher.notify(Err(err.clone())).await;

View File

@@ -252,13 +252,15 @@ impl GreylistRefinery {
warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
offline_timer.as_secs());
// It is neccessary to Free suspended hosts at this point, otherwise these
// It is necessary to Free suspended hosts at this point, otherwise these
// hosts cannot be connected to in Outbound Session. Failure to do this could
// result in the refinery being paused forver (since connections could never be
// made).
let suspended_hosts = hosts.suspended();
for host in suspended_hosts {
hosts.unregister(&host);
if let Err(e) = hosts.unregister(&host) {
warn!(target: "net::refinery", "Error while unregistering addr={host}, err={e}");
}
}
continue
@@ -284,7 +286,9 @@ impl GreylistRefinery {
);
// Free up this addr for future operations.
hosts.unregister(url);
if let Err(e) = hosts.unregister(url) {
warn!(target: "net::refinery", "Error while unregistering addr={url}, err={e}");
}
continue
}

View File

@@ -275,7 +275,9 @@ impl Slot {
self.failed.store(true, SeqCst);
// Free up this addr for future operations.
self.p2p().hosts().unregister(addr);
if let Err(e) = self.p2p().hosts().unregister(addr) {
warn!(target: "net::session::seedsync_session", "[P2P] Error while unregistering addr={addr}, err={e}");
}
// Reset the CondVar for future use.
self.reset();