net: add peer discovery connection strategy

This commit is contained in:
x
2023-08-25 17:04:54 +02:00
parent 9ed979454c
commit 1e752bfb7e
7 changed files with 232 additions and 84 deletions

View File

@@ -45,29 +45,42 @@ pub type SendMessage = MessageInfo;
pub type RecvMessage = MessageInfo;
#[derive(Clone, Debug)]
pub struct OutboundConnecting {
/// Event payload for an outbound slot that found no address to connect to.
///
/// The slot's connect loop emits this when `fetch_address_with_lock()`
/// yields nothing, right before it resets its wakeup and pokes peer discovery.
pub struct OutboundSlotSleeping {
/// Index of the outbound slot emitting the event.
pub slot: u32,
}
/// Event payload for an outbound slot that is starting a connection attempt.
///
/// The slot's connect loop emits this immediately before calling
/// `try_connect()` with the fetched address.
#[derive(Clone, Debug)]
pub struct OutboundSlotConnecting {
/// Index of the outbound slot making the attempt.
pub slot: u32,
/// Address the slot is about to connect to.
pub addr: Url,
}
#[derive(Clone, Debug)]
pub struct OutboundConnected {
/// Event payload for an outbound slot whose connection attempt succeeded.
///
/// The slot's connect loop emits this after `try_connect()` returns `Ok`.
pub struct OutboundSlotConnected {
/// Index of the outbound slot that connected.
pub slot: u32,
/// Final address returned by `try_connect()` (`addr_final` at the emit site).
pub addr: Url,
/// Id of the established channel (`channel.info.id` at the emit site).
pub channel_id: u32,
}
#[derive(Clone, Debug)]
pub struct OutboundDisconnected {
/// Event payload for an outbound slot that failed to connect or lost its channel.
///
/// The slot's connect loop emits this when `try_connect()` or
/// `setup_channel()` returns an error.
pub struct OutboundSlotDisconnected {
/// Index of the outbound slot reporting the failure.
pub slot: u32,
/// Stringified error (`err.to_string()` at the emit sites).
pub err: String,
}
/// Event payload describing what the peer discovery loop is currently doing.
#[derive(Clone, Debug)]
pub struct OutboundPeerDiscovery {
/// Value of the discovery loop's attempt counter when the event was emitted.
pub attempt: u32,
/// Name of the current phase. The discovery loop emits `"wait"`, `"sleep"`,
/// `"getaddr"` and `"seed"`.
pub state: &'static str,
}
#[derive(Clone, Debug)]
pub enum DnetEvent {
SendMessage(MessageInfo),
RecvMessage(MessageInfo),
OutboundConnecting(OutboundConnecting),
OutboundConnected(OutboundConnected),
OutboundDisconnected(OutboundDisconnected),
OutboundSlotSleeping(OutboundSlotSleeping),
OutboundSlotConnecting(OutboundSlotConnecting),
OutboundSlotConnected(OutboundSlotConnected),
OutboundSlotDisconnected(OutboundSlotDisconnected),
OutboundPeerDiscovery(OutboundPeerDiscovery),
}

View File

@@ -27,6 +27,10 @@ use smol::lock::RwLock;
use url::Url;
use super::settings::SettingsPtr;
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
Result,
};
/// Atomic pointer to hosts object
pub type HostsPtr = Arc<Hosts>;
@@ -43,6 +47,9 @@ pub struct Hosts {
/// Internet interrupt (goblins unplugging cables)
quarantine: RwLock<HashMap<Url, usize>>,
/// Subscriber listening for store updates
store_subscriber: SubscriberPtr<usize>,
/// Pointer to configured P2P settings
settings: SettingsPtr,
}
@@ -53,6 +60,7 @@ impl Hosts {
Arc::new(Self {
addrs: RwLock::new(HashSet::new()),
quarantine: RwLock::new(HashMap::new()),
store_subscriber: Subscriber::new(),
settings,
})
}
@@ -62,6 +70,7 @@ impl Hosts {
debug!(target: "net::hosts::store()", "hosts::store() [START]");
let filtered_addrs = self.filter_addresses(addrs).await;
let filtered_addrs_len = filtered_addrs.len();
if !filtered_addrs.is_empty() {
let mut addrs_map = self.addrs.write().await;
@@ -77,9 +86,15 @@ impl Hosts {
}
}
self.store_subscriber.notify(filtered_addrs_len).await;
debug!(target: "net::hosts::store()", "hosts::store() [END]");
}
/// Subscribe to hosts store updates.
///
/// `store()` publishes the number of addresses that passed filtering
/// through this subscriber. The returned subscription receives those
/// counts. The call itself never fails; it always returns `Ok`.
pub async fn subscribe_store(&self) -> Result<Subscription<usize>> {
    Ok(self.store_subscriber.clone().subscribe().await)
}
/// Filter given addresses based on certain rulesets and validity.
async fn filter_addresses(&self, addrs: &[Url]) -> Vec<Url> {
debug!(target: "net::hosts::filter_addresses()", "Filtering addrs: {:?}", addrs);

View File

@@ -57,8 +57,6 @@ pub struct ManualSession {
connect_slots: Mutex<Vec<StoppableTaskPtr>>,
/// Subscriber used to signal channels processing
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
/// Flag to toggle channel_subscriber notifications
notify: Mutex<bool>,
}
impl ManualSession {
@@ -68,7 +66,6 @@ impl ManualSession {
p2p: LazyWeak::new(),
connect_slots: Mutex::new(Vec::new()),
channel_subscriber: Subscriber::new(),
notify: Mutex::new(false),
})
}
@@ -137,9 +134,7 @@ impl ManualSession {
self.p2p().remove_pending(&addr).await;
// Notify that channel processing has finished
if *self.notify.lock().await {
self.channel_subscriber.notify(Ok(channel)).await;
}
self.channel_subscriber.notify(Ok(channel)).await;
// Wait for channel to close
stop_sub.receive().await;
@@ -162,9 +157,7 @@ impl ManualSession {
// Wait and try again.
// TODO: Should we notify about the failure now, or after all attempts
// have failed?
if *self.notify.lock().await {
self.channel_subscriber.notify(Err(Error::ConnectFailed)).await;
}
self.channel_subscriber.notify(Err(Error::ConnectFailed)).await;
remaining = if attempts == 0 { 1 } else { remaining - 1 };
if remaining == 0 {
@@ -189,16 +182,6 @@ impl ManualSession {
Ok(())
}
/// Enable channel_subscriber notifications.
pub async fn enable_notify(self: Arc<Self>) {
*self.notify.lock().await = true;
}
/// Disable channel_subscriber notifications.
pub async fn disable_notify(self: Arc<Self>) {
*self.notify.lock().await = false;
}
}
#[async_trait]

View File

@@ -26,7 +26,10 @@
//! and ensures that no other part of the program uses the slots at the
//! same time.
use std::sync::{Arc, Weak};
use std::{
sync::{Arc, Weak},
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{debug, error, info, warn};
@@ -46,7 +49,8 @@ use super::{
};
use crate::{
system::{
sleep, CondVar, LazyWeak, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr,
sleep, timeout::timeout, CondVar, LazyWeak, StoppableTask, StoppableTaskPtr, Subscriber,
SubscriberPtr,
},
Error, Result,
};
@@ -188,6 +192,10 @@ impl Slot {
let addr = if let Some(addr) = self.fetch_address_with_lock(transports).await {
addr
} else {
dnetev!(self, OutboundSlotSleeping, {
slot: self.slot,
});
self.wakeup_self.reset();
// Peer discovery
self.session().wakeup_peer_discovery();
@@ -196,6 +204,17 @@ impl Slot {
continue
};
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
self.slot, addr,
);
dnetev!(self, OutboundSlotConnecting, {
slot: self.slot,
addr: addr.clone(),
});
let (addr_final, channel) = match self.try_connect(addr.clone()).await {
Ok(connect_info) => connect_info,
Err(err) => {
@@ -205,7 +224,7 @@ impl Slot {
self.slot, err,
);
dnetev!(self, OutboundDisconnected, {
dnetev!(self, OutboundSlotDisconnected, {
slot: self.slot,
err: err.to_string()
});
@@ -213,16 +232,28 @@ impl Slot {
}
};
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
self.slot, addr_final
);
dnetev!(self, OutboundSlotConnected, {
slot: self.slot,
addr: addr_final.clone(),
channel_id: channel.info.id
});
let stop_sub = channel.subscribe_stop().await.expect("Channel should not be stopped");
// Setup new channel
if let Err(err) = self.setup_channel(addr, addr_final, channel.clone()).await {
if let Err(err) = self.setup_channel(addr, channel.clone()).await {
info!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} disconnected: {}",
self.slot, err
);
dnetev!(self, OutboundDisconnected, {
dnetev!(self, OutboundSlotDisconnected, {
slot: self.slot,
err: err.to_string()
});
@@ -241,17 +272,6 @@ impl Slot {
/// channel. In case of any failures, a network error is returned and the
/// main connect loop (parent of this function) will iterate again.
async fn try_connect(&self, addr: Url) -> Result<(Url, ChannelPtr)> {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
self.slot, addr,
);
dnetev!(self, OutboundConnecting, {
slot: self.slot,
addr: addr.clone(),
});
let parent = Arc::downgrade(&self.session());
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
@@ -276,19 +296,7 @@ impl Slot {
}
}
async fn setup_channel(&self, addr: Url, addr_final: Url, channel: ChannelPtr) -> Result<()> {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
self.slot, addr_final
);
dnetev!(self, OutboundConnected, {
slot: self.slot,
addr: addr_final.clone(),
channel_id: channel.info.id
});
async fn setup_channel(&self, addr: Url, channel: ChannelPtr) -> Result<()> {
// Register the new channel
self.session().register_channel(channel.clone(), self.p2p().executor()).await?;
@@ -418,44 +426,134 @@ impl PeerDiscovery {
/// after broadcasting in order to let the P2P stack receive and work through
/// the addresses it is expecting.
async fn run(self: Arc<Self>) {
let mut current_attempt = 0;
loop {
dnetev!(self, OutboundPeerDiscovery, {
attempt: current_attempt,
state: "wait",
});
// wait to be woken up by notify()
self.wakeup_self.wait().await;
let sleep_was_instant = self.wait().await;
let p2p = self.p2p();
// Broadcast the GetAddrs message to all active channels.
// If we have no active channels, we will perform a SeedSyncSession instead.
if p2p.is_connected().await {
if sleep_was_instant {
// Try again
current_attempt += 1;
} else {
// reset back to start
current_attempt = 1;
}
if current_attempt >= 4 {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound: Broadcasting GetAddrs across active channels",
"[P2P] Sleeping and trying again..."
);
dnetev!(self, OutboundPeerDiscovery, {
attempt: current_attempt,
state: "sleep",
});
sleep(p2p.settings().outbound_peer_discovery_cooloff_time).await;
current_attempt = 1;
}
// First 2 times try sending GetAddr to the network.
// 3rd time do a seed sync.
if p2p.is_connected().await && current_attempt <= 2 {
// Broadcast the GetAddrs message to all active channels.
// If we have no active channels, we will perform a SeedSyncSession instead.
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Requesting addrs from active channels. Attempt: {}",
current_attempt
);
dnetev!(self, OutboundPeerDiscovery, {
attempt: current_attempt,
state: "getaddr",
});
let get_addrs = GetAddrsMessage { max: p2p.settings().outbound_connections as u32 };
p2p.broadcast(&get_addrs).await;
// Temporary workaround. Sleep until the nodes respond back and
// we process the addr messages.
sleep(p2p.settings().outbound_connect_timeout).await;
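// Wait for a hosts store update event.
// NOTE(review): this subscription is created after `broadcast()` above has
// already been awaited, so a store update landing in between may go
// unobserved and the wait below would then time out. Consider subscribing
// before broadcasting. TODO confirm against `Subscriber` semantics.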
let store_sub = self.p2p().hosts().subscribe_store().await.unwrap();
let result = timeout(
Duration::from_secs(p2p.settings().outbound_peer_discovery_attempt_time),
store_sub.receive(),
)
.await;
match result {
Ok(addrs_len) => {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Discovered {} addrs", addrs_len
);
}
Err(_) => {
warn!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Peer discovery waiting for addrs timed out."
);
// TODO: Just do seed next time
}
}
// TODO: check every subscribe() call has a corresponding unsubscribe()
store_sub.unsubscribe().await;
} else {
warn!(
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] No connected channels found for peer discovery. Reseeding.",
"[P2P] Seeding hosts. Attempt: {}",
current_attempt
);
if let Err(e) = p2p.clone().seed().await {
error!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Network reseed failed: {}", e,
);
dnetev!(self, OutboundPeerDiscovery, {
attempt: current_attempt,
state: "seed",
});
match p2p.clone().seed().await {
Ok(()) => {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Seeding hosts successful."
);
}
Err(err) => {
error!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Network reseed failed: {}", err,
);
}
}
}
self.session().wakeup_slots().await;
self.wakeup_self.reset();
self.session().wakeup_slots().await;
// Give some time for new connections to be established
sleep(p2p.settings().outbound_peer_discovery_attempt_time).await;
}
}
/// Blocks until `wakeup_self` is signalled and reports whether that
/// happened (almost) immediately.
///
/// Returns `true` when the wait lasted at most 200 ms. The discovery loop
/// uses this to tell a fresh wakeup apart from a retry of the previous
/// attempt: an instant return bumps the attempt counter, anything longer
/// resets it.
async fn wait(&self) -> bool {
    // Waits at or below this threshold count as "did not really sleep".
    let epsilon = Duration::from_millis(200);

    let wakeup_start = Instant::now();
    self.wakeup_self.wait().await;

    wakeup_start.elapsed() <= epsilon
}
/// Signals `wakeup_self`, the same primitive that `wait()` blocks on.
// NOTE(review): presumably invoked via the session's
// `wakeup_peer_discovery()` — confirm against the caller.
fn notify(&self) {
self.wakeup_self.notify()
}

View File

@@ -61,6 +61,10 @@ pub struct Settings {
pub localnet: bool,
/// Delete a peer from hosts if they've been quarantined N times
pub hosts_quarantine_limit: usize,
/// Cooling off time for peer discovery when unsuccessful
pub outbound_peer_discovery_cooloff_time: u64,
/// Time between peer discovery attempts
pub outbound_peer_discovery_attempt_time: u64,
}
impl Default for Settings {
@@ -84,6 +88,8 @@ impl Default for Settings {
channel_heartbeat_interval: 10,
localnet: false,
hosts_quarantine_limit: 50,
outbound_peer_discovery_cooloff_time: 30,
outbound_peer_discovery_attempt_time: 5,
}
}
}
@@ -158,6 +164,12 @@ pub struct SettingsOpt {
#[structopt(skip)]
pub hosts_quarantine_limit: Option<usize>,
#[structopt(skip)]
pub outbound_peer_discovery_cooloff_time: Option<u64>,
#[structopt(skip)]
pub outbound_peer_discovery_attempt_time: Option<u64>,
}
impl From<SettingsOpt> for Settings {
@@ -181,6 +193,12 @@ impl From<SettingsOpt> for Settings {
channel_heartbeat_interval: opt.channel_heartbeat_interval.unwrap_or(10),
localnet: opt.localnet,
hosts_quarantine_limit: opt.hosts_quarantine_limit.unwrap_or(15),
outbound_peer_discovery_cooloff_time: opt
.outbound_peer_discovery_cooloff_time
.unwrap_or(30),
outbound_peer_discovery_attempt_time: opt
.outbound_peer_discovery_attempt_time
.unwrap_or(5),
}
}
}

View File

@@ -48,15 +48,22 @@ impl From<net::dnet::MessageInfo> for JsonValue {
}
#[cfg(feature = "net")]
impl From<net::dnet::OutboundConnecting> for JsonValue {
fn from(info: net::dnet::OutboundConnecting) -> JsonValue {
/// Serializes an `OutboundSlotSleeping` payload as a JSON map with a single
/// `"slot"` entry.
impl From<net::dnet::OutboundSlotSleeping> for JsonValue {
    fn from(info: net::dnet::OutboundSlotSleeping) -> JsonValue {
        let slot = JsonNum(info.slot.into());
        json_map([("slot", slot)])
    }
}
/// Serializes an `OutboundSlotConnecting` payload as a JSON map with
/// `"slot"` and `"addr"` entries.
#[cfg(feature = "net")]
impl From<net::dnet::OutboundSlotConnecting> for JsonValue {
    fn from(info: net::dnet::OutboundSlotConnecting) -> JsonValue {
        let slot = JsonNum(info.slot.into());
        let addr = JsonStr(info.addr.to_string());
        json_map([("slot", slot), ("addr", addr)])
    }
}
#[cfg(feature = "net")]
impl From<net::dnet::OutboundConnected> for JsonValue {
fn from(info: net::dnet::OutboundConnected) -> JsonValue {
impl From<net::dnet::OutboundSlotConnected> for JsonValue {
fn from(info: net::dnet::OutboundSlotConnected) -> JsonValue {
json_map([
("slot", JsonNum(info.slot.into())),
("addr", JsonStr(info.addr.to_string())),
@@ -66,12 +73,22 @@ impl From<net::dnet::OutboundConnected> for JsonValue {
}
#[cfg(feature = "net")]
impl From<net::dnet::OutboundDisconnected> for JsonValue {
fn from(info: net::dnet::OutboundDisconnected) -> JsonValue {
/// Serializes an `OutboundSlotDisconnected` payload as a JSON map with
/// `"slot"` and `"err"` entries.
impl From<net::dnet::OutboundSlotDisconnected> for JsonValue {
    fn from(info: net::dnet::OutboundSlotDisconnected) -> JsonValue {
        let slot = JsonNum(info.slot.into());
        let err = JsonStr(info.err);
        json_map([("slot", slot), ("err", err)])
    }
}
/// Serializes an `OutboundPeerDiscovery` payload as a JSON map with
/// `"attempt"` and `"state"` entries.
#[cfg(feature = "net")]
impl From<net::dnet::OutboundPeerDiscovery> for JsonValue {
    fn from(info: net::dnet::OutboundPeerDiscovery) -> JsonValue {
        let attempt = JsonNum(info.attempt.into());
        let state = JsonStr(info.state.to_string());
        json_map([("attempt", attempt), ("state", state)])
    }
}
#[cfg(feature = "net")]
impl From<net::dnet::DnetEvent> for JsonValue {
fn from(event: net::dnet::DnetEvent) -> JsonValue {
@@ -82,14 +99,20 @@ impl From<net::dnet::DnetEvent> for JsonValue {
net::dnet::DnetEvent::RecvMessage(info) => {
json_map([("event", json_str("recv")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundConnecting(info) => {
json_map([("event", json_str("outbound_connecting")), ("info", info.into())])
net::dnet::DnetEvent::OutboundSlotSleeping(info) => {
json_map([("event", json_str("outbound_slot_sleeping")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundConnected(info) => {
json_map([("event", json_str("outbound_connected")), ("info", info.into())])
net::dnet::DnetEvent::OutboundSlotConnecting(info) => {
json_map([("event", json_str("outbound_slot_connecting")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundDisconnected(info) => {
json_map([("event", json_str("outbound_disconnected")), ("info", info.into())])
net::dnet::DnetEvent::OutboundSlotConnected(info) => {
json_map([("event", json_str("outbound_slot_connected")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundSlotDisconnected(info) => {
json_map([("event", json_str("outbound_slot_disconnected")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundPeerDiscovery(info) => {
json_map([("event", json_str("outbound_peer_discovery")), ("info", info.into())])
}
}
}

View File

@@ -84,12 +84,10 @@ impl<Parent> LazyWeak<Parent> {
assert!(self.0.get().is_none());
let parent = Arc::downgrade(&parent);
self.0.set(parent).unwrap();
assert!(self.0.get().is_some());
}
/// Access the `Arc<Parent>` pointer.
///
/// # Panics
///
/// Panics if the inner weak pointer has not been set yet, or if the parent
/// it points to has already been dropped.
pub fn upgrade(&self) -> Arc<Parent> {
    // `expect` already covers the "not set" case, so no separate assert is
    // needed, and each failure names the broken invariant.
    self.0
        .get()
        .expect("LazyWeak used before its parent pointer was set")
        .upgrade()
        .expect("LazyWeak parent has been dropped")
}
}