From aac078a132f907fcaa7b5a9b35256ba22acd7f67 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 19 Dec 2022 18:03:10 +0100 Subject: [PATCH] fix(net): add already connected check (#521) * fix(net): add already connected check * fix: get events before adding peer --- crates/net/network/src/peers/manager.rs | 15 ++++++++ crates/net/network/src/session/mod.rs | 49 ++++++++++++++++++++++++- crates/net/network/src/state.rs | 2 +- crates/net/network/src/swarm.rs | 5 +++ crates/net/network/tests/it/connect.rs | 4 +- 5 files changed, 71 insertions(+), 4 deletions(-) diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index d22894cbea..003553e102 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -4,6 +4,7 @@ use crate::{ reputation::{is_banned_reputation, DEFAULT_REPUTATION}, ReputationChangeKind, ReputationChangeWeights, }, + session::Direction, }; use futures::StreamExt; use reth_eth_wire::{error::EthStreamError, DisconnectReason}; @@ -235,6 +236,20 @@ impl PeersManager { self.fill_outbound_slots(); } + /// Invoked if a session was disconnected because there's already a connection to the peer. + /// + /// If the session was an outgoing connection, this means that the peer initiated a connection + /// to us at the same time and this connection is already established. + pub(crate) fn on_already_connected(&mut self, direction: Direction) { + match direction { + Direction::Incoming => {} + Direction::Outgoing(_) => { + // need to decrement the outgoing counter + self.connection_info.decr_out(); + } + } + } + /// Called when a session to a peer was forcefully disconnected. /// /// Depending on whether the error is fatal, the peer will be removed from the peer set diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 366b25e905..94192c84a2 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -165,6 +165,14 @@ impl SessionManager { self.counter.ensure_pending_inbound()?; let session_id = self.next_id(); + + trace!( + target : "net::session", + ?remote_addr, + ?session_id, + "new pending incoming session" + ); + let (disconnect_tx, disconnect_rx) = oneshot::channel(); let pending_events = self.pending_sessions_tx.clone(); self.spawn(start_pending_incoming_session( @@ -308,6 +316,30 @@ impl SessionManager { // move from pending to established. self.remove_pending_session(&session_id); + // If there's already a session to the peer then we disconnect right away + if self.active_sessions.contains_key(&peer_id) { + trace!( + target : "net::session", + ?session_id, + ?remote_addr, + ?peer_id, + ?direction, + "already connected" + ); + + self.spawn(async move { + // send a disconnect message + let _ = + conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await; + }); + + return Poll::Ready(SessionEvent::AlreadyConnected { + peer_id, + remote_addr, + direction, + }) + } + let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer); let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer); @@ -427,6 +459,7 @@ impl SessionManager { } /// Events produced by the [`SessionManager`] +#[derive(Debug)] pub(crate) enum SessionEvent { /// A new session was successfully authenticated. /// @@ -439,6 +472,11 @@ pub(crate) enum SessionEvent { messages: PeerRequestSender, direction: Direction, }, + AlreadyConnected { + peer_id: PeerId, + remote_addr: SocketAddr, + direction: Direction, + }, /// A session received a valid message via RLPx. ValidMessage { peer_id: PeerId, @@ -470,7 +508,11 @@ pub(crate) enum SessionEvent { error: Option, }, /// Failed to establish a tcp stream - OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error }, + OutgoingConnectionError { + remote_addr: SocketAddr, + peer_id: PeerId, + error: io::Error, + }, /// Session was closed due to an error SessionClosedOnConnectionError { /// The id of the remote peer. @@ -481,7 +523,10 @@ pub(crate) enum SessionEvent { error: EthStreamError, }, /// Active session was gracefully disconnected. - Disconnected { peer_id: PeerId, remote_addr: SocketAddr }, + Disconnected { + peer_id: PeerId, + remote_addr: SocketAddr, + }, } /// Errors that can occur during handshaking/authenticating the underlying streams. diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 615182589b..94f07d69c0 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -137,7 +137,7 @@ where ); } - /// Event hook for a disconnected session for the peer. + /// Event hook for a disconnected session for the given peer. pub(crate) fn on_session_closed(&mut self, peer: PeerId) { self.active_peers.remove(&peer); self.state_fetcher.on_session_closed(&peer); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 4577c36efe..9c751890b6 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -137,6 +137,11 @@ where direction, }) } + SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => { + trace!( target: "net", ?peer_id, ?remote_addr, ?direction, "already connected"); + self.state.peers_mut().on_already_connected(direction); + None + } SessionEvent::ValidMessage { peer_id, message } => { Some(SwarmEvent::ValidMessage { peer_id, message }) } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 0ebbafd103..4e5683cd4a 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -154,6 +154,8 @@ async fn test_incoming_node_id_blacklist() { let network = NetworkManager::new(config).await.unwrap(); let handle = network.handle().clone(); + let events = handle.event_listener(); + tokio::task::spawn(network); // make geth connect to us @@ -162,12 +164,12 @@ async fn test_incoming_node_id_blacklist() { provider.add_peer(our_enode).await.unwrap(); - let events = handle.event_listener(); let mut event_stream = NetworkEventStream::new(events); // check for session to be opened let incoming_peer_id = event_stream.next_session_established().await.unwrap(); assert_eq!(incoming_peer_id, geth_peer_id); + // check to see that the session was closed let incoming_peer_id = event_stream.next_session_closed().await.unwrap(); assert_eq!(incoming_peer_id, geth_peer_id);