From a2ae02ad9f379f0ffc08959fcb8751ce7845e157 Mon Sep 17 00:00:00 2001 From: x Date: Thu, 24 Aug 2023 09:37:04 +0200 Subject: [PATCH] net: reorganize code of outbound session into individual separated slots --- src/net/hosts.rs | 1 + src/net/session/outbound_session.rs | 467 ++++++++++++++++------------ 2 files changed, 266 insertions(+), 202 deletions(-) diff --git a/src/net/hosts.rs b/src/net/hosts.rs index 2a55a752d..1ff08c9d4 100644 --- a/src/net/hosts.rs +++ b/src/net/hosts.rs @@ -220,6 +220,7 @@ impl Hosts { } /// Get all peers that match the given transport schemes from the hosts set. + /// TODO: add a limit: usize argument pub async fn fetch_with_schemes(&self, schemes: &[String]) -> Vec { let mut ret = vec![]; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 51a1fc2c6..5bd2560de 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -53,28 +53,6 @@ use crate::{ pub type OutboundSessionPtr = Arc; -/// Connection state -#[derive(Eq, PartialEq, Copy, Clone, Debug)] -pub enum OutboundState { - Open, - Pending, - Connected, -} - -impl std::fmt::Display for OutboundState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "{}", - match self { - Self::Open => "open", - Self::Pending => "pending", - Self::Connected => "connected", - } - ) - } -} - /// Defines outbound connections session. pub struct OutboundSession { /// Weak pointer to parent p2p object @@ -85,62 +63,110 @@ pub struct OutboundSession { channel_subscriber: SubscriberPtr>, /// Flag to toggle channel_subscriber notifications notify: Mutex, + + /// Outbound connection slots + slots: Mutex>>, } impl OutboundSession { /// Create a new outbound session. - pub fn new(p2p: Weak) -> OutboundSessionPtr { + pub(crate) fn new(p2p: Weak) -> OutboundSessionPtr { Arc::new(Self { p2p, connect_slots: Mutex::new(vec![]), channel_subscriber: Subscriber::new(), notify: Mutex::new(false), + slots: Mutex::new(Vec::new()), }) } /// Start the outbound session. Runs the channel connect loop. - pub async fn start(self: Arc) -> Result<()> { - let ex = self.p2p().executor(); + pub(crate) async fn start(self: Arc) -> Result<()> { let n_slots = self.p2p().settings().outbound_connections; info!(target: "net::outbound_session", "[P2P] Starting {} outbound connection slots.", n_slots); // Activate mutex lock on connection slots. - let mut connect_slots = self.connect_slots.lock().await; + let mut slots = self.slots.lock().await; + + let self_ = Arc::downgrade(&self); for i in 0..n_slots as u32 { - let task = StoppableTask::new(); - - task.clone().start( - self.clone().channel_connect_loop(i), - // Ignore stop handler - |_| async {}, - Error::NetworkServiceStopped, - ex.clone(), - ); - - connect_slots.push(task); + let slot = Slot::new(self_.clone(), i); + slot.clone().start().await; + slots.push(slot); } Ok(()) } /// Stops the outbound session. - pub async fn stop(&self) { + pub(crate) async fn stop(&self) { let connect_slots = &*self.connect_slots.lock().await; for slot in connect_slots { slot.stop().await; } } +} - /// Creates a connector object and tries to connect using it. - pub async fn channel_connect_loop(self: Arc, slot: u32) -> Result<()> { - let ex = self.p2p().executor(); - let parent = Arc::downgrade(&self); - let connector = Connector::new(self.p2p().settings(), Arc::new(parent)); +#[async_trait] +impl Session for OutboundSession { + fn p2p(&self) -> P2pPtr { + self.p2p.upgrade().unwrap() + } - // Retrieve whitelisted outbound transports - let transports = &self.p2p().settings().allowed_transports; + fn type_id(&self) -> SessionBitFlag { + SESSION_OUTBOUND + } +} +#[derive(PartialEq, Clone, Debug)] +pub enum SlotState { + // Before start() is called + Inactive, + Active, + Discovery, + Seed, + Sleep, +} + +pub struct Slot { + slot: u32, + process: Mutex, + state: Mutex, + session: Weak, +} + +impl Slot { + pub fn new(session: Weak, slot: u32) -> Arc { + Arc::new(Self { + slot, + process: Mutex::new(StoppableTask::new()), + state: Mutex::new(SlotState::Inactive), + session, + }) + } + + async fn start(self: Arc) { + // TODO: way too many clones, look into making this implicit. See implicit-clone crate + self.process.lock().await.clone().start( + self.clone()._run(), + // Ignore stop handler + |_| async {}, + Error::NetworkServiceStopped, + self.p2p().executor(), + ); + } + async fn stop(self: Arc) { + self.process.lock().await.stop().await + } + + // TODO: need to fix StoppableTask so it accepts arbitrary function signatures + async fn _run(self: Arc) -> Result<()> { + self.run().await; + Ok(()) + } + async fn run(self: Arc) { + assert_eq!(*self.state.lock().await, SlotState::Inactive); // This is the main outbound connection loop where we try to establish // a connection in the slot. The `try_connect` function will block in // case the connection was sucessfully established. If it fails, then @@ -153,27 +179,93 @@ impl OutboundSession { // signal and then exit. Once it exits, we'll run `try_connect` again // and attempt to fill the slot with another peer. loop { - match self.try_connect(slot, &connector, transports, ex.clone()).await { - Ok(()) => { - info!( - target: "net::outbound_session", - "[P2P] Outbound slot #{} disconnected", - slot - ); + // Activate the slot + *self.state.lock().await = SlotState::Active; + + /* + let mut addr = None; + + if !load_addr { + if self.session().exists_inactive_slot() { + *self.state.lock().await = SlotState::Sleep; + wait for wakeup + continue; } - Err(e) => { + + if !self.p2p().channels().lock.await.is_empty() { + *self.state.lock().await = SlotState::Discovery; + send get addr + wait for addr back + if load_addr { + return addr + } + } + + // Finally try seed phase + *self.state.lock().await = SlotState::Seed; + self.p2p().seed().await; + + if load_addr { + return addr + } else { + *self.state.lock().await = SlotState::Sleep; + wait for wakeup + continue + } + } + + match try_connect() { + ... + } + + */ + debug!( + target: "net::outbound_session::try_connect()", + "[P2P] Finding a host to connect to for outbound slot #{}", + self.slot, + ); + + // Retrieve whitelisted outbound transports + let transports = &self.p2p().settings().allowed_transports; + + // Find an address to connect to. We also do peer discovery here if needed. + let addr = if let Some(addr) = self.fetch_address_with_lock(transports).await { + addr + } else { + // Peer discovery + self.peer_discovery().await; + continue + }; + + let (addr_final, channel) = match self.try_connect(addr.clone()).await { + Ok((addr_final, channel)) => (addr_final, channel), + Err(err) => { error!( target: "net::outbound_session", "[P2P] Outbound slot #{} connection failed: {}", - slot, e, + self.slot, err, ); dnetev!(self, OutboundDisconnected, { - slot, - err: e.to_string() + slot: self.slot, + err: err.to_string() }); + continue } + }; + + let stop_sub = channel.subscribe_stop().await.expect("Channel should not be stopped"); + // Setup new channel + if let Err(err) = self.setup_channel(addr, addr_final, channel.clone()).await { + info!( + target: "net::outbound_session", + "[P2P] Outbound slot #{} disconnected: {}", + self.slot, err + ); + continue } + // Wait for channel to close + stop_sub.receive().await; } } @@ -184,162 +276,141 @@ impl OutboundSession { /// the list of pending channels, and starts sending messages across the /// channel. In case of any failures, a network error is returned and the /// main connect loop (parent of this function) will iterate again. - async fn try_connect( - &self, - slot: u32, - connector: &Connector, - transports: &[String], - ex: Arc>, - ) -> Result<()> { - debug!( - target: "net::outbound_session::try_connect()", - "[P2P] Finding a host to connect to for outbound slot #{}", - slot, - ); - - // Find an address to connect to. We also do peer discovery here if needed. - let addr = self.load_address(slot, transports).await?; + async fn try_connect(&self, addr: Url) -> Result<(Url, ChannelPtr)> { info!( target: "net::outbound_session::try_connect()", "[P2P] Connecting outbound slot #{} [{}]", - slot, addr, + self.slot, addr, ); dnetev!(self, OutboundConnecting, { - slot, + slot: self.slot, addr: addr.clone(), }); + let parent = Arc::downgrade(&self.session()); + let connector = Connector::new(self.p2p().settings(), Arc::new(parent)); + match connector.connect(&addr).await { - Ok((url, channel)) => { - info!( - target: "net::outbound_session::try_connect()", - "[P2P] Outbound slot #{} connected [{}]", - slot, url - ); - - dnetev!(self, OutboundConnected, { - slot, - addr: addr.clone(), - channel_id: channel.info.id - }); - - let stop_sub = - channel.subscribe_stop().await.expect("Channel should not be stopped"); - - // Register the new channel - self.register_channel(channel.clone(), ex.clone()).await?; - - // Channel is now connected but not yet setup - // Remove pending lock since register_channel will add the channel to p2p - self.p2p().remove_pending(&addr).await; - - // Notify that channel processing has been finished - if *self.notify.lock().await { - self.channel_subscriber.notify(Ok(channel)).await; - } - - // Wait for channel to close - stop_sub.receive().await; - return Ok(()) - } + Ok((addr_final, channel)) => Ok((addr_final, channel)), Err(e) => { error!( target: "net::outbound_session::try_connect()", "[P2P] Unable to connect outbound slot #{} [{}]: {}", - slot, addr, e + self.slot, addr, e ); + + // At this point we failed to connect. We'll quarantine this peer now. + self.p2p().hosts().quarantine(&addr).await; + + // Notify that channel processing failed + self.session().channel_subscriber.notify(Err(Error::ConnectFailed)).await; + + Err(Error::ConnectFailed) } } + } - // At this point we failed to connect. We'll quarantine this peer now. - self.p2p().hosts().quarantine(&addr).await; + async fn setup_channel(&self, addr: Url, addr_final: Url, channel: ChannelPtr) -> Result<()> { + info!( + target: "net::outbound_session::try_connect()", + "[P2P] Outbound slot #{} connected [{}]", + self.slot, addr_final + ); - // Notify that channel processing failed - if *self.notify.lock().await { - self.channel_subscriber.notify(Err(Error::ConnectFailed)).await; - } + dnetev!(self, OutboundConnected, { + slot: self.slot, + addr: addr_final.clone(), + channel_id: channel.info.id + }); - Err(Error::ConnectFailed) + // Register the new channel + self.session().register_channel(channel.clone(), self.p2p().executor()).await?; + + // Channel is now connected but not yet setup + // Remove pending lock since register_channel will add the channel to p2p + self.p2p().remove_pending(&addr).await; + + // Notify that channel processing has been finished + self.session().channel_subscriber.notify(Ok(channel)).await; + + Ok(()) } /// Loops through host addresses to find an outbound address that we can /// connect to. Check whether the address is valid by making sure it isn't /// our own inbound address, then checks whether it is already connected - /// (exists) or connecting (pending). If no address was found, we'll attempt - /// to do peer discovery and try to fill the slot again. - async fn load_address(&self, slot: u32, transports: &[String]) -> Result { - loop { - let p2p = self.p2p(); - let retry_sleep = p2p.settings().outbound_connect_timeout; + /// (exists) or connecting (pending). + /// Lastly adds matching address to the pending list. + /// TODO: this method should go in hosts + async fn fetch_address_with_lock(&self, transports: &[String]) -> Option { + let p2p = self.p2p(); - if *p2p.peer_discovery_running.lock().await { - debug!( - target: "net::outbound_session::load_address()", - "[P2P] #{} Peer discovery active, waiting {} seconds...", - slot, retry_sleep, - ); - sleep(retry_sleep).await; - } - - // Collect hosts - let mut hosts = HashSet::new(); - - // If transport mixing is enabled, then for example we're allowed to - // use tor:// to connect to tcp:// and tor+tls:// to connect to tcp+tls://. - // However, **do not** mix tor:// and tcp+tls://, nor tor+tls:// and tcp://. - let transport_mixing = self.p2p().settings().transport_mixing; - macro_rules! mix_transport { - ($a:expr, $b:expr) => { - if transports.contains(&$a.to_string()) && transport_mixing { - let mut a_to_b = p2p.hosts().fetch_with_schemes(&[$b.to_string()]).await; - for addr in a_to_b.iter_mut() { - addr.set_scheme($a).unwrap(); - hosts.insert(addr.clone()); - } - } - }; - } - mix_transport!("tor", "tcp"); - mix_transport!("tor+tls", "tcp+tls"); - mix_transport!("nym", "tcp"); - mix_transport!("nym+tls", "tcp+tls"); - - // And now the actual requested transports - for addr in p2p.hosts().fetch_with_schemes(transports).await { - hosts.insert(addr); - } - - // Try to find an unused host in the set. - for host in &hosts { - // Check if we already have this connection established - if p2p.exists(host).await { - continue - } - - // Obtain a lock on this address to prevent duplicate connection - if !p2p.add_pending(host).await { - continue - } - - return Ok(host.clone()) - } - - // We didn't find a host to connect to, let's try to find more peers. - info!( + // TODO: REMOVE !!!!! + let retry_sleep = p2p.settings().outbound_connect_timeout; + if *p2p.peer_discovery_running.lock().await { + debug!( target: "net::outbound_session::load_address()", - "[P2P] Outbound #{}: No peers found. Starting peer discovery...", - slot, + "[P2P] #{} Peer discovery active, waiting {} seconds...", + self.slot, retry_sleep, ); - // NOTE: A design decision here is to do a sleep inside peer_discovery() - // so that there's a certain period (outbound_connect_timeout) of time - // to send the GetAddr, receive Addrs, and sort things out. By sleeping - // inside peer_discovery, it will block here in the slot sessions, while - // other slots can keep trying to find hosts. This is also why we sleep - // in the beginning of this loop if peer discovery is currently active. - self.peer_discovery(slot).await; + sleep(retry_sleep).await; } + // TODO: REMOVE !!!!! + + // Collect hosts + let mut hosts = HashSet::new(); + + // If transport mixing is enabled, then for example we're allowed to + // use tor:// to connect to tcp:// and tor+tls:// to connect to tcp+tls://. + // However, **do not** mix tor:// and tcp+tls://, nor tor+tls:// and tcp://. + let transport_mixing = p2p.settings().transport_mixing; + macro_rules! mix_transport { + ($a:expr, $b:expr) => { + if transports.contains(&$a.to_string()) && transport_mixing { + let mut a_to_b = p2p.hosts().fetch_with_schemes(&[$b.to_string()]).await; + for addr in a_to_b.iter_mut() { + addr.set_scheme($a).unwrap(); + hosts.insert(addr.clone()); + } + } + }; + } + mix_transport!("tor", "tcp"); + mix_transport!("tor+tls", "tcp+tls"); + mix_transport!("nym", "tcp"); + mix_transport!("nym+tls", "tcp+tls"); + + // And now the actual requested transports + for addr in p2p.hosts().fetch_with_schemes(transports).await { + hosts.insert(addr); + } + + // TODO: randomize hosts list. Do not try to connect in a deterministic order. + // This is healthier for multiple slots to not compete for the same addrs. + + // Try to find an unused host in the set. + for host in &hosts { + // Check if we already have this connection established + if p2p.exists(host).await { + continue + } + + // Check if we already have this configured as a manual peer + if p2p.settings().peers.contains(host) { + continue + } + + // Obtain a lock on this address to prevent duplicate connection + if !p2p.add_pending(host).await { + continue + } + + return Some(host.clone()) + } + + None } /// Activate peer discovery if not active already. This will loop through all @@ -349,14 +420,14 @@ impl OutboundSession { /// This function will also sleep `Settings::outbound_connect_timeout` seconds /// after broadcasting in order to let the P2P stack receive and work through /// the addresses it is expecting. - async fn peer_discovery(&self, slot: u32) { + async fn peer_discovery(&self) { let p2p = self.p2p(); if *p2p.peer_discovery_running.lock().await { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Peer discovery already active", - slot, + self.slot, ); return } @@ -364,7 +435,7 @@ impl OutboundSession { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Started peer discovery", - slot, + self.slot, ); *p2p.peer_discovery_running.lock().await = true; @@ -375,7 +446,7 @@ impl OutboundSession { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Broadcasting GetAddrs across active channels", - slot, + self.slot, ); p2p.broadcast(&get_addrs).await; } else { @@ -397,30 +468,22 @@ impl OutboundSession { debug!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Sleeping {} seconds", - slot, p2p.settings().outbound_connect_timeout, + self.slot, p2p.settings().outbound_connect_timeout, ); sleep(p2p.settings().outbound_connect_timeout).await; *p2p.peer_discovery_running.lock().await = false; } - /// Enable channel_subscriber notifications. - pub async fn enable_notify(self: Arc) { - *self.notify.lock().await = true; + async fn populate_hosts() {} + + async fn wakeup(self: Arc) { + // wakey :) } - /// Disable channel_subscriber notifications. - pub async fn disable_notify(self: Arc) { - *self.notify.lock().await = false; + fn session(&self) -> OutboundSessionPtr { + self.session.upgrade().unwrap() } -} - -#[async_trait] -impl Session for OutboundSession { fn p2p(&self) -> P2pPtr { - self.p2p.upgrade().unwrap() - } - - fn type_id(&self) -> SessionBitFlag { - SESSION_OUTBOUND + self.session().p2p() } }