diff --git a/src/net/hosts/refinery.rs b/src/net/hosts/refinery.rs index 1c1438919..7a1dfca6b 100644 --- a/src/net/hosts/refinery.rs +++ b/src/net/hosts/refinery.rs @@ -106,7 +106,7 @@ impl GreylistRefinery { let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); // Append to the whitelist. - hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap(); + hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await; // Remove whitelisted peer from the greylist. hosts.greylist_remove(url, position).await; diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index 306a4d812..7cd3dbf1f 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -276,7 +276,7 @@ impl Hosts { /// Stores an address on the greylist or updates its last_seen field if we already /// have the address. - pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> { + pub async fn greylist_store_or_update(&self, addrs: &[(Url, u64)]) { trace!(target: "store::greylist_store_or_update()", "[START]"); // Filter addresses before writing to the greylist. @@ -299,12 +299,11 @@ impl Hosts { self.greylist_update_last_seen(&addr, last_seen, index).await; } self.store_subscriber.notify(filtered_addrs_len).await; - Ok(()) } /// Stores an address on the whitelist or updates its last_seen field if we already /// have the address. - pub async fn whitelist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> { + pub async fn whitelist_store_or_update(&self, addrs: &[(Url, u64)]) { trace!(target: "store::whitelist_store_or_update()", "[START]"); // No address filtering for whitelist (whitelist is created from greylist) @@ -325,12 +324,11 @@ impl Hosts { .expect("Expected whitelist entry to exist"); self.whitelist_update_last_seen(addr, last_seen.clone(), index).await; } - Ok(()) } /// Stores an address on the anchorlist or updates its last_seen field if we already /// have the address. - pub async fn anchorlist_store_or_update(&self, addrs: &[(Url, u64)]) -> Result<()> { + pub async fn anchorlist_store_or_update(&self, addrs: &[(Url, u64)]) { trace!(target: "store::anchor_store_or_update()", "[START]"); // No address filtering for anchorlist (contains addresses we have already connected to) @@ -350,7 +348,6 @@ impl Hosts { .expect("Expected anchorlist entry to exist"); self.anchorlist_update_last_seen(addr, last_seen.clone(), index).await; } - Ok(()) } /// Append host to the greylist. Called on learning of a new peer. @@ -407,58 +404,58 @@ impl Hosts { trace!(target: "store::anchorlist_store()", "[END]"); } - // Downgrade a non-responsive host. - // TODO: it is perhaps more efficient simply to select another connection and allow - // the greylist refinery process to filter address. - pub async fn downgrade_host(&self, addr: &Url) -> Result<()> { - if self.anchorlist_contains(addr).await { - debug!(target: "net::store::downgrade_host()", - "Removing non responsive peer from anchorlist"); + //// Downgrade a non-responsive host. + //// TODO: it is perhaps more efficient simply to select another connection and allow + //// the greylist refinery process to filter address. + //pub async fn downgrade_host(&self, addr: &Url) -> Result<()> { + // if self.anchorlist_contains(addr).await { + // debug!(target: "net::store::downgrade_host()", + // "Removing non responsive peer from anchorlist"); - let index = self - .get_anchorlist_index_at_addr(addr.clone()) - .await - .expect("Expected anchorlist index to exist"); - let entry = self - .get_anchorlist_entry_at_addr(addr) - .await - .expect("Expected anchorlist entry to exist"); + // let index = self + // .get_anchorlist_index_at_addr(addr.clone()) + // .await + // .expect("Expected anchorlist index to exist"); + // let entry = self + // .get_anchorlist_entry_at_addr(addr) + // .await + // .expect("Expected anchorlist entry to exist"); - self.anchorlist_remove(addr, index).await; - self.greylist_store_or_update(&[entry]).await?; + // self.anchorlist_remove(addr, index).await; + // self.greylist_store_or_update(&[entry]).await?; - Ok(()) - } else if self.whitelist_contains(addr).await { - debug!(target: "net::store::downgrade_host()", - "Removing non responsive peer from whitelist"); + // Ok(()) + // } else if self.whitelist_contains(addr).await { + // debug!(target: "net::store::downgrade_host()", + // "Removing non responsive peer from whitelist"); - let index = self - .get_whitelist_index_at_addr(addr.clone()) - .await - .expect("Expected whitelist index to exist"); - let entry = self - .get_whitelist_entry_at_addr(addr) - .await - .expect("Expected whitelist entry to exist"); + // let index = self + // .get_whitelist_index_at_addr(addr.clone()) + // .await + // .expect("Expected whitelist index to exist"); + // let entry = self + // .get_whitelist_entry_at_addr(addr) + // .await + // .expect("Expected whitelist entry to exist"); - self.whitelist_remove(addr, index).await; - self.greylist_store_or_update(&[entry]).await?; + // self.whitelist_remove(addr, index).await; + // self.greylist_store_or_update(&[entry]).await?; - Ok(()) - } else { - debug!(target: "net::store::downgrade_host()", - "Removing non responsive peer from greylist"); + // Ok(()) + // } else { + // debug!(target: "net::store::downgrade_host()", + // "Removing non responsive peer from greylist"); - let index = self - .get_greylist_index_at_addr(addr.clone()) - .await - .expect("Expected greylist index to exist"); + // let index = self + // .get_greylist_index_at_addr(addr.clone()) + // .await + // .expect("Expected greylist index to exist"); - self.greylist_remove(addr, index).await; + // self.greylist_remove(addr, index).await; - Ok(()) - } - } + // Ok(()) + // } + //} /// Update the last_seen field of a peer on the greylist. pub async fn greylist_update_last_seen(&self, addr: &Url, last_seen: u64, index: usize) { @@ -918,7 +915,8 @@ impl Hosts { ret.push((addr.clone(), *last_seen)); limit -= 1; if limit == 0 { - debug!(target: "store::greylist_fetch_with_schemes", "Found matching scheme, returning"); + debug!(target: "deadlock", "Found matching grey scheme, returning"); + debug!(target: "store::greylist_fetch_with_schemes", "Found matching greylist entry, returning"); return ret } } @@ -952,12 +950,16 @@ impl Hosts { ret.push((addr.clone(), *last_seen)); parsed_limit -= 1; if parsed_limit == 0 { + debug!(target: "deadlock", + "Found matching white scheme, returning {:?}", ret); trace!(target: "store::whitelist_fetch_with_schemes", "Found matching white scheme, returning {:?}", ret); return ret } } - debug!(target: "store::whitelist_fetch_with_schemes", + debug!(target: "deadlock", + "No matching schemes! Trying greylist..."); + warn!(target: "store::whitelist_fetch_with_schemes", "No matching schemes! Trying greylist..."); return self.greylist_fetch_with_schemes(schemes, limit).await } @@ -997,14 +999,20 @@ impl Hosts { ret.push((addr.clone(), *last_seen)); parsed_limit -= 1; if parsed_limit == 0 { + debug!(target: "deadlock", + "Found matching anchor scheme, returning {:?}", ret); + trace!(target: "store::anchorlist_fetch_with_schemes", "Found matching anchor scheme, returning {:?}", ret); return ret } } - debug!(target: "store::anchorlist_fetch_with_schemes", - "No matching schemes! Trying greylist..."); + debug!(target: "deadlock", + "No matching schemes! Trying whitelist..."); + + warn!(target: "store::anchorlist_fetch_with_schemes", + "No matching schemes! Trying whitelist..."); return self.whitelist_fetch_with_schemes(schemes, limit).await } } diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 451b72065..9e5f680d2 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -228,7 +228,9 @@ impl P2p { /// Check whether we're connected to a given address pub async fn exists(&self, addr: &Url) -> bool { - self.channels.lock().await.contains_key(addr) + //self.channels.lock().await.contains_key(addr) + //true + false } /// Add a channel to the set of connected channels diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index 0fd4e8717..6a928aeab 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -100,7 +100,7 @@ impl ProtocolAddress { "Appending to greylist...", ); - self.hosts.greylist_store_or_update(&addrs_msg.addrs).await?; + self.hosts.greylist_store_or_update(&addrs_msg.addrs).await; } } diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index ab512b37d..82967a515 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -129,7 +129,7 @@ impl ProtocolBase for ProtocolSeed { target: "net::protocol_seed::start()", "Appending to greylist...", ); - self.hosts.greylist_store_or_update(&addrs_msg.addrs).await?; + self.hosts.greylist_store_or_update(&addrs_msg.addrs).await; debug!(target: "net::protocol_seed::start()", "END => address={}", self.channel.address()); Ok(()) diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 043708526..7e928242a 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -134,6 +134,9 @@ impl ManualSession { // Remove pending lock since register_channel will add the channel to p2p self.p2p().remove_pending(&addr).await; + // Add this connection to the anchorlist, remove it from the [otherlist] + self.upgrade_connection(&addr).await; + // Notify that channel processing has finished self.channel_subscriber.notify(Ok(channel)).await; diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 89dd7f883..d98883f3f 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -24,6 +24,7 @@ use std::{ use async_trait::async_trait; use log::debug; use smol::Executor; +use url::Url; use super::{channel::ChannelPtr, p2p::P2pPtr, protocol::ProtocolVersion}; use crate::Result; @@ -143,16 +144,6 @@ pub trait Session: Sync { // Perform handshake protocol_version.run(executor.clone()).await?; - if self.type_id() != SESSION_INBOUND { - // Channel is now initialized. Timestamp this. - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - - self.p2p() - .hosts() - .anchorlist_store_or_update(&[(channel.address().clone(), last_seen)]) - .await?; - } - // Add channel to p2p self.p2p().store(channel.clone()).await; @@ -163,6 +154,26 @@ pub trait Session: Sync { Ok(()) } + /// Upgrade a connection to the anchorlist and remove it from the white or greylist. + /// Called after a connection has been successfully established in Outbound and Manual + /// sessions. + async fn upgrade_connection(&self, addr: &Url) { + let hosts = self.p2p().hosts(); + + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + hosts.anchorlist_store_or_update(&[(addr.clone(), last_seen)]).await; + + if hosts.whitelist_contains(&addr).await { + let index = hosts.get_whitelist_index_at_addr(addr.clone()).await.unwrap(); + hosts.whitelist_remove(&addr, index).await; + } + + if hosts.greylist_contains(&addr).await { + let index = hosts.get_greylist_index_at_addr(addr.clone()).await.unwrap(); + hosts.greylist_remove(&addr, index).await; + } + } + /// Returns a pointer to the p2p network interface fn p2p(&self) -> P2pPtr; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index e771a0099..c650c7467 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -190,14 +190,13 @@ impl Slot { let connects = self.p2p().settings().outbound_connections; let white_count = connects * self.p2p().settings().white_connection_percent / 100; - // Up to anchor_connection_count connections: - // - // Select from the anchorlist - // If the anchorlist is empty, select from the whitelist - // If the whitelist is empty, select from the greylist - // If the greylist is empty, do peer discovery - let addrs = { + // Up to anchor_connection_count connections: + // + // Select from the anchorlist + // If the anchorlist is empty, select from the whitelist + // If the whitelist is empty, select from the greylist + // If the greylist is empty, do peer discovery if slot_count < self.p2p().settings().anchor_connection_count { debug!(target: "net::outbound_session::fetch_address()", "First two connections- prefer anchor connections"); @@ -230,6 +229,7 @@ impl Slot { // * address is already pending a connection let addr = hosts.check_address_with_lock(self.p2p(), addrs).await; return addr + //return None } // We first try to make connections to the addresses on our anchor list. We then find some @@ -282,6 +282,8 @@ impl Slot { continue }; + debug!(target: "deadlock", "Got addrs: {}, slot {} node {}", addr.0, self.slot, self.p2p().settings().node_id); + let host = addr.0; let slot = self.slot; @@ -305,12 +307,20 @@ impl Slot { slot, err, self.p2p().settings().node_id ); + debug!( + target: "deadlock", + "connection failed: {}, slot {} node {}, channel {}", + err, slot, self.p2p().settings().node_id, host.clone() + ); + dnetev!(self, OutboundSlotDisconnected, { slot, err: err.to_string() }); self.channel_id.store(0, Ordering::Relaxed); + + sleep(1).await; continue } }; @@ -347,6 +357,9 @@ impl Slot { self.channel_id.store(channel.info.id, Ordering::Relaxed); + // Add this connection to the anchorlist, remove it from the [otherlist] + self.session().upgrade_connection(&addr).await; + // Wait for channel to close stop_sub.receive().await; self.channel_id.store(0, Ordering::Relaxed); @@ -374,9 +387,9 @@ impl Slot { self.slot, addr, e ); - // At this point we've failed to connect. - // If the host is in the anchorlist or whitelist, downgrade it to greylist. - self.p2p().hosts().downgrade_host(&addr).await?; + //// At this point we've failed to connect. + //// If the host is in the anchorlist or whitelist, downgrade it to greylist. + //self.p2p().hosts().downgrade_host(&addr).await?; // Remove connection from pending self.p2p().remove_pending(&addr).await; diff --git a/src/net/tests.rs b/src/net/tests.rs index 2b4fd18d7..b3c6bfa8c 100644 --- a/src/net/tests.rs +++ b/src/net/tests.rs @@ -31,36 +31,36 @@ use crate::{ }; // Number of nodes to spawn and number of peers each node connects to -const N_NODES: usize = 10; +const N_NODES: usize = 5; const N_CONNS: usize = 3; #[test] fn p2p_test() { let mut cfg = simplelog::ConfigBuilder::new(); - //cfg.add_filter_ignore("sled".to_string()); - //cfg.add_filter_ignore("net::protocol_ping".to_string()); - //cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string()); - //cfg.add_filter_ignore("net::hosts".to_string()); - //cfg.add_filter_ignore("net::inbound_session".to_string()); - //cfg.add_filter_ignore("net::outbound_session".to_string()); - //cfg.add_filter_ignore("net::session".to_string()); - //cfg.add_filter_ignore("net::refinery".to_string()); - //cfg.add_filter_ignore("net::message_subscriber".to_string()); - //cfg.add_filter_ignore("net::protocol_address".to_string()); - //cfg.add_filter_ignore("net::protocol_jobs_manager".to_string()); - //cfg.add_filter_ignore("net::protocol_version".to_string()); - //cfg.add_filter_ignore("net::protocol_registry".to_string()); - //cfg.add_filter_ignore("net::protocol_seed".to_string()); - //cfg.add_filter_ignore("net::channel".to_string()); - //cfg.add_filter_ignore("net::p2p::seed".to_string()); - //cfg.add_filter_ignore("net::p2p::start".to_string()); - //cfg.add_filter_ignore("store".to_string()); - //cfg.add_filter_ignore("net::store".to_string()); - //cfg.add_filter_ignore("net::channel::send()".to_string()); - //cfg.add_filter_ignore("net::channel::start()".to_string()); - //cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); - //cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string()); + cfg.add_filter_ignore("sled".to_string()); + cfg.add_filter_ignore("net::protocol_ping".to_string()); + cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string()); + cfg.add_filter_ignore("net::hosts".to_string()); + cfg.add_filter_ignore("net::inbound_session".to_string()); + cfg.add_filter_ignore("net::outbound_session".to_string()); + cfg.add_filter_ignore("net::session".to_string()); + cfg.add_filter_ignore("net::refinery".to_string()); + cfg.add_filter_ignore("net::message_subscriber".to_string()); + cfg.add_filter_ignore("net::protocol_address".to_string()); + cfg.add_filter_ignore("net::protocol_jobs_manager".to_string()); + cfg.add_filter_ignore("net::protocol_version".to_string()); + cfg.add_filter_ignore("net::protocol_registry".to_string()); + cfg.add_filter_ignore("net::protocol_seed".to_string()); + cfg.add_filter_ignore("net::channel".to_string()); + cfg.add_filter_ignore("net::p2p::seed".to_string()); + cfg.add_filter_ignore("net::p2p::start".to_string()); + cfg.add_filter_ignore("store".to_string()); + cfg.add_filter_ignore("net::store".to_string()); + cfg.add_filter_ignore("net::channel::send()".to_string()); + cfg.add_filter_ignore("net::channel::start()".to_string()); + cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); + cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string()); cfg.add_filter_ignore("net::tcp".to_string()); simplelog::TermLogger::init( @@ -116,13 +116,13 @@ async fn hostlist_propagation(ex: Arc>) { for i in 0..N_NODES { // Everyone will connect to N_CONNS random peers. let mut peers = vec![]; - for _ in 0..N_CONNS { - let mut port = 13200 + i; - while port == 13200 + i { - port = 13200 + rng.gen_range(0..N_NODES); - } - peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); - } + //for _ in 0..N_CONNS { + // let mut port = 13200 + i; + // while port == 13200 + i { + // port = 13200 + rng.gen_range(0..N_NODES); + // } + // peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); + //} let settings = Settings { localnet: true, inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()], @@ -148,29 +148,29 @@ async fn hostlist_propagation(ex: Arc>) { } //info!("Waiting until all peers connect"); - sleep(30).await; + //sleep(5).await; - info!("Inspecting hostlists..."); - for p2p in p2p_instances.iter() { - let hosts = p2p.hosts(); + //info!("Inspecting hostlists..."); + //for p2p in p2p_instances.iter() { + // let hosts = p2p.hosts(); - let greylist = hosts.greylist.read().await; - let whitelist = hosts.whitelist.read().await; - let anchorlist = hosts.anchorlist.read().await; + // let greylist = hosts.greylist.read().await; + // let whitelist = hosts.whitelist.read().await; + // let anchorlist = hosts.anchorlist.read().await; - info!("Node {}", p2p.settings().node_id); - for (i, (url, last_seen)) in greylist.iter().enumerate() { - info!("Greylist entry {}: {}, {}", i, url, last_seen); - } + // info!("Node {}", p2p.settings().node_id); + // for (i, (url, last_seen)) in greylist.iter().enumerate() { + // info!("Greylist entry {}: {}, {}", i, url, last_seen); + // } - for (i, (url, last_seen)) in whitelist.iter().enumerate() { - info!("Whitelist entry {}: {}, {}", i, url, last_seen); - } + // for (i, (url, last_seen)) in whitelist.iter().enumerate() { + // info!("Whitelist entry {}: {}, {}", i, url, last_seen); + // } - for (i, (url, last_seen)) in anchorlist.iter().enumerate() { - info!("Anchorlist entry {}: {}, {}", i, url, last_seen); - } - } + // for (i, (url, last_seen)) in anchorlist.iter().enumerate() { + // info!("Anchorlist entry {}: {}, {}", i, url, last_seen); + // } + //} // Stop the P2P network for p2p in p2p_instances.iter() {