fix(net): add already connected check (#521)

* fix(net): add already connected check

* fix: get events before adding peer
This commit is contained in:
Matthias Seitz
2022-12-19 18:03:10 +01:00
committed by GitHub
parent 5758c297b3
commit aac078a132
5 changed files with 71 additions and 4 deletions

View File

@@ -4,6 +4,7 @@ use crate::{
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
ReputationChangeKind, ReputationChangeWeights,
},
session::Direction,
};
use futures::StreamExt;
use reth_eth_wire::{error::EthStreamError, DisconnectReason};
@@ -235,6 +236,20 @@ impl PeersManager {
self.fill_outbound_slots();
}
/// Invoked if a session was disconnected because there's already a connection to the peer.
///
/// If the session was an outgoing connection, this means that the peer initiated a connection
/// to us at the same time and this connection is already established.
pub(crate) fn on_already_connected(&mut self, direction: Direction) {
match direction {
Direction::Incoming => {}
Direction::Outgoing(_) => {
// need to decrement the outgoing counter
self.connection_info.decr_out();
}
}
}
/// Called when a session to a peer was forcefully disconnected.
///
/// Depending on whether the error is fatal, the peer will be removed from the peer set

View File

@@ -165,6 +165,14 @@ impl SessionManager {
self.counter.ensure_pending_inbound()?;
let session_id = self.next_id();
trace!(
target : "net::session",
?remote_addr,
?session_id,
"new pending incoming session"
);
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
self.spawn(start_pending_incoming_session(
@@ -308,6 +316,30 @@ impl SessionManager {
// move from pending to established.
self.remove_pending_session(&session_id);
// If there's already a session to the peer then we disconnect right away
if self.active_sessions.contains_key(&peer_id) {
trace!(
target : "net::session",
?session_id,
?remote_addr,
?peer_id,
?direction,
"already connected"
);
self.spawn(async move {
// send a disconnect message
let _ =
conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
});
return Poll::Ready(SessionEvent::AlreadyConnected {
peer_id,
remote_addr,
direction,
})
}
let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
@@ -427,6 +459,7 @@ impl SessionManager {
}
/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub(crate) enum SessionEvent {
/// A new session was successfully authenticated.
///
@@ -439,6 +472,11 @@ pub(crate) enum SessionEvent {
messages: PeerRequestSender,
direction: Direction,
},
AlreadyConnected {
peer_id: PeerId,
remote_addr: SocketAddr,
direction: Direction,
},
/// A session received a valid message via RLPx.
ValidMessage {
peer_id: PeerId,
@@ -470,7 +508,11 @@ pub(crate) enum SessionEvent {
error: Option<PendingSessionHandshakeError>,
},
/// Failed to establish a tcp stream
OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
OutgoingConnectionError {
remote_addr: SocketAddr,
peer_id: PeerId,
error: io::Error,
},
/// Session was closed due to an error
SessionClosedOnConnectionError {
/// The id of the remote peer.
@@ -481,7 +523,10 @@ pub(crate) enum SessionEvent {
error: EthStreamError,
},
/// Active session was gracefully disconnected.
Disconnected { peer_id: PeerId, remote_addr: SocketAddr },
Disconnected {
peer_id: PeerId,
remote_addr: SocketAddr,
},
}
/// Errors that can occur during handshaking/authenticating the underlying streams.

View File

@@ -137,7 +137,7 @@ where
);
}
/// Event hook for a disconnected session for the peer.
/// Event hook for a disconnected session for the given peer.
pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
self.active_peers.remove(&peer);
self.state_fetcher.on_session_closed(&peer);

View File

@@ -137,6 +137,11 @@ where
direction,
})
}
SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
trace!( target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
self.state.peers_mut().on_already_connected(direction);
None
}
SessionEvent::ValidMessage { peer_id, message } => {
Some(SwarmEvent::ValidMessage { peer_id, message })
}

View File

@@ -154,6 +154,8 @@ async fn test_incoming_node_id_blacklist() {
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
let events = handle.event_listener();
tokio::task::spawn(network);
// make geth connect to us
@@ -162,12 +164,12 @@ async fn test_incoming_node_id_blacklist() {
provider.add_peer(our_enode).await.unwrap();
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
// check for session to be opened
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check to see that the session was closed
let incoming_peer_id = event_stream.next_session_closed().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);