net: reorganize code of outbound session into individual separated slots

This commit is contained in:
x
2023-08-24 09:37:04 +02:00
parent 2e1bc56371
commit a2ae02ad9f
2 changed files with 266 additions and 202 deletions

View File

@@ -220,6 +220,7 @@ impl Hosts {
}
/// Get all peers that match the given transport schemes from the hosts set.
/// TODO: add a limit: usize argument
pub async fn fetch_with_schemes(&self, schemes: &[String]) -> Vec<Url> {
let mut ret = vec![];

View File

@@ -53,28 +53,6 @@ use crate::{
pub type OutboundSessionPtr = Arc<OutboundSession>;
/// Connection state.
///
/// The `Display` implementation renders each variant as its lowercase name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutboundState {
    Open,
    Pending,
    Connected,
}
impl std::fmt::Display for OutboundState {
    /// Writes the lowercase variant name: `open`, `pending` or `connected`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it in a single write.
        let label = match self {
            Self::Open => "open",
            Self::Pending => "pending",
            Self::Connected => "connected",
        };
        f.write_str(label)
    }
}
/// Defines outbound connections session.
pub struct OutboundSession {
/// Weak pointer to parent p2p object
@@ -85,62 +63,110 @@ pub struct OutboundSession {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
/// Flag to toggle channel_subscriber notifications
notify: Mutex<bool>,
/// Outbound connection slots
slots: Mutex<Vec<Arc<Slot>>>,
}
impl OutboundSession {
/// Create a new outbound session.
pub fn new(p2p: Weak<P2p>) -> OutboundSessionPtr {
pub(crate) fn new(p2p: Weak<P2p>) -> OutboundSessionPtr {
Arc::new(Self {
p2p,
connect_slots: Mutex::new(vec![]),
channel_subscriber: Subscriber::new(),
notify: Mutex::new(false),
slots: Mutex::new(Vec::new()),
})
}
/// Start the outbound session. Runs the channel connect loop.
pub async fn start(self: Arc<Self>) -> Result<()> {
let ex = self.p2p().executor();
pub(crate) async fn start(self: Arc<Self>) -> Result<()> {
let n_slots = self.p2p().settings().outbound_connections;
info!(target: "net::outbound_session", "[P2P] Starting {} outbound connection slots.", n_slots);
// Activate mutex lock on connection slots.
let mut connect_slots = self.connect_slots.lock().await;
let mut slots = self.slots.lock().await;
let self_ = Arc::downgrade(&self);
for i in 0..n_slots as u32 {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(i),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex.clone(),
);
connect_slots.push(task);
let slot = Slot::new(self_.clone(), i);
slot.clone().start().await;
slots.push(slot);
}
Ok(())
}
/// Stops the outbound session.
pub async fn stop(&self) {
pub(crate) async fn stop(&self) {
let connect_slots = &*self.connect_slots.lock().await;
for slot in connect_slots {
slot.stop().await;
}
}
}
/// Creates a connector object and tries to connect using it.
pub async fn channel_connect_loop(self: Arc<Self>, slot: u32) -> Result<()> {
let ex = self.p2p().executor();
let parent = Arc::downgrade(&self);
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
#[async_trait]
impl Session for OutboundSession {
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade().unwrap()
}
// Retrieve whitelisted outbound transports
let transports = &self.p2p().settings().allowed_transports;
fn type_id(&self) -> SessionBitFlag {
SESSION_OUTBOUND
}
}
/// State of an outbound connection slot.
///
/// Derives `Copy` and `Eq` in addition to `Clone`/`PartialEq`: the enum is
/// fieldless, so values are trivially copyable and equality is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotState {
    /// Before start() is called. This is the state a freshly constructed
    /// slot holds.
    Inactive,
    /// Set at the top of every pass of the slot's connect loop.
    Active,
    /// NOTE(review): not assigned by any live code visible here; the
    /// commented-out sketch in `run()` sets it before requesting addresses
    /// from connected peers -- confirm intended semantics.
    Discovery,
    /// NOTE(review): not assigned by any live code visible here; the
    /// commented-out sketch in `run()` sets it before the seed phase --
    /// confirm intended semantics.
    Seed,
    /// NOTE(review): not assigned by any live code visible here; the
    /// commented-out sketch in `run()` sets it before waiting for a
    /// wakeup -- confirm intended semantics.
    Sleep,
}
/// A single outbound connection slot. `OutboundSession::start()` creates one
/// per configured outbound connection and keeps them in its `slots` list.
pub struct Slot {
/// Slot number given at construction; printed in this slot's log messages.
slot: u32,
/// Task handle that `start()` launches and `stop()` stops.
process: Mutex<StoppableTaskPtr>,
/// Current state of the slot; `new()` initialises it to `SlotState::Inactive`.
state: Mutex<SlotState>,
/// Weak reference to the parent outbound session.
session: Weak<OutboundSession>,
}
impl Slot {
/// Create a slot numbered `slot` that belongs to `session`.
///
/// The returned slot holds a fresh, not yet started task and begins in
/// `SlotState::Inactive`.
pub fn new(session: Weak<OutboundSession>, slot: u32) -> Arc<Slot> {
    let process = Mutex::new(StoppableTask::new());
    let state = Mutex::new(SlotState::Inactive);
    Arc::new(Self { slot, process, state, session })
}
/// Launch this slot's task: runs `_run()` on the p2p executor, using
/// `Error::NetworkServiceStopped` as the task's stop value.
async fn start(self: Arc<Self>) {
    // TODO: way too many clones, look into making this implicit. See implicit-clone crate
    // The guard stays alive until the end of the function so the task handle
    // remains locked while the task is being started.
    let process = self.process.lock().await;
    let task = process.clone();
    task.start(
        self.clone()._run(),
        // Ignore stop handler
        |_| async {},
        Error::NetworkServiceStopped,
        self.p2p().executor(),
    );
}
/// Stop this slot's task. The task handle stays locked until the stop
/// call has completed.
async fn stop(self: Arc<Self>) {
    let process = self.process.lock().await;
    process.stop().await;
}
// TODO: need to fix StoppableTask so it accepts arbitrary function signatures
/// Adapter handed to the task in `start()`: awaits `run()`, which returns
/// nothing, and then reports `Ok(())` so the future has a `Result<()>` output.
async fn _run(self: Arc<Self>) -> Result<()> {
self.run().await;
Ok(())
}
async fn run(self: Arc<Self>) {
assert_eq!(*self.state.lock().await, SlotState::Inactive);
// This is the main outbound connection loop where we try to establish
// a connection in the slot. The `try_connect` function will block in
// case the connection was successfully established. If it fails, then
@@ -153,27 +179,93 @@ impl OutboundSession {
// signal and then exit. Once it exits, we'll run `try_connect` again
// and attempt to fill the slot with another peer.
loop {
match self.try_connect(slot, &connector, transports, ex.clone()).await {
Ok(()) => {
info!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} disconnected",
slot
);
// Activate the slot
*self.state.lock().await = SlotState::Active;
/*
let mut addr = None;
if !load_addr {
if self.session().exists_inactive_slot() {
*self.state.lock().await = SlotState::Sleep;
wait for wakeup
continue;
}
Err(e) => {
if !self.p2p().channels().lock.await.is_empty() {
*self.state.lock().await = SlotState::Discovery;
send get addr
wait for addr back
if load_addr {
return addr
}
}
// Finally try seed phase
*self.state.lock().await = SlotState::Seed;
self.p2p().seed().await;
if load_addr {
return addr
} else {
*self.state.lock().await = SlotState::Sleep;
wait for wakeup
continue
}
}
match try_connect() {
...
}
*/
debug!(
target: "net::outbound_session::try_connect()",
"[P2P] Finding a host to connect to for outbound slot #{}",
self.slot,
);
// Retrieve whitelisted outbound transports
let transports = &self.p2p().settings().allowed_transports;
// Find an address to connect to. We also do peer discovery here if needed.
let addr = if let Some(addr) = self.fetch_address_with_lock(transports).await {
addr
} else {
// Peer discovery
self.peer_discovery().await;
continue
};
let (addr_final, channel) = match self.try_connect(addr.clone()).await {
Ok((addr_final, channel)) => (addr_final, channel),
Err(err) => {
error!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} connection failed: {}",
slot, e,
self.slot, err,
);
dnetev!(self, OutboundDisconnected, {
slot,
err: e.to_string()
slot: self.slot,
err: err.to_string()
});
continue
}
};
let stop_sub = channel.subscribe_stop().await.expect("Channel should not be stopped");
// Setup new channel
if let Err(err) = self.setup_channel(addr, addr_final, channel.clone()).await {
info!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} disconnected: {}",
self.slot, err
);
continue
}
// Wait for channel to close
stop_sub.receive().await;
}
}
@@ -184,162 +276,141 @@ impl OutboundSession {
/// the list of pending channels, and starts sending messages across the
/// channel. In case of any failures, a network error is returned and the
/// main connect loop (parent of this function) will iterate again.
async fn try_connect(
&self,
slot: u32,
connector: &Connector,
transports: &[String],
ex: Arc<Executor<'_>>,
) -> Result<()> {
debug!(
target: "net::outbound_session::try_connect()",
"[P2P] Finding a host to connect to for outbound slot #{}",
slot,
);
// Find an address to connect to. We also do peer discovery here if needed.
let addr = self.load_address(slot, transports).await?;
async fn try_connect(&self, addr: Url) -> Result<(Url, ChannelPtr)> {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
slot, addr,
self.slot, addr,
);
dnetev!(self, OutboundConnecting, {
slot,
slot: self.slot,
addr: addr.clone(),
});
let parent = Arc::downgrade(&self.session());
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
match connector.connect(&addr).await {
Ok((url, channel)) => {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
slot, url
);
dnetev!(self, OutboundConnected, {
slot,
addr: addr.clone(),
channel_id: channel.info.id
});
let stop_sub =
channel.subscribe_stop().await.expect("Channel should not be stopped");
// Register the new channel
self.register_channel(channel.clone(), ex.clone()).await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
// Notify that channel processing has been finished
if *self.notify.lock().await {
self.channel_subscriber.notify(Ok(channel)).await;
}
// Wait for channel to close
stop_sub.receive().await;
return Ok(())
}
Ok((addr_final, channel)) => Ok((addr_final, channel)),
Err(e) => {
error!(
target: "net::outbound_session::try_connect()",
"[P2P] Unable to connect outbound slot #{} [{}]: {}",
slot, addr, e
self.slot, addr, e
);
// At this point we failed to connect. We'll quarantine this peer now.
self.p2p().hosts().quarantine(&addr).await;
// Notify that channel processing failed
self.session().channel_subscriber.notify(Err(Error::ConnectFailed)).await;
Err(Error::ConnectFailed)
}
}
}
// At this point we failed to connect. We'll quarantine this peer now.
self.p2p().hosts().quarantine(&addr).await;
async fn setup_channel(&self, addr: Url, addr_final: Url, channel: ChannelPtr) -> Result<()> {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
self.slot, addr_final
);
// Notify that channel processing failed
if *self.notify.lock().await {
self.channel_subscriber.notify(Err(Error::ConnectFailed)).await;
}
dnetev!(self, OutboundConnected, {
slot: self.slot,
addr: addr_final.clone(),
channel_id: channel.info.id
});
Err(Error::ConnectFailed)
// Register the new channel
self.session().register_channel(channel.clone(), self.p2p().executor()).await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
// Notify that channel processing has been finished
self.session().channel_subscriber.notify(Ok(channel)).await;
Ok(())
}
/// Loops through host addresses to find an outbound address that we can
/// connect to. Check whether the address is valid by making sure it isn't
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending). If no address was found, we'll attempt
/// to do peer discovery and try to fill the slot again.
async fn load_address(&self, slot: u32, transports: &[String]) -> Result<Url> {
loop {
let p2p = self.p2p();
let retry_sleep = p2p.settings().outbound_connect_timeout;
/// (exists) or connecting (pending).
/// Lastly adds matching address to the pending list.
/// TODO: this method should go in hosts
async fn fetch_address_with_lock(&self, transports: &[String]) -> Option<Url> {
let p2p = self.p2p();
if *p2p.peer_discovery_running.lock().await {
debug!(
target: "net::outbound_session::load_address()",
"[P2P] #{} Peer discovery active, waiting {} seconds...",
slot, retry_sleep,
);
sleep(retry_sleep).await;
}
// Collect hosts
let mut hosts = HashSet::new();
// If transport mixing is enabled, then for example we're allowed to
// use tor:// to connect to tcp:// and tor+tls:// to connect to tcp+tls://.
// However, **do not** mix tor:// and tcp+tls://, nor tor+tls:// and tcp://.
let transport_mixing = self.p2p().settings().transport_mixing;
macro_rules! mix_transport {
($a:expr, $b:expr) => {
if transports.contains(&$a.to_string()) && transport_mixing {
let mut a_to_b = p2p.hosts().fetch_with_schemes(&[$b.to_string()]).await;
for addr in a_to_b.iter_mut() {
addr.set_scheme($a).unwrap();
hosts.insert(addr.clone());
}
}
};
}
mix_transport!("tor", "tcp");
mix_transport!("tor+tls", "tcp+tls");
mix_transport!("nym", "tcp");
mix_transport!("nym+tls", "tcp+tls");
// And now the actual requested transports
for addr in p2p.hosts().fetch_with_schemes(transports).await {
hosts.insert(addr);
}
// Try to find an unused host in the set.
for host in &hosts {
// Check if we already have this connection established
if p2p.exists(host).await {
continue
}
// Obtain a lock on this address to prevent duplicate connection
if !p2p.add_pending(host).await {
continue
}
return Ok(host.clone())
}
// We didn't find a host to connect to, let's try to find more peers.
info!(
// TODO: REMOVE !!!!!
let retry_sleep = p2p.settings().outbound_connect_timeout;
if *p2p.peer_discovery_running.lock().await {
debug!(
target: "net::outbound_session::load_address()",
"[P2P] Outbound #{}: No peers found. Starting peer discovery...",
slot,
"[P2P] #{} Peer discovery active, waiting {} seconds...",
self.slot, retry_sleep,
);
// NOTE: A design decision here is to do a sleep inside peer_discovery()
// so that there's a certain period (outbound_connect_timeout) of time
// to send the GetAddr, receive Addrs, and sort things out. By sleeping
// inside peer_discovery, it will block here in the slot sessions, while
// other slots can keep trying to find hosts. This is also why we sleep
// in the beginning of this loop if peer discovery is currently active.
self.peer_discovery(slot).await;
sleep(retry_sleep).await;
}
// TODO: REMOVE !!!!!
// Collect hosts
let mut hosts = HashSet::new();
// If transport mixing is enabled, then for example we're allowed to
// use tor:// to connect to tcp:// and tor+tls:// to connect to tcp+tls://.
// However, **do not** mix tor:// and tcp+tls://, nor tor+tls:// and tcp://.
let transport_mixing = p2p.settings().transport_mixing;
macro_rules! mix_transport {
($a:expr, $b:expr) => {
if transports.contains(&$a.to_string()) && transport_mixing {
let mut a_to_b = p2p.hosts().fetch_with_schemes(&[$b.to_string()]).await;
for addr in a_to_b.iter_mut() {
addr.set_scheme($a).unwrap();
hosts.insert(addr.clone());
}
}
};
}
mix_transport!("tor", "tcp");
mix_transport!("tor+tls", "tcp+tls");
mix_transport!("nym", "tcp");
mix_transport!("nym+tls", "tcp+tls");
// And now the actual requested transports
for addr in p2p.hosts().fetch_with_schemes(transports).await {
hosts.insert(addr);
}
// TODO: randomize hosts list. Do not try to connect in a deterministic order.
// This is healthier for multiple slots to not compete for the same addrs.
// Try to find an unused host in the set.
for host in &hosts {
// Check if we already have this connection established
if p2p.exists(host).await {
continue
}
// Check if we already have this configured as a manual peer
if p2p.settings().peers.contains(host) {
continue
}
// Obtain a lock on this address to prevent duplicate connection
if !p2p.add_pending(host).await {
continue
}
return Some(host.clone())
}
None
}
/// Activate peer discovery if not active already. This will loop through all
@@ -349,14 +420,14 @@ impl OutboundSession {
/// This function will also sleep `Settings::outbound_connect_timeout` seconds
/// after broadcasting in order to let the P2P stack receive and work through
/// the addresses it is expecting.
async fn peer_discovery(&self, slot: u32) {
async fn peer_discovery(&self) {
let p2p = self.p2p();
if *p2p.peer_discovery_running.lock().await {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Peer discovery already active",
slot,
self.slot,
);
return
}
@@ -364,7 +435,7 @@ impl OutboundSession {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Started peer discovery",
slot,
self.slot,
);
*p2p.peer_discovery_running.lock().await = true;
@@ -375,7 +446,7 @@ impl OutboundSession {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Broadcasting GetAddrs across active channels",
slot,
self.slot,
);
p2p.broadcast(&get_addrs).await;
} else {
@@ -397,30 +468,22 @@ impl OutboundSession {
debug!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Sleeping {} seconds",
slot, p2p.settings().outbound_connect_timeout,
self.slot, p2p.settings().outbound_connect_timeout,
);
sleep(p2p.settings().outbound_connect_timeout).await;
*p2p.peer_discovery_running.lock().await = false;
}
/// Enable channel_subscriber notifications.
pub async fn enable_notify(self: Arc<Self>) {
*self.notify.lock().await = true;
async fn populate_hosts() {}
async fn wakeup(self: Arc<Self>) {
// wakey :)
}
/// Disable channel_subscriber notifications.
pub async fn disable_notify(self: Arc<Self>) {
*self.notify.lock().await = false;
fn session(&self) -> OutboundSessionPtr {
self.session.upgrade().unwrap()
}
}
#[async_trait]
impl Session for OutboundSession {
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade().unwrap()
}
fn type_id(&self) -> SessionBitFlag {
SESSION_OUTBOUND
self.session().p2p()
}
}