diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs
index 54c430595..6c5e99782 100644
--- a/src/net/acceptor.rs
+++ b/src/net/acceptor.rs
@@ -16,7 +16,13 @@
* along with this program. If not, see .
*/
-use std::{io::ErrorKind, sync::Arc};
+use std::{
+ io::ErrorKind,
+ sync::{
+ atomic::{AtomicUsize, Ordering::SeqCst},
+ Arc,
+ },
+};
use log::error;
use smol::Executor;
@@ -28,7 +34,7 @@ use super::{
transport::{Listener, PtListener},
};
use crate::{
- system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
+ system::{CondVar, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
Error, Result,
};
@@ -40,6 +46,7 @@ pub struct Acceptor {
channel_subscriber: SubscriberPtr>,
task: StoppableTaskPtr,
session: SessionWeakPtr,
+ conn_count: AtomicUsize,
}
impl Acceptor {
@@ -49,6 +56,7 @@ impl Acceptor {
channel_subscriber: Subscriber::new(),
task: StoppableTask::new(),
session,
+ conn_count: AtomicUsize::new(0),
})
}
@@ -74,7 +82,7 @@ impl Acceptor {
fn accept(self: Arc, listener: Box, ex: Arc>) {
let self_ = self.clone();
self.task.clone().start(
- self.run_accept_loop(listener),
+ self.run_accept_loop(listener, ex.clone()),
|result| self_.handle_stop(result),
Error::NetworkServiceStopped,
ex,
@@ -82,12 +90,53 @@ impl Acceptor {
}
/// Run the accept loop.
- async fn run_accept_loop(self: Arc, listener: Box) -> Result<()> {
+ async fn run_accept_loop(
+ self: Arc,
+ listener: Box,
+ ex: Arc>,
+ ) -> Result<()> {
+ // CondVar used to notify the loop to recheck if new connections can
+ // be accepted by the listener.
+ let cv = Arc::new(CondVar::new());
+
loop {
+ // Refuse new connections if we're up to the connection limit
+ let limit = self.session.upgrade().unwrap().p2p().settings().inbound_connections;
+ if self.clone().conn_count.load(SeqCst) >= limit {
+ // This will get notified every time an inbound channel is stopped.
+ // These channels are the channels spawned below on listener.next().is_ok().
+ // After the notification, we reset the condvar and retry this loop to see
+ // if we can accept more connections, and if not - we'll be back here.
+ cv.wait().await;
+ cv.reset();
+ continue
+ }
+
+ // Now we wait for a new connection.
match listener.next().await {
Ok((stream, url)) => {
+ // Create the new Channel.
let session = self.session.clone();
let channel = Channel::new(stream, url, session).await;
+
+ // Increment the connection counter
+ self.conn_count.fetch_add(1, SeqCst);
+
+ // This task will subscribe on the new channel and decrement
+ // the connection counter. Along with that, it will notify
+ // the CondVar that might be waiting to allow new connections.
+ let self_ = self.clone();
+ let channel_ = channel.clone();
+ let cv_ = cv.clone();
+ ex.spawn(async move {
+ let stop_sub = channel_.subscribe_stop().await.unwrap();
+ stop_sub.receive().await;
+ self_.conn_count.fetch_sub(1, SeqCst);
+ cv_.notify();
+ })
+ .detach();
+
+ // Finally, notify any subscribers about the new channel.
self.channel_subscriber.notify(Ok(channel)).await;
}
diff --git a/src/net/settings.rs b/src/net/settings.rs
index 4b92da9cc..59aa6b2f1 100644
--- a/src/net/settings.rs
+++ b/src/net/settings.rs
@@ -49,6 +49,9 @@ pub struct Settings {
/// Outbound connection slots number, this many connections will be
/// attempted. (This does not include manual connections)
pub outbound_connections: usize,
+ /// Inbound connection slots number, this many active listening connections
+ /// will be allowed. (This does not include manual connections)
+ pub inbound_connections: usize,
/// Manual connections retry limit, 0 for forever looping
pub manual_attempt_limit: usize,
/// Outbound connection timeout (in seconds)
@@ -82,6 +85,7 @@ impl Default for Settings {
allowed_transports: vec![],
transport_mixing: true,
outbound_connections: 0,
+ inbound_connections: 0,
manual_attempt_limit: 0,
outbound_connect_timeout: 15,
channel_handshake_timeout: 10,
@@ -107,9 +111,13 @@ pub struct SettingsOpt {
pub inbound: Vec,
/// Outbound connection slots number
- #[structopt(long = "slots")]
+ #[structopt(long = "outbound-slots")]
pub outbound_connections: Option,
+ /// Inbound connection slots number
+ #[structopt(long = "inbound-slots")]
+ pub inbound_connections: Option,
+
/// P2P external addresses node advertises so other peers can
/// reach us and connect to us, as long as inbound addresses
/// are also configured
@@ -187,6 +195,7 @@ impl From for Settings {
allowed_transports: opt.allowed_transports,
transport_mixing: opt.transport_mixing.unwrap_or(false),
outbound_connections: opt.outbound_connections.unwrap_or(0),
+ inbound_connections: opt.inbound_connections.unwrap_or(0),
manual_attempt_limit: opt.manual_attempt_limit.unwrap_or(0),
outbound_connect_timeout: opt.outbound_connect_timeout.unwrap_or(15),
channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(10),