net/acceptor: Implement an inbound connection limit and its semaphore handler.

This commit is contained in:
parazyd
2023-09-04 14:13:25 +02:00
parent be70993c22
commit 99fd53bbee
2 changed files with 63 additions and 5 deletions

View File

@@ -16,7 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{io::ErrorKind, sync::Arc};
use std::{
io::ErrorKind,
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
},
};
use log::error;
use smol::Executor;
@@ -28,7 +34,7 @@ use super::{
transport::{Listener, PtListener},
};
use crate::{
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
system::{CondVar, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
Error, Result,
};
@@ -40,6 +46,7 @@ pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
task: StoppableTaskPtr,
session: SessionWeakPtr,
conn_count: AtomicUsize,
}
impl Acceptor {
@@ -49,6 +56,7 @@ impl Acceptor {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
session,
conn_count: AtomicUsize::new(0),
})
}
@@ -74,7 +82,7 @@ impl Acceptor {
fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: Arc<Executor<'_>>) {
let self_ = self.clone();
self.task.clone().start(
self.run_accept_loop(listener),
self.run_accept_loop(listener, ex.clone()),
|result| self_.handle_stop(result),
Error::NetworkServiceStopped,
ex,
@@ -82,12 +90,53 @@ impl Acceptor {
}
/// Run the accept loop.
async fn run_accept_loop(self: Arc<Self>, listener: Box<dyn PtListener>) -> Result<()> {
async fn run_accept_loop(
self: Arc<Self>,
listener: Box<dyn PtListener>,
ex: Arc<Executor<'_>>,
) -> Result<()> {
// CondVar used to notify the loop to recheck if new connections can
// be accepted by the listener.
let cv = Arc::new(CondVar::new());
loop {
// Refuse new connections if we're up to the connection limit
let limit = self.session.upgrade().unwrap().p2p().settings().inbound_connections;
if self.clone().conn_count.load(SeqCst) >= limit {
// This will get notified every time an inbound channel is stopped.
// These channels are the channels spawned below on listener.next().is_ok().
// After the notification, we reset the condvar and retry this loop to see
// if we can accept more connections, and if not - we'll be back here.
cv.wait().await;
cv.reset();
continue
}
// Now we wait for a new connection.
match listener.next().await {
Ok((stream, url)) => {
// Create the new Channel.
let session = self.session.clone();
let channel = Channel::new(stream, url, session).await;
// Increment the connection counter
self.conn_count.fetch_add(1, SeqCst);
// This task will subscribe on the new channel and decrement
// the connection counter. Along with that, it will notify
// the CondVar that might be waiting to allow new connections.
let self_ = self.clone();
let channel_ = channel.clone();
let cv_ = cv.clone();
ex.spawn(async move {
let stop_sub = channel_.subscribe_stop().await.unwrap();
stop_sub.receive().await;
self_.conn_count.fetch_sub(1, SeqCst);
cv_.notify();
})
.detach();
// Finally, notify any subscribers about the new channel.
self.channel_subscriber.notify(Ok(channel)).await;
}

View File

@@ -49,6 +49,9 @@ pub struct Settings {
/// Outbound connection slots number, this many connections will be
/// attempted. (This does not include manual connections)
pub outbound_connections: usize,
/// Inbound connection slots number, this many active listening connections
/// will be allowed. (This does not include manual connections)
pub inbound_connections: usize,
/// Manual connections retry limit, 0 for forever looping
pub manual_attempt_limit: usize,
/// Outbound connection timeout (in seconds)
@@ -82,6 +85,7 @@ impl Default for Settings {
allowed_transports: vec![],
transport_mixing: true,
outbound_connections: 0,
inbound_connections: 0,
manual_attempt_limit: 0,
outbound_connect_timeout: 15,
channel_handshake_timeout: 10,
@@ -107,9 +111,13 @@ pub struct SettingsOpt {
pub inbound: Vec<Url>,
/// Outbound connection slots number
#[structopt(long = "slots")]
#[structopt(long = "outbound-slots")]
pub outbound_connections: Option<usize>,
/// Inbound connection slots number
#[structopt(long = "inbound-slots")]
pub inbound_connections: Option<usize>,
/// P2P external addresses node advertises so other peers can
/// reach us and connect to us, as long as inbound addresses
/// are also configured
@@ -187,6 +195,7 @@ impl From<SettingsOpt> for Settings {
allowed_transports: opt.allowed_transports,
transport_mixing: opt.transport_mixing.unwrap_or(false),
outbound_connections: opt.outbound_connections.unwrap_or(0),
inbound_connections: opt.inbound_connections.unwrap_or(0),
manual_attempt_limit: opt.manual_attempt_limit.unwrap_or(0),
outbound_connect_timeout: opt.outbound_connect_timeout.unwrap_or(15),
channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(10),