From de6630a12197cad211633b6bb1578ca8ce9e7408 Mon Sep 17 00:00:00 2001 From: Ayush Date: Thu, 29 Dec 2022 18:50:17 +0530 Subject: [PATCH] feat(network): get peer info (#603) * Create struct PeerInfo * Add GetPeerInfo command * Minor changes * Use oneshot::sender * Add GetPeerInfoById * Use async/await * Add client_id to establish connection * Add method calls * Add test_get_peer * Add test_get_peer_by_id * fmt changes * clippy changes * chore: rustfmt * smol touch ups Co-authored-by: Matthias Seitz --- crates/net/network/src/manager.rs | 6 +++ crates/net/network/src/network.rs | 24 +++++++++ crates/net/network/src/session/handle.rs | 20 +++++++ crates/net/network/src/session/mod.rs | 33 +++++++++++- crates/net/network/tests/it/connect.rs | 69 ++++++++++++++++++++++++ 5 files changed, 151 insertions(+), 1 deletion(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 47a10c3b56..3cea4b1a08 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -475,6 +475,12 @@ where self.swarm.state_mut().update_fork_id(transition.current); } } + NetworkHandleMessage::GetPeerInfo(tx) => { + let _ = tx.send(self.swarm.sessions_mut().get_peer_info()); + } + NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => { + let _ = tx.send(self.swarm.sessions_mut().get_peer_info_by_id(peer_id)); + } } } } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 30976efe97..4a41c56f4a 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -3,6 +3,7 @@ use crate::{ manager::NetworkEvent, message::PeerRequest, peers::{PeersHandle, ReputationChangeKind}, + session::PeerInfo, FetchClient, }; use parking_lot::Mutex; @@ -93,6 +94,25 @@ impl NetworkHandle { rx.await } + /// Returns [`PeerInfo`] for all connected peers + pub async fn get_peers(&self) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + let _ = self.manager().send(NetworkHandleMessage::GetPeerInfo(tx)); + rx.await + } + + /// Returns [`PeerInfo`] for a given peer. + /// + /// Returns `None` if there's no active session to the peer. + pub async fn get_peer_by_id( + &self, + peer_id: PeerId, + ) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx)); + rx.await + } + /// Returns the mode of the network, either pow, or pos pub fn mode(&self) -> &NetworkMode { &self.inner.network_mode @@ -211,4 +231,8 @@ pub(crate) enum NetworkHandleMessage { FetchClient(oneshot::Sender), /// Apply a status update. StatusUpdate { height: u64, hash: H256, total_difficulty: U256 }, + /// Get PeerInfo fro all the peers + GetPeerInfo(oneshot::Sender>), + /// Get PeerInfo for a specific peer + GetPeerInfoById(PeerId, oneshot::Sender>), } diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 2021afc3aa..38b6987107 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -47,6 +47,10 @@ pub(crate) struct ActiveSessionHandle { pub(crate) capabilities: Arc, /// Sender half of the command channel used send commands _to_ the spawned session pub(crate) commands_to_session: mpsc::Sender, + /// The client's name and version + pub(crate) client_version: String, + /// The address we're connected to + pub(crate) remote_addr: SocketAddr, } // === impl ActiveSessionHandle === @@ -59,6 +63,21 @@ impl ActiveSessionHandle { } } +#[derive(Debug)] +#[allow(unused)] +pub struct PeerInfo { + /// Announced capabilities of the peer + pub(crate) capabilities: Arc, + /// The identifier of the remote peer + pub(crate) remote_id: PeerId, + /// The client's name and version + pub(crate) client_version: String, + /// The address we're connected to + pub(crate) remote_addr: SocketAddr, + /// The direction of the session + pub(crate) direction: Direction, +} + /// Events a pending session can produce. /// /// This represents the state changes a session can undergo until it is ready to send capability messages . @@ -76,6 +95,7 @@ pub(crate) enum PendingSessionEvent { status: Status, conn: EthStream>>, direction: Direction, + client_id: String, }, /// Handshake unsuccessful, session was disconnected. Disconnected { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index ce3277810a..2a2649e4b1 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,5 +1,4 @@ //! Support for handling peer sessions. -pub use crate::message::PeerRequestSender; use crate::{ message::PeerMessage, session::{ @@ -11,6 +10,7 @@ use crate::{ }, }, }; +pub use crate::{message::PeerRequestSender, session::handle::PeerInfo}; use fnv::FnvHashMap; use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; @@ -323,6 +323,7 @@ impl SessionManager { conn, status, direction, + client_id, } => { // move from pending to established. self.remove_pending_session(&session_id); @@ -383,6 +384,8 @@ impl SessionManager { established: Instant::now(), capabilities: Arc::clone(&capabilities), commands_to_session, + client_version: client_id, + remote_addr, }; self.active_sessions.insert(peer_id, handle); @@ -467,6 +470,33 @@ impl SessionManager { } } } + + /// Returns [`PeerInfo`] for all connected peers + pub(crate) fn get_peer_info(&self) -> Vec { + self.active_sessions + .values() + .map(|session| PeerInfo { + remote_id: session.remote_id, + direction: session.direction, + remote_addr: session.remote_addr, + capabilities: session.capabilities.clone(), + client_version: session.client_version.clone(), + }) + .collect() + } + + /// Returns [`PeerInfo`] for a given peer. + /// + /// Returns `None` if there's no active session to the peer. + pub(crate) fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option { + self.active_sessions.get(&peer_id).map(|session| PeerInfo { + remote_id: session.remote_id, + direction: session.direction, + remote_addr: session.remote_addr, + capabilities: session.capabilities.clone(), + client_version: session.client_version.clone(), + }) + } } /// Events produced by the [`SessionManager`] @@ -765,5 +795,6 @@ async fn authenticate_stream( status: their_status, conn: eth_stream, direction, + client_id: their_hello.client_version, } } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 680e75a339..e1c0642f2d 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -132,6 +132,75 @@ async fn test_already_connected() { assert_eq!(handle1.num_connected_peers(), 1); } +#[tokio::test(flavor = "multi_thread")] +async fn test_get_peer() { + reth_tracing::init_tracing(); + let mut net = Testnet::default(); + + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let secret_key_1 = SecretKey::new(&mut rand::thread_rng()); + let client = Arc::new(NoopProvider::default()); + let p1 = PeerConfig::default(); + let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key); + let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1); + + net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap(); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + let handle2 = handles.next().unwrap(); + + drop(handles); + let _handle = net.spawn(); + + let mut listener0 = NetworkEventStream::new(handle0.event_listener()); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let _ = listener0.next_session_established().await.unwrap(); + + handle0.add_peer(*handle2.peer_id(), handle2.local_addr()); + let _ = listener0.next_session_established().await.unwrap(); + + let peers = handle0.get_peers().await.unwrap(); + assert_eq!(handle0.num_connected_peers(), peers.len()); + dbg!(peers); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_peer_by_id() { + reth_tracing::init_tracing(); + let mut net = Testnet::default(); + + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let secret_key_1 = SecretKey::new(&mut rand::thread_rng()); + let client = Arc::new(NoopProvider::default()); + let p1 = PeerConfig::default(); + let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key); + let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1); + + net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap(); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + let handle2 = handles.next().unwrap(); + + drop(handles); + let _handle = net.spawn(); + + let mut listener0 = NetworkEventStream::new(handle0.event_listener()); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let _ = listener0.next_session_established().await.unwrap(); + + let peer = handle0.get_peer_by_id(*handle1.peer_id()).await.unwrap(); + assert!(peer.is_some()); + + let peer = handle0.get_peer_by_id(*handle2.peer_id()).await.unwrap(); + assert!(peer.is_none()); +} + #[tokio::test(flavor = "multi_thread")] #[ignore] async fn test_connect_with_boot_nodes() {