From 7ebc91ceadbc42a9f45acdb2967d227549e147c1 Mon Sep 17 00:00:00 2001 From: narodnik Date: Wed, 24 Feb 2021 17:35:38 +0100 Subject: [PATCH] use a pending lock to prevent duplicate connections during setup phase --- run_network.sh | 3 ++- src/net/p2p.rs | 32 ++++++++++++++++++++-------- src/net/sessions/outbound_session.rs | 19 +++++++++++++++-- src/net/sessions/session.rs | 2 +- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/run_network.sh b/run_network.sh index 0900087d4..dba4340ca 100755 --- a/run_network.sh +++ b/run_network.sh @@ -16,13 +16,14 @@ declare -a arr=( # Client with 1 outgoing connection "cargo run --bin dfi -- -r 9004 --seeds 127.0.0.1:9999 --slots 1 --log /tmp/darkfi/client2.log" # Server/client with 2 outgoing connections - "cargo run --bin dfi -- -r 9005 --accept 127.0.0.1:10002 --seeds 127.0.0.1:9999 --log /tmp/darkfi/server1.log" --slots 2 + "cargo run --bin dfi -- -r 9005 --accept 127.0.0.1:10002 --seeds 127.0.0.1:9999 --log /tmp/darkfi/server1.log --slots 2" ) for cmd in "${arr[@]}"; do { echo "Process \"$cmd\" started"; RUST_BACKTRACE=1 $cmd & pid=$! PID_LIST+=" $pid"; + sleep 1; } done trap "kill $PID_LIST" SIGINT diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 16a1ab734..32bc7b17a 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Mutex; use log::*; -use std::collections::HashMap; +use std::collections::{HashSet, HashMap}; use std::net::SocketAddr; use std::sync::Arc; @@ -10,12 +10,14 @@ use crate::net::sessions::{InboundSession, OutboundSession, SeedSession}; use crate::net::{Channel, ChannelPtr, Hosts, HostsPtr, Settings, SettingsPtr}; use crate::system::{Subscriber, SubscriberPtr, Subscription}; -pub type Pending = Mutex>>; +pub type PendingChannels = Mutex>; +pub type ConnectedChannels = Mutex>>; pub type P2pPtr = Arc; pub struct P2p { - pending_channels: Pending, + pending: PendingChannels, + channels: ConnectedChannels, // Used internally stop_subscriber: SubscriberPtr, hosts: HostsPtr, @@ -26,7 +28,8 @@ impl P2p { pub fn new(settings: Settings) -> Arc { let settings = Arc::new(settings); Arc::new(Self { - pending_channels: Mutex::new(HashMap::new()), + pending: Mutex::new(HashSet::new()), + channels: Mutex::new(HashMap::new()), stop_subscriber: Subscriber::new(), hosts: Hosts::new(settings.clone()), settings, @@ -70,21 +73,32 @@ impl P2p { Ok(()) } - pub async fn store(self: Arc, channel: ChannelPtr) { - self.pending_channels + pub async fn store(&self, channel: ChannelPtr) { + self.channels .lock() .await .insert(channel.address(), channel); } - pub async fn remove(self: Arc, channel: ChannelPtr) { - self.pending_channels + pub async fn remove(&self, channel: ChannelPtr) { + self.channels .lock() .await .remove(&channel.address()); } + pub async fn exists(&self, addr: &SocketAddr) -> bool { + self.channels.lock().await.contains_key(addr) + } + + pub async fn add_pending(&self, addr: SocketAddr) -> bool { + self.pending.lock().await.insert(addr) + } + pub async fn remove_pending(&self, addr: &SocketAddr) { + self.pending.lock().await.remove(addr); + } + pub async fn connections_count(&self) -> usize { - self.pending_channels.lock().await.len() + self.channels.lock().await.len() } pub fn settings(&self) -> SettingsPtr { diff --git a/src/net/sessions/outbound_session.rs b/src/net/sessions/outbound_session.rs index 1ca442aa8..68b711cc1 100644 --- a/src/net/sessions/outbound_session.rs +++ b/src/net/sessions/outbound_session.rs @@ -76,6 +76,11 @@ impl OutboundSession { .register_channel(channel.clone(), executor.clone()) .await?; + // Channel is now connected but not yet setup + + // Remove pending lock since register_channel will add the channel to p2p + self.p2p().remove_pending(&addr).await; + self.clone() .attach_protocols(channel, executor.clone()) .await?; @@ -91,8 +96,9 @@ impl OutboundSession { } async fn load_address(&self, slot_number: u32) -> NetResult { - let hosts = self.p2p().hosts(); - let inbound_addr = self.p2p().settings().inbound; + let p2p = self.p2p(); + let hosts = p2p.hosts(); + let inbound_addr = p2p.settings().inbound; loop { let addr = hosts.load_single().await; @@ -110,6 +116,15 @@ impl OutboundSession { continue; } + if p2p.exists(&addr).await { + continue; + } + + // Obtain a lock on this address to prevent duplicate connections + if !p2p.add_pending(addr).await { + continue; + } + return Ok(addr); } } diff --git a/src/net/sessions/session.rs b/src/net/sessions/session.rs index a496d7050..c5a26c454 100644 --- a/src/net/sessions/session.rs +++ b/src/net/sessions/session.rs @@ -57,7 +57,7 @@ pub trait Session: Sync { // Channel is now initialized // Add channel to p2p - self.p2p().clone().store(channel.clone()).await; + self.p2p().store(channel.clone()).await; // Subscribe to stop, so can remove from p2p executor