feat(network): get peer info (#603)

* Create struct PeerInfo

* Add GetPeerInfo command

* Minor changes

* Use oneshot::sender

* Add GetPeerInfoById

* Use async/await

* Add client_id to establish connection

* Add method calls

* Add test_get_peer

* Add test_get_peer_by_id

* fmt changes

* clippy changes

* chore: rustfmt

* smol touch ups

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Ayush
2022-12-29 18:50:17 +05:30
committed by GitHub
parent 208118caa4
commit de6630a121
5 changed files with 151 additions and 1 deletions

View File

@@ -475,6 +475,12 @@ where
self.swarm.state_mut().update_fork_id(transition.current);
}
}
NetworkHandleMessage::GetPeerInfo(tx) => {
let _ = tx.send(self.swarm.sessions_mut().get_peer_info());
}
NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
let _ = tx.send(self.swarm.sessions_mut().get_peer_info_by_id(peer_id));
}
}
}
}

View File

@@ -3,6 +3,7 @@ use crate::{
manager::NetworkEvent,
message::PeerRequest,
peers::{PeersHandle, ReputationChangeKind},
session::PeerInfo,
FetchClient,
};
use parking_lot::Mutex;
@@ -93,6 +94,25 @@ impl NetworkHandle {
rx.await
}
/// Returns [`PeerInfo`] for all connected peers
pub async fn get_peers(&self) -> Result<Vec<PeerInfo>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfo(tx));
rx.await
}
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub async fn get_peer_by_id(
&self,
peer_id: PeerId,
) -> Result<Option<PeerInfo>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx));
rx.await
}
/// Returns the mode of the network, either pow, or pos
pub fn mode(&self) -> &NetworkMode {
&self.inner.network_mode
@@ -211,4 +231,8 @@ pub(crate) enum NetworkHandleMessage {
FetchClient(oneshot::Sender<FetchClient>),
/// Apply a status update.
StatusUpdate { height: u64, hash: H256, total_difficulty: U256 },
/// Get PeerInfo fro all the peers
GetPeerInfo(oneshot::Sender<Vec<PeerInfo>>),
/// Get PeerInfo for a specific peer
GetPeerInfoById(PeerId, oneshot::Sender<Option<PeerInfo>>),
}

View File

@@ -47,6 +47,10 @@ pub(crate) struct ActiveSessionHandle {
pub(crate) capabilities: Arc<Capabilities>,
/// Sender half of the command channel used send commands _to_ the spawned session
pub(crate) commands_to_session: mpsc::Sender<SessionCommand>,
/// The client's name and version
pub(crate) client_version: String,
/// The address we're connected to
pub(crate) remote_addr: SocketAddr,
}
// === impl ActiveSessionHandle ===
@@ -59,6 +63,21 @@ impl ActiveSessionHandle {
}
}
#[derive(Debug)]
#[allow(unused)]
pub struct PeerInfo {
/// Announced capabilities of the peer
pub(crate) capabilities: Arc<Capabilities>,
/// The identifier of the remote peer
pub(crate) remote_id: PeerId,
/// The client's name and version
pub(crate) client_version: String,
/// The address we're connected to
pub(crate) remote_addr: SocketAddr,
/// The direction of the session
pub(crate) direction: Direction,
}
/// Events a pending session can produce.
///
/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
@@ -76,6 +95,7 @@ pub(crate) enum PendingSessionEvent {
status: Status,
conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
direction: Direction,
client_id: String,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {

View File

@@ -1,5 +1,4 @@
//! Support for handling peer sessions.
pub use crate::message::PeerRequestSender;
use crate::{
message::PeerMessage,
session::{
@@ -11,6 +10,7 @@ use crate::{
},
},
};
pub use crate::{message::PeerRequestSender, session::handle::PeerInfo};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
@@ -323,6 +323,7 @@ impl SessionManager {
conn,
status,
direction,
client_id,
} => {
// move from pending to established.
self.remove_pending_session(&session_id);
@@ -383,6 +384,8 @@ impl SessionManager {
established: Instant::now(),
capabilities: Arc::clone(&capabilities),
commands_to_session,
client_version: client_id,
remote_addr,
};
self.active_sessions.insert(peer_id, handle);
@@ -467,6 +470,33 @@ impl SessionManager {
}
}
}
/// Returns [`PeerInfo`] for all connected peers
pub(crate) fn get_peer_info(&self) -> Vec<PeerInfo> {
self.active_sessions
.values()
.map(|session| PeerInfo {
remote_id: session.remote_id,
direction: session.direction,
remote_addr: session.remote_addr,
capabilities: session.capabilities.clone(),
client_version: session.client_version.clone(),
})
.collect()
}
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub(crate) fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
self.active_sessions.get(&peer_id).map(|session| PeerInfo {
remote_id: session.remote_id,
direction: session.direction,
remote_addr: session.remote_addr,
capabilities: session.capabilities.clone(),
client_version: session.client_version.clone(),
})
}
}
/// Events produced by the [`SessionManager`]
@@ -765,5 +795,6 @@ async fn authenticate_stream(
status: their_status,
conn: eth_stream,
direction,
client_id: their_hello.client_version,
}
}

View File

@@ -132,6 +132,75 @@ async fn test_already_connected() {
assert_eq!(handle1.num_connected_peers(), 1);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_get_peer() {
reth_tracing::init_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let secret_key_1 = SecretKey::new(&mut rand::thread_rng());
let client = Arc::new(NoopProvider::default());
let p1 = PeerConfig::default();
let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key);
let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1);
net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap();
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
let handle2 = handles.next().unwrap();
drop(handles);
let _handle = net.spawn();
let mut listener0 = NetworkEventStream::new(handle0.event_listener());
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let _ = listener0.next_session_established().await.unwrap();
handle0.add_peer(*handle2.peer_id(), handle2.local_addr());
let _ = listener0.next_session_established().await.unwrap();
let peers = handle0.get_peers().await.unwrap();
assert_eq!(handle0.num_connected_peers(), peers.len());
dbg!(peers);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_get_peer_by_id() {
reth_tracing::init_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let secret_key_1 = SecretKey::new(&mut rand::thread_rng());
let client = Arc::new(NoopProvider::default());
let p1 = PeerConfig::default();
let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key);
let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1);
net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap();
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
let handle2 = handles.next().unwrap();
drop(handles);
let _handle = net.spawn();
let mut listener0 = NetworkEventStream::new(handle0.event_listener());
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let _ = listener0.next_session_established().await.unwrap();
let peer = handle0.get_peer_by_id(*handle1.peer_id()).await.unwrap();
assert!(peer.is_some());
let peer = handle0.get_peer_by_id(*handle2.peer_id()).await.unwrap();
assert!(peer.is_none());
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_with_boot_nodes() {