session for manual connections

This commit is contained in:
narodnik
2022-01-05 18:32:13 +01:00
parent 8b37435806
commit 7ed58b0d2c
6 changed files with 122 additions and 26 deletions

View File

@@ -11,7 +11,7 @@ use crate::{
error::{Error, Result},
net::{
messages::Message,
sessions::{InboundSession, OutboundSession, SeedSession, ManualSession},
sessions::{InboundSession, ManualSession, OutboundSession, SeedSession},
Channel, ChannelPtr, Hosts, HostsPtr, Settings, SettingsPtr,
},
system::{Subscriber, SubscriberPtr, Subscription},
@@ -68,10 +68,8 @@ impl P2p {
debug!(target: "net", "P2p::run() [BEGIN]");
let manual = ManualSession::new(Arc::downgrade(&self));
manual.clone().start(executor.clone())?;
for peer in &self.settings.peers {
manual.clone().connect(peer);
manual.clone().connect(peer, executor.clone()).await;
}
let inbound = InboundSession::new(Arc::downgrade(&self));
@@ -85,6 +83,7 @@ impl P2p {
stop_sub.receive().await;
// Stop the sessions
manual.stop().await;
inbound.stop().await;
outbound.stop().await;

View File

@@ -5,9 +5,8 @@ use std::{
sync::{Arc, Weak},
};
use crate::error::{Error, Result};
//use crate::net::error::{Error, Result};
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolAddress, ProtocolPing},
sessions::Session,
@@ -39,7 +38,7 @@ impl InboundSession {
self.clone().start_accept_session(accept_addr, executor.clone())?;
}
None => {
info!("Not configured for accepting incoming connections.");
info!(target: "net", "Not configured for accepting incoming connections.");
return Ok(())
}
}
@@ -65,10 +64,10 @@ impl InboundSession {
accept_addr: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
info!("Starting inbound session on {}", accept_addr);
info!(target: "net", "Starting inbound session on {}", accept_addr);
let result = self.acceptor.clone().start(accept_addr, executor);
if let Err(err) = result.clone() {
error!("Error starting listener: {}", err);
error!(target: "net", "Error starting listener: {}", err);
}
result
}
@@ -93,7 +92,7 @@ impl InboundSession {
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
info!("Connected inbound [{}]", channel.address());
info!(target: "net", "Connected inbound [{}]", channel.address());
self.clone().register_channel(channel.clone(), executor.clone()).await?;

View File

@@ -1,38 +1,134 @@
use async_executor::Executor;
use async_std::sync::Mutex;
use log::*;
use std::{
net::SocketAddr,
sync::{Arc, Weak},
};
use crate::error::{Error, Result};
//use crate::net::error::{Error, Result};
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolAddress, ProtocolPing},
sessions::Session,
Acceptor, AcceptorPtr, ChannelPtr, P2p,
ChannelPtr, Connector, P2p,
},
system::{StoppableTask, StoppableTaskPtr},
};
pub struct ManualSession {
p2p: Weak<P2p>,
connect_slots: Mutex<Vec<StoppableTaskPtr>>,
}
impl ManualSession {
/// Create a new inbound session.
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
Arc::new(Self { p2p, })
Arc::new(Self { p2p, connect_slots: Mutex::new(Vec::new()) })
}
/// Starts the inbound session. Begins by accepting connections and fails if
/// the address is not configured. Then runs the channel subscription
/// loop.
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
/// Stop the outbound session.
pub async fn stop(&self) {
let connect_slots = &*self.connect_slots.lock().await;
for slot in connect_slots {
slot.stop().await;
}
}
pub async fn connect(self: Arc<Self>, addr: &SocketAddr, executor: Arc<Executor<'_>>) {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(addr.clone(), executor.clone()),
// Ignore stop handler
|_| async {},
Error::ServiceStopped,
executor.clone(),
);
self.connect_slots.lock().await.push(task);
}
pub async fn channel_connect_loop(
self: Arc<Self>,
addr: SocketAddr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let connector = Connector::new(self.p2p().settings());
let attempts = self.p2p().settings().manual_attempt_limit;
let mut remaining = attempts;
loop {
// Loop forever if attempts is 0
// Otherwise loop attempts number of times
remaining = if attempts == 0 { 1 } else { remaining - 1 };
if remaining == 0 {
break
}
self.p2p().add_pending(addr).await;
info!(target: "net", "Connecting to manual outbound [{}]", addr);
match connector.connect(addr).await {
Ok(channel) => {
// Blacklist goes here
info!(target: "net", "Connected to manual outbound [{}]", addr);
let stop_sub = channel.subscribe_stop().await;
self.clone().register_channel(channel.clone(), executor.clone()).await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
self.clone().attach_protocols(channel, executor.clone()).await?;
// Wait for channel to close
stop_sub.receive().await;
}
Err(err) => {
info!(target: "net", "Unable to connect to manual outbound [{}]: {}", addr, err);
}
}
}
warn!(
target: "net",
"Suspending manual connection to [{}] after {} failed attempts.",
addr,
attempts
);
Ok(())
}
pub fn connect(self: Arc<Self>, addr: &SocketAddr) {
/// Starts sending keep-alive and address messages across the channels.
async fn attach_protocols(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let settings = self.p2p().settings().clone();
let hosts = self.p2p().hosts();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;
protocol_addr.start(executor).await;
Ok(())
}
}
impl Session for ManualSession {
fn p2p(&self) -> Arc<P2p> {
self.p2p.upgrade().unwrap()
}
}

View File

@@ -27,8 +27,8 @@ pub mod outbound_session;
/// channel and initializing the channel by performing a network handshake.
pub mod session;
pub use seed_session::SeedSession;
pub use manual_session::ManualSession;
pub use inbound_session::InboundSession;
pub use manual_session::ManualSession;
pub use outbound_session::OutboundSession;
pub use seed_session::SeedSession;
pub use session::Session;

View File

@@ -30,7 +30,7 @@ impl OutboundSession {
/// Start the outbound session. Runs the channel connect loop.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
let slots_count = self.p2p().settings().outbound_connections;
info!("Starting {} outbound connection slots.", slots_count);
info!(target: "net", "Starting {} outbound connection slots.", slots_count);
// Activate mutex lock on connection slots.
let mut connect_slots = self.connect_slots.lock().await;
@@ -74,13 +74,13 @@ impl OutboundSession {
loop {
let addr = self.load_address(slot_number).await?;
info!("#{} connecting to outbound [{}]", slot_number, addr);
info!(target: "net", "#{} connecting to outbound [{}]", slot_number, addr);
match connector.connect(addr).await {
Ok(channel) => {
// Blacklist goes here
info!("#{} connected to outbound [{}]", slot_number, addr);
info!(target: "net", "#{} connected to outbound [{}]", slot_number, addr);
let stop_sub = channel.subscribe_stop().await;
@@ -97,7 +97,7 @@ impl OutboundSession {
stop_sub.receive().await;
}
Err(err) => {
info!("Unable to connect to outbound [{}]: {}", addr, err);
info!(target: "net", "Unable to connect to outbound [{}]: {}", addr, err);
}
}
}
@@ -117,7 +117,7 @@ impl OutboundSession {
let addr = hosts.load_single().await;
if addr.is_none() {
error!("Hosts address pool is empty. Closing connect slot #{}", slot_number);
error!(target: "net", "Hosts address pool is empty. Closing connect slot #{}", slot_number);
return Err(Error::ServiceStopped)
}
let addr = addr.unwrap();

View File

@@ -8,6 +8,7 @@ pub type SettingsPtr = Arc<Settings>;
pub struct Settings {
pub inbound: Option<SocketAddr>,
pub outbound_connections: u32,
pub manual_attempt_limit: u32,
pub seed_query_timeout_seconds: u32,
pub connect_timeout_seconds: u32,
@@ -24,6 +25,7 @@ impl Default for Settings {
Self {
inbound: None,
outbound_connections: 0,
manual_attempt_limit: 0,
seed_query_timeout_seconds: 8,
connect_timeout_seconds: 10,
channel_handshake_seconds: 4,