diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 4dc8535b1e..594a3c6707 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -46,12 +46,7 @@ pub struct StateFetcher { impl StateFetcher { /// Invoked when connected to a new peer. - pub(crate) fn new_connected_peer( - &mut self, - peer_id: PeerId, - best_hash: H256, - best_number: u64, - ) { + pub(crate) fn new_active_peer(&mut self, peer_id: PeerId, best_hash: H256, best_number: u64) { self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 8e77fe8499..10e2b4fb32 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -188,7 +188,7 @@ where /// How many peers we're currently connected to. pub fn num_connected_peers(&self) -> usize { - self.swarm.state().num_connected_peers() + self.swarm.state().num_active_peers() } /// Returns the [`PeerId`] used in the network. diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index ea8d644c15..a2acc65e84 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -40,8 +40,8 @@ const PEER_BLOCK_CACHE_LIMIT: usize = 512; /// /// This type is also responsible for responding for received request. pub struct NetworkState { - /// All connected peers and their state. - connected_peers: HashMap, + /// All active peers and their state. + active_peers: HashMap, /// Manages connections to peers. peers_manager: PeersManager, /// Buffered messages until polled. @@ -71,7 +71,7 @@ where genesis_hash: H256, ) -> Self { Self { - connected_peers: Default::default(), + active_peers: Default::default(), peers_manager, queued_messages: Default::default(), client, @@ -96,14 +96,14 @@ where self.state_fetcher.client() } - /// How many peers we're currently connected to. + /// Configured genesis hash. pub fn genesis_hash(&self) -> H256 { self.genesis_hash } /// How many peers we're currently connected to. - pub fn num_connected_peers(&self) -> usize { - self.connected_peers.len() + pub fn num_active_peers(&self) -> usize { + self.active_peers.len() } /// Event hook for an activated session for the peer. @@ -117,16 +117,16 @@ where status: Status, request_tx: PeerRequestSender, ) { - debug_assert!(!self.connected_peers.contains_key(&peer), "Already connected; not possible"); + debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible"); // find the corresponding block number let block_number = self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default(); - self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number); + self.state_fetcher.new_active_peer(peer, status.blockhash, block_number); - self.connected_peers.insert( + self.active_peers.insert( peer, - ConnectedPeer { + ActivePeer { best_hash: status.blockhash, capabilities, request_tx, @@ -138,7 +138,7 @@ where /// Event hook for a disconnected session for the peer. pub(crate) fn on_session_closed(&mut self, peer: PeerId) { - self.connected_peers.remove(&peer); + self.active_peers.remove(&peer); self.state_fetcher.on_session_closed(&peer); } @@ -153,11 +153,11 @@ where pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage) { // send a `NewBlock` message to a fraction fo the connected peers (square root of the total // number of peers) - let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1; + let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1; let number = msg.block.block.header.number; let mut count = 0; - for (peer_id, peer) in self.connected_peers.iter_mut() { + for (peer_id, peer) in self.active_peers.iter_mut() { if peer.blocks.contains(&msg.hash) { // skip peers which already reported the block continue @@ -190,7 +190,7 @@ where pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) { let number = msg.block.block.header.number; let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]); - for (peer_id, peer) in self.connected_peers.iter_mut() { + for (peer_id, peer) in self.active_peers.iter_mut() { if peer.blocks.contains(&msg.hash) { // skip peers which already reported the block continue @@ -209,7 +209,7 @@ where /// Updates the block information for the peer. pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) { - if let Some(peer) = self.connected_peers.get_mut(peer_id) { + if let Some(peer) = self.active_peers.get_mut(peer_id) { peer.best_hash = hash; } self.state_fetcher.update_peer_block(peer_id, hash, number); @@ -220,7 +220,7 @@ where /// This will keep track of blocks we know a peer has pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: H256) { // Mark the blocks as seen - if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + if let Some(peer) = self.active_peers.get_mut(&peer_id) { peer.blocks.insert(hash); } } @@ -228,7 +228,7 @@ where /// Invoked for a `NewBlockHashes` broadcast message. pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec) { // Mark the blocks as seen - if let Some(peer) = self.connected_peers.get_mut(&peer_id) { + if let Some(peer) = self.active_peers.get_mut(&peer_id) { peer.blocks.extend(hashes.into_iter().map(|b| b.hash)); } } @@ -267,7 +267,7 @@ where /// Disconnect the session fn on_session_disconnected(&mut self, peer: PeerId) { - self.connected_peers.remove(&peer); + self.active_peers.remove(&peer); } /// Sends The message to the peer's session and queues in a response. @@ -275,7 +275,7 @@ where /// Caution: this will replace an already pending response. It's the responsibility of the /// caller to select the peer. fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) { - if let Some(ref mut peer) = self.connected_peers.get_mut(&peer) { + if let Some(ref mut peer) = self.active_peers.get_mut(&peer) { let (request, response) = match request { BlockRequest::GetBlockHeaders(request) => { let (response, rx) = oneshot::channel(); @@ -352,8 +352,8 @@ where let mut received_responses = Vec::new(); // poll all connected peers for responses - for (id, peer) in self.connected_peers.iter_mut() { - if let Some(response) = peer.pending_response.as_mut() { + for (id, peer) in self.active_peers.iter_mut() { + if let Some(mut response) = peer.pending_response.take() { match response.poll(cx) { Poll::Ready(Err(_)) => { trace!( @@ -364,20 +364,20 @@ where disconnect_sessions.push(*id); } Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)), - Poll::Pending => continue, + Poll::Pending => { + // not ready yet, store again. + peer.pending_response = Some(response); + } }; } - - // request has either returned a response or was canceled here - peer.pending_response.take(); } - for node in disconnect_sessions { - self.on_session_disconnected(node) + for peer in disconnect_sessions { + self.on_session_disconnected(peer) } - for (id, resp) in received_responses { - if let Some(action) = self.on_eth_response(id, resp) { + for (peer_id, resp) in received_responses { + if let Some(action) = self.on_eth_response(peer_id, resp) { self.queued_messages.push_back(action); } } @@ -394,13 +394,13 @@ where } } -/// Tracks the state of a Peer. +/// Tracks the state of a Peer with an active Session. /// /// For example known blocks,so we can decide what to announce. -pub(crate) struct ConnectedPeer { +pub(crate) struct ActivePeer { /// Best block of the peer. pub(crate) best_hash: H256, - /// The capabilities of the connected peer. + /// The capabilities of the remote peer. #[allow(unused)] pub(crate) capabilities: Arc, /// A communication channel directly to the session task.