use a pending lock to prevent duplicate connections during setup phase

This commit is contained in:
narodnik
2021-02-24 17:35:38 +01:00
parent ee82ed9f70
commit 7ebc91cead
4 changed files with 43 additions and 13 deletions

View File

@@ -16,13 +16,14 @@ declare -a arr=(
# Client with 1 outgoing connection
"cargo run --bin dfi -- -r 9004 --seeds 127.0.0.1:9999 --slots 1 --log /tmp/darkfi/client2.log"
# Server/client with 2 outgoing connections
"cargo run --bin dfi -- -r 9005 --accept 127.0.0.1:10002 --seeds 127.0.0.1:9999 --log /tmp/darkfi/server1.log" --slots 2
"cargo run --bin dfi -- -r 9005 --accept 127.0.0.1:10002 --seeds 127.0.0.1:9999 --log /tmp/darkfi/server1.log --slots 2"
)
for cmd in "${arr[@]}"; do {
echo "Process \"$cmd\" started";
RUST_BACKTRACE=1 $cmd & pid=$!
PID_LIST+=" $pid";
sleep 1;
} done
trap "kill $PID_LIST" SIGINT

View File

@@ -1,7 +1,7 @@
use async_executor::Executor;
use async_std::sync::Mutex;
use log::*;
use std::collections::HashMap;
use std::collections::{HashSet, HashMap};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -10,12 +10,14 @@ use crate::net::sessions::{InboundSession, OutboundSession, SeedSession};
use crate::net::{Channel, ChannelPtr, Hosts, HostsPtr, Settings, SettingsPtr};
use crate::system::{Subscriber, SubscriberPtr, Subscription};
pub type Pending<T> = Mutex<HashMap<SocketAddr, Arc<T>>>;
pub type PendingChannels = Mutex<HashSet<SocketAddr>>;
pub type ConnectedChannels<T> = Mutex<HashMap<SocketAddr, Arc<T>>>;
pub type P2pPtr = Arc<P2p>;
pub struct P2p {
pending_channels: Pending<Channel>,
pending: PendingChannels,
channels: ConnectedChannels<Channel>,
// Used internally
stop_subscriber: SubscriberPtr<NetError>,
hosts: HostsPtr,
@@ -26,7 +28,8 @@ impl P2p {
pub fn new(settings: Settings) -> Arc<Self> {
let settings = Arc::new(settings);
Arc::new(Self {
pending_channels: Mutex::new(HashMap::new()),
pending: Mutex::new(HashSet::new()),
channels: Mutex::new(HashMap::new()),
stop_subscriber: Subscriber::new(),
hosts: Hosts::new(settings.clone()),
settings,
@@ -70,21 +73,32 @@ impl P2p {
Ok(())
}
pub async fn store(self: Arc<Self>, channel: ChannelPtr) {
self.pending_channels
pub async fn store(&self, channel: ChannelPtr) {
self.channels
.lock()
.await
.insert(channel.address(), channel);
}
pub async fn remove(self: Arc<Self>, channel: ChannelPtr) {
self.pending_channels
pub async fn remove(&self, channel: ChannelPtr) {
self.channels
.lock()
.await
.remove(&channel.address());
}
pub async fn exists(&self, addr: &SocketAddr) -> bool {
self.channels.lock().await.contains_key(addr)
}
pub async fn add_pending(&self, addr: SocketAddr) -> bool {
self.pending.lock().await.insert(addr)
}
pub async fn remove_pending(&self, addr: &SocketAddr) {
self.pending.lock().await.remove(addr);
}
pub async fn connections_count(&self) -> usize {
self.pending_channels.lock().await.len()
self.channels.lock().await.len()
}
pub fn settings(&self) -> SettingsPtr {

View File

@@ -76,6 +76,11 @@ impl OutboundSession {
.register_channel(channel.clone(), executor.clone())
.await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
self.clone()
.attach_protocols(channel, executor.clone())
.await?;
@@ -91,8 +96,9 @@ impl OutboundSession {
}
async fn load_address(&self, slot_number: u32) -> NetResult<SocketAddr> {
let hosts = self.p2p().hosts();
let inbound_addr = self.p2p().settings().inbound;
let p2p = self.p2p();
let hosts = p2p.hosts();
let inbound_addr = p2p.settings().inbound;
loop {
let addr = hosts.load_single().await;
@@ -110,6 +116,15 @@ impl OutboundSession {
continue;
}
if p2p.exists(&addr).await {
continue;
}
// Obtain a lock on this address to prevent duplicate connections
if !p2p.add_pending(addr).await {
continue;
}
return Ok(addr);
}
}

View File

@@ -57,7 +57,7 @@ pub trait Session: Sync {
// Channel is now initialized
// Add channel to p2p
self.p2p().clone().store(channel.clone()).await;
self.p2p().store(channel.clone()).await;
// Subscribe to stop, so can remove from p2p
executor