diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 4bf839154a..77d37a46a8 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -138,3 +138,5 @@ pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; pub use network::NetworkHandle; pub use peers::PeersConfig; + +pub use reth_eth_wire::DisconnectReason; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 1074b9cbdb..01a551d5eb 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -459,8 +459,11 @@ where .swarm .sessions_mut() .send_message(&peer_id, PeerMessage::PooledTransactions(msg)), - NetworkHandleMessage::AddPeerAddress(peer, addr) => { - self.swarm.state_mut().add_peer_address(peer, addr); + NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => { + self.swarm.state_mut().add_peer_kind(peer, kind, addr); + } + NetworkHandleMessage::RemovePeer(peer_id, kind) => { + self.swarm.state_mut().remove_peer(peer_id, kind); } NetworkHandleMessage::DisconnectPeer(peer_id, reason) => { self.swarm.sessions_mut().disconnect(peer_id, reason); diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 1015b12d8d..8c3173e3e7 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -2,7 +2,7 @@ use crate::{ config::NetworkMode, manager::NetworkEvent, message::PeerRequest, - peers::{PeersHandle, ReputationChangeKind}, + peers::{PeerKind, PeersHandle, ReputationChangeKind}, session::PeerInfo, FetchClient, }; @@ -142,7 +142,25 @@ impl NetworkHandle { /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known /// set pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) { - let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::AddPeerAddress(peer, addr)); + self.add_peer_kind(peer, PeerKind::Basic, addr); + } + + /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a trusted peer + /// to the known set + pub fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) { + self.add_peer_kind(peer, PeerKind::Trusted, addr); + } + + /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known + /// set, with the given kind. + pub fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) { + self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr)); + } + + /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the + /// set corresponding to given kind. + pub fn remove_peer(&self, peer: PeerId, kind: PeerKind) { + self.send_message(NetworkHandleMessage::RemovePeer(peer, kind)) } /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing @@ -226,7 +244,9 @@ struct NetworkInner { #[allow(missing_docs)] pub(crate) enum NetworkHandleMessage { /// Adds an address for a peer. - AddPeerAddress(PeerId, SocketAddr), + AddPeerAddress(PeerId, PeerKind, SocketAddr), + /// Removes a peer from the peerset correponding to the given kind. + RemovePeer(PeerId, PeerKind), /// Disconnect a connection to a peer if it exists. DisconnectPeer(PeerId, Option), /// Add a new listener for [`NetworkEvent`]. diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index f899dedf33..5262509c29 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -403,9 +403,23 @@ impl PeersManager { /// Called for a newly discovered peer. /// - /// If the peer already exists, then the address will be updated. If the addresses differ, the - /// old address is returned - pub(crate) fn add_discovered_node(&mut self, peer_id: PeerId, addr: SocketAddr) { + /// If the peer already exists, then the address and kind will be updated. + pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: SocketAddr) { + self.add_peer_kind(peer_id, PeerKind::Basic, addr) + } + + /// Called for a newly discovered trusted peer. + /// + /// If the peer already exists, then the address and kind will be updated. + #[allow(dead_code)] + pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: SocketAddr) { + self.add_peer_kind(peer_id, PeerKind::Trusted, addr) + } + + /// Called for a newly discovered peer. + /// + /// If the peer already exists, then the address and kind will be updated. + pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) { if self.ban_list.is_banned(&peer_id, &addr.ip()) { return } @@ -413,12 +427,13 @@ impl PeersManager { match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { let node = entry.get_mut(); + node.kind = kind; node.addr = addr; return } Entry::Vacant(entry) => { trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node"); - entry.insert(Peer::new(addr)); + entry.insert(Peer::with_kind(addr, kind)); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); } } @@ -427,7 +442,7 @@ impl PeersManager { } /// Removes the tracked node from the set. - pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) { + pub(crate) fn remove_peer(&mut self, peer_id: PeerId) { let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return }; if entry.get().is_trusted() { return @@ -453,6 +468,18 @@ impl PeersManager { } } + /// Removes the tracked node from the trusted set. + pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) { + let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return }; + if !entry.get().is_trusted() { + return + } + + let peer = entry.get_mut(); + + peer.kind = PeerKind::Basic; + } + /// Returns the idle peer with the highest reputation. /// /// Peers that are `trusted`, see [PeerKind], are prioritized as long as they're not currently @@ -539,9 +566,9 @@ impl PeersManager { while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { match cmd { PeerCommand::Add(peer_id, addr) => { - self.add_discovered_node(peer_id, addr); + self.add_peer(peer_id, addr); } - PeerCommand::Remove(peer) => self.remove_discovered_node(peer), + PeerCommand::Remove(peer) => self.remove_peer(peer), PeerCommand::ReputationChange(peer_id, rep) => { self.apply_reputation_change(&peer_id, rep) } @@ -679,6 +706,10 @@ impl Peer { } } + fn with_kind(addr: SocketAddr, kind: PeerKind) -> Self { + Self { kind, ..Self::new(addr) } + } + /// Applies a reputation change to the peer and returns what action should be taken. fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome { let previous = self.reputation; @@ -778,7 +809,7 @@ impl PeerConnectionState { /// Represents the kind of peer #[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -enum PeerKind { +pub enum PeerKind { /// Basic peer kind. #[default] Basic, @@ -979,7 +1010,7 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1002,7 +1033,7 @@ mod test { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); peers.ban_peer(peer); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::BanPeer { peer_id } => { @@ -1026,7 +1057,7 @@ mod test { let backoff_duration = Duration::from_secs(3); let config = PeersConfig { backoff_duration, ..Default::default() }; let mut peers = PeersManager::new(config); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1089,7 +1120,7 @@ mod test { let backoff_duration = Duration::from_secs(3); let config = PeersConfig { backoff_duration, ..Default::default() }; let mut peers = PeersManager::new(config); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1149,7 +1180,7 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1205,7 +1236,7 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1317,7 +1348,7 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1361,7 +1392,7 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1380,7 +1411,7 @@ mod test { let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::Out); - peers.remove_discovered_node(peer); + peers.remove_peer(peer); match event!(peers) { PeerAction::PeerRemoved(peer_id) => { @@ -1398,7 +1429,7 @@ mod test { let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::DisconnectingOut); - peers.add_discovered_node(peer, socket_addr); + peers.add_peer(peer, socket_addr); let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::DisconnectingOut); @@ -1413,7 +1444,7 @@ mod test { let ban_list = BanList::new(HashSet::new(), vec![ip]); let config = PeersConfig::default().with_ban_list(ban_list); let mut peer_manager = PeersManager::new(config); - peer_manager.add_discovered_node(H512::default(), socket_addr); + peer_manager.add_peer(H512::default(), socket_addr); assert!(peer_manager.peers.is_empty()); } @@ -1505,7 +1536,7 @@ mod test { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_discovered_node(basic_peer, basic_sock); + peers.add_peer(basic_peer, basic_sock); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1545,7 +1576,7 @@ mod test { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_discovered_node(basic_peer, basic_sock); + peers.add_peer(basic_peer, basic_sock); match event!(peers) { PeerAction::PeerAdded(peer_id) => { diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs index 0c1e420728..7808b93607 100644 --- a/crates/net/network/src/peers/mod.rs +++ b/crates/net/network/src/peers/mod.rs @@ -4,5 +4,5 @@ mod manager; mod reputation; pub(crate) use manager::{InboundConnectionError, PeerAction, PeersManager}; -pub use manager::{PeersConfig, PeersHandle}; +pub use manager::{PeerKind, PeersConfig, PeersHandle}; pub use reputation::{ReputationChangeKind, ReputationChangeWeights}; diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index c434546666..cd9ea53b67 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -8,7 +8,7 @@ use crate::{ BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, PeerResponseResult, }, - peers::{PeerAction, PeersManager}, + peers::{PeerAction, PeerKind, PeersManager}, FetchClient, }; use reth_eth_wire::{ @@ -253,16 +253,23 @@ where self.discovery.ban(peer_id, ip) } - /// Adds a peer and its address to the peerset. - pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: SocketAddr) { - self.peers_manager.add_discovered_node(peer_id, addr) + /// Adds a peer and its address with the given kind to the peerset. + pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) { + self.peers_manager.add_peer_kind(peer_id, kind, addr) + } + + pub(crate) fn remove_peer(&mut self, peer_id: PeerId, kind: PeerKind) { + match kind { + PeerKind::Basic => self.peers_manager.remove_peer(peer_id), + PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id), + } } /// Event hook for events received from the discovery service. fn on_discovery_event(&mut self, event: DiscoveryEvent) { match event { DiscoveryEvent::Discovered(peer, addr) => { - self.peers_manager.add_discovered_node(peer, addr); + self.peers_manager.add_peer(peer, addr); } DiscoveryEvent::EnrForkId(peer_id, fork_id) => { self.queued_messages diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 1522728cb3..0f46384f1e 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -237,7 +237,7 @@ where if self.sessions.is_valid_fork_id(fork_id) { self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id); } else { - self.state_mut().peers_mut().remove_discovered_node(peer_id); + self.state_mut().peers_mut().remove_peer(peer_id); } } } diff --git a/crates/net/rpc-api/src/admin.rs b/crates/net/rpc-api/src/admin.rs index b5723235eb..b634d25324 100644 --- a/crates/net/rpc-api/src/admin.rs +++ b/crates/net/rpc-api/src/admin.rs @@ -1,4 +1,5 @@ use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; +use reth_primitives::NodeRecord; /// Admin namespace rpc interface that gives access to several non-standard RPC methods. #[rpc(server)] @@ -6,24 +7,24 @@ use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; pub trait AdminApi { /// Adds the given node record to the peerset. #[method(name = "admin_addPeer")] - async fn add_peer(&self, record: String) -> Result; + fn add_peer(&self, record: NodeRecord) -> Result; /// Disconnects from a remote node if the connection exists. /// /// Returns true if the peer was successfully removed. #[method(name = "admin_removePeer")] - async fn remove_peer(&self, record: String) -> Result; + fn remove_peer(&self, record: NodeRecord) -> Result; /// Adds the given node record to the trusted peerset. #[method(name = "admin_addTrustedPeer")] - async fn add_trusted_peer(&self, record: String) -> Result; + fn add_trusted_peer(&self, record: NodeRecord) -> Result; /// Removes a remote node from the trusted peer set, but it does not disconnect it /// automatically. /// /// Returns true if the peer was successfully removed. #[method(name = "admin_removeTrustedPeer")] - async fn remove_trusted_peer(&self, record: String) -> Result; + fn remove_trusted_peer(&self, record: NodeRecord) -> Result; /// Creates an RPC subscription which serves events received from the network. #[subscription( diff --git a/crates/net/rpc-api/src/lib.rs b/crates/net/rpc-api/src/lib.rs index ba31c22d81..ddb6522ddb 100644 --- a/crates/net/rpc-api/src/lib.rs +++ b/crates/net/rpc-api/src/lib.rs @@ -20,7 +20,7 @@ mod trace; mod web3; pub use self::{ - debug::DebugApiServer, engine::EngineApiServer, eth::EthApiServer, + admin::AdminApiServer, debug::DebugApiServer, engine::EngineApiServer, eth::EthApiServer, eth_filter::EthFilterApiServer, eth_pubsub::EthPubSubApiServer, net::NetApiServer, web3::Web3ApiServer, }; diff --git a/crates/net/rpc/src/admin/mod.rs b/crates/net/rpc/src/admin/mod.rs new file mode 100644 index 0000000000..fd7940a367 --- /dev/null +++ b/crates/net/rpc/src/admin/mod.rs @@ -0,0 +1,38 @@ +use jsonrpsee::core::RpcResult; +use reth_network::{peers::PeerKind, NetworkHandle}; +use reth_primitives::NodeRecord; +use reth_rpc_api::AdminApiServer; + +struct AdminApi { + /// An interface to interact with the network + network: NetworkHandle, +} + +impl AdminApiServer for AdminApi { + fn add_peer(&self, record: NodeRecord) -> RpcResult { + self.network.add_peer(record.id, record.tcp_addr()); + Ok(true) + } + + fn remove_peer(&self, record: NodeRecord) -> RpcResult { + self.network.remove_peer(record.id, PeerKind::Basic); + Ok(true) + } + + fn add_trusted_peer(&self, record: NodeRecord) -> RpcResult { + self.network.add_trusted_peer(record.id, record.tcp_addr()); + Ok(true) + } + + fn remove_trusted_peer(&self, record: NodeRecord) -> RpcResult { + self.network.remove_peer(record.id, PeerKind::Trusted); + Ok(true) + } + + fn subscribe( + &self, + _subscription_sink: jsonrpsee::SubscriptionSink, + ) -> jsonrpsee::types::SubscriptionResult { + todo!() + } +} diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index dedf3da738..da2cbd7847 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -11,6 +11,7 @@ //! //! Provides the implementation of all RPC interfaces. +mod admin; mod engine; mod eth; mod net;