From 99fd53bbee0eec89463021bebd9a8462f85d491e Mon Sep 17 00:00:00 2001 From: parazyd Date: Mon, 4 Sep 2023 14:13:25 +0200 Subject: [PATCH] net/acceptor: Implement an inbound connection limit and its semaphore handler. --- src/net/acceptor.rs | 57 +++++++++++++++++++++++++++++++++++++++++---- src/net/settings.rs | 11 ++++++++- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index 54c430595..6c5e99782 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -16,7 +16,13 @@ * along with this program. If not, see . */ -use std::{io::ErrorKind, sync::Arc}; +use std::{ + io::ErrorKind, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, +}; use log::error; use smol::Executor; @@ -28,7 +34,7 @@ use super::{ transport::{Listener, PtListener}, }; use crate::{ - system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, + system::{CondVar, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, Error, Result, }; @@ -40,6 +46,7 @@ pub struct Acceptor { channel_subscriber: SubscriberPtr>, task: StoppableTaskPtr, session: SessionWeakPtr, + conn_count: AtomicUsize, } impl Acceptor { @@ -49,6 +56,7 @@ impl Acceptor { channel_subscriber: Subscriber::new(), task: StoppableTask::new(), session, + conn_count: AtomicUsize::new(0), }) } @@ -74,7 +82,7 @@ impl Acceptor { fn accept(self: Arc, listener: Box, ex: Arc>) { let self_ = self.clone(); self.task.clone().start( - self.run_accept_loop(listener), + self.run_accept_loop(listener, ex.clone()), |result| self_.handle_stop(result), Error::NetworkServiceStopped, ex, @@ -82,12 +90,53 @@ impl Acceptor { } /// Run the accept loop. - async fn run_accept_loop(self: Arc, listener: Box) -> Result<()> { + async fn run_accept_loop( + self: Arc, + listener: Box, + ex: Arc>, + ) -> Result<()> { + // CondVar used to notify the loop to recheck if new connections can + // be accepted by the listener. + let cv = Arc::new(CondVar::new()); + loop { + // Refuse new connections if we're up to the connection limit + let limit = self.session.upgrade().unwrap().p2p().settings().inbound_connections; + if self.clone().conn_count.load(SeqCst) >= limit { + // This will get notified every time an inbound channel is stopped. + // These channels are the channels spawned below on listener.next().is_ok(). + // After the notification, we reset the condvar and retry this loop to see + // if we can accept more connections, and if not - we'll be back here. + cv.wait().await; + cv.reset(); + continue + } + + // Now we wait for a new connection. match listener.next().await { Ok((stream, url)) => { + // Create the new Channel. let session = self.session.clone(); let channel = Channel::new(stream, url, session).await; + + // Increment the connection counter + self.conn_count.fetch_add(1, SeqCst); + + // This task will subscribe on the new channel and decrement + // the connection counter. Along with that, it will notify + // the CondVar that might be waiting to allow new connections. + let self_ = self.clone(); + let channel_ = channel.clone(); + let cv_ = cv.clone(); + ex.spawn(async move { + let stop_sub = channel_.subscribe_stop().await.unwrap(); + stop_sub.receive().await; + self_.conn_count.fetch_sub(1, SeqCst); + cv_.notify(); + }) + .detach(); + + // Finally, notify any subscribers about the new channel. self.channel_subscriber.notify(Ok(channel)).await; } diff --git a/src/net/settings.rs b/src/net/settings.rs index 4b92da9cc..59aa6b2f1 100644 --- a/src/net/settings.rs +++ b/src/net/settings.rs @@ -49,6 +49,9 @@ pub struct Settings { /// Outbound connection slots number, this many connections will be /// attempted. (This does not include manual connections) pub outbound_connections: usize, + /// Inbound connection slots number, this many active listening connections + /// will be allowed. (This does not include manual connections) + pub inbound_connections: usize, /// Manual connections retry limit, 0 for forever looping pub manual_attempt_limit: usize, /// Outbound connection timeout (in seconds) @@ -82,6 +85,7 @@ impl Default for Settings { allowed_transports: vec![], transport_mixing: true, outbound_connections: 0, + inbound_connections: 0, manual_attempt_limit: 0, outbound_connect_timeout: 15, channel_handshake_timeout: 10, @@ -107,9 +111,13 @@ pub struct SettingsOpt { pub inbound: Vec, /// Outbound connection slots number - #[structopt(long = "slots")] + #[structopt(long = "outbound-slots")] pub outbound_connections: Option, + /// Inbound connection slots number + #[structopt(long = "inbound-slots")] + pub inbound_connections: Option, + /// P2P external addresses node advertises so other peers can /// reach us and connect to us, as long as inbound addresses /// are also configured @@ -187,6 +195,7 @@ impl From for Settings { allowed_transports: opt.allowed_transports, transport_mixing: opt.transport_mixing.unwrap_or(false), outbound_connections: opt.outbound_connections.unwrap_or(0), + inbound_connections: opt.inbound_connections.unwrap_or(0), manual_attempt_limit: opt.manual_attempt_limit.unwrap_or(0), outbound_connect_timeout: opt.outbound_connect_timeout.unwrap_or(15), channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(10),