diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 4d31876ce4..9dbf058e85 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -56,6 +56,7 @@ use std::{ Arc, }, task::{Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -147,6 +148,19 @@ impl NetworkManager { pub fn secret_key(&self) -> SecretKey { self.swarm.sessions().secret_key() } + + #[inline] + fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) { + let metrics = &self.metrics; + + let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations; + + // update metrics for whole poll function + metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64()); + // update poll metrics for nested items + metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); + metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64()); + } } impl NetworkManager @@ -602,6 +616,249 @@ where } } } + + fn on_swarm_event(&mut self, event: SwarmEvent) { + // handle event + match event { + SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message), + SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { + self.on_invalid_message(peer_id, capabilities, message); + self.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::TcpListenerClosed { remote_addr } => { + trace!(target: "net", ?remote_addr, "TCP listener closed."); + } + SwarmEvent::TcpListenerError(err) => { + trace!(target: "net", %err, "TCP connection error."); + } + SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { + trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); + self.metrics.total_incoming_connections.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + } + SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { + trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); + self.metrics.total_outgoing_connections.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + } + SwarmEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + messages, + status, + direction, + } => { + let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + %client_version, + ?peer_id, + ?total_active, + kind=%direction, + peer_enode=%NodeRecord::new(remote_addr, peer_id), + "Session established" + ); + + if direction.is_incoming() { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_session_established(peer_id, remote_addr); + } + self.event_listeners.notify(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + status, + messages, + }); + } + SwarmEvent::PeerAdded(peer_id) => { + trace!(target: "net", ?peer_id, "Peer added"); + self.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); + self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::PeerRemoved(peer_id) => { + trace!(target: "net", ?peer_id, "Peer dropped"); + self.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); + 
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { + let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?total_active, + ?error, + "Session disconnected" + ); + + let mut reason = None; + if let Some(ref err) = error { + // If the connection was closed due to an error, we report + // the peer + self.swarm.state_mut().peers_mut().on_active_session_dropped( + &remote_addr, + &peer_id, + err, + ); + reason = err.as_disconnected(); + } else { + // Gracefully disconnected + self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id); + } + self.metrics.closed_sessions.increment(1); + // This can either be an incoming or outgoing connection which + // was closed. So we update + // both metrics + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + if let Some(reason) = reason { + self.disconnect_metrics.increment(reason); + } + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + self.event_listeners.notify(NetworkEvent::SessionClosed { peer_id, reason }); + } + SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { + trace!( + target: "net", + ?remote_addr, + ?error, + "Incoming pending session failed" + ); + + if let Some(ref err) = error { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_dropped(remote_addr, err); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_gracefully_closed(); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing pending session failed" + ); + + if let Some(ref err) = error { + self.swarm.state_mut().peers_mut().on_pending_session_dropped( + &remote_addr, + &peer_id, + err, + ); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_pending_session_gracefully_closed(&peer_id); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + %error, + "Outgoing connection error" + ); + + self.swarm.state_mut().peers_mut().on_outgoing_connection_failure( + &remote_addr, + &peer_id, + &error, + ); + + self.metrics + .outgoing_connections + 
.set(self.swarm.state().peers().num_outbound_connections() as f64);
+                self.metrics.backed_off_peers.set(
+                    self.swarm
+                        .state()
+                        .peers()
+                        .num_backed_off_peers()
+                        .saturating_sub(1)
+                        as f64,
+                );
+            }
+            SwarmEvent::BadMessage { peer_id } => {
+                self.swarm
+                    .state_mut()
+                    .peers_mut()
+                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
+                self.metrics.invalid_messages_received.increment(1);
+            }
+            SwarmEvent::ProtocolBreach { peer_id } => {
+                self.swarm
+                    .state_mut()
+                    .peers_mut()
+                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
+            }
+        }
+    }
 }
 
 impl NetworkManager
@@ -639,20 +896,25 @@ where
     type Output = ();
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let start = Instant::now();
+        let mut poll_durations = NetworkManagerPollDurations::default();
+
         let this = self.get_mut();
 
-        // poll new block imports
+        // poll new block imports (expected to be a noop for POS)
         while let Poll::Ready(outcome) = this.block_import.poll(cx) {
             this.on_block_import_result(outcome);
         }
 
         // process incoming messages from a handle
+        let start_network_handle = Instant::now();
         loop {
             match this.from_handle_rx.poll_next_unpin(cx) {
                 Poll::Pending => break,
                 Poll::Ready(None) => {
-                    // This is only possible if the channel was deliberately closed since we always
-                    // have an instance of `NetworkHandle`
+                    // This is only possible if the channel was deliberately closed since we
+                    // always have an instance of
+                    // `NetworkHandle`
                     error!("Network message channel closed.");
                     return Poll::Ready(())
                 }
@@ -660,22 +922,25 @@ where
             };
         }
 
-        // This loop drives the entire state of network and does a lot of work.
-        // Under heavy load (many messages/events), data may arrive faster than it can be processed
-        // (incoming messages/requests -> events), and it is possible that more data has already
-        // arrived by the time an internal event is processed. Which could turn this loop into a
-        // busy loop. Without yielding back to the executor, it can starve other tasks waiting on
-        // that executor to execute them, or drive underlying resources To prevent this, we
-        // preemptively return control when the `budget` is exhausted. The value itself is
-        // chosen somewhat arbitrarily, it is high enough so the swarm can make meaningful progress
-        // but low enough that this loop does not starve other tasks for too long.
-        // If the budget is exhausted we manually yield back control to the (coop) scheduler. This
-        // manual yield point should prevent situations where polling appears to be frozen. See also
-        // And tokio's docs on cooperative scheduling
+        poll_durations.acc_network_handle = start_network_handle.elapsed();
+
+        // This loop drives the entire state of network and does a lot of work. Under heavy load
+        // (many messages/events), data may arrive faster than it can be processed (incoming
+        // messages/requests -> events), and it is possible that more data has already arrived by
+        // the time an internal event is processed, which could turn this loop into a busy loop.
+        // Without yielding back to the executor, it can starve other tasks waiting on that
+        // executor to execute them, or drive underlying resources. To prevent this, we
+        // preemptively return control when the `budget` is exhausted. The value itself is chosen
+        // somewhat arbitrarily; it is high enough so the swarm can make meaningful progress but
+        // low enough that this loop does not starve other tasks for too long. If the budget is
+        // exhausted we manually yield back control to the (coop) scheduler. 
This manual yield + // point should prevent situations where polling appears to be frozen. See also + // And tokio's docs on cooperative scheduling + // // // Testing has shown that this loop naturally reaches the pending state within 1-5 - // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside - // the range of what's recommended as rule of thumb. + // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the + // range of what's recommended as rule of thumb. // let mut budget = 10; @@ -683,246 +948,7 @@ where // advance the swarm match this.swarm.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(event)) => { - // handle event - match event { - SwarmEvent::ValidMessage { peer_id, message } => { - this.on_peer_message(peer_id, message) - } - SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { - this.on_invalid_message(peer_id, capabilities, message); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::TcpListenerClosed { remote_addr } => { - trace!(target: "net", ?remote_addr, "TCP listener closed."); - } - SwarmEvent::TcpListenerError(err) => { - trace!(target: "net", %err, "TCP connection error."); - } - SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { - trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); - this.metrics.total_incoming_connections.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - } - SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { - trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); - this.metrics.total_outgoing_connections.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - } - SwarmEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - messages, - status, - direction, - } => { - let total_active = - this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - %client_version, - ?peer_id, - ?total_active, - kind=%direction, - peer_enode=%NodeRecord::new(remote_addr, peer_id), - "Session established" - ); - - if direction.is_incoming() { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_session_established(peer_id, remote_addr); - } - this.event_listeners.notify(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - status, - messages, - }); - } - SwarmEvent::PeerAdded(peer_id) => { - trace!(target: "net", ?peer_id, "Peer added"); - this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::PeerRemoved(peer_id) => { - trace!(target: "net", ?peer_id, "Peer dropped"); - this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { - let total_active = - this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?total_active, - ?error, - "Session disconnected" - ); - - let mut reason = None; - if let Some(ref err) = 
error { - // If the connection was closed due to an error, we report the peer - this.swarm.state_mut().peers_mut().on_active_session_dropped( - &remote_addr, - &peer_id, - err, - ); - reason = err.as_disconnected(); - } else { - // Gracefully disconnected - this.swarm - .state_mut() - .peers_mut() - .on_active_session_gracefully_closed(peer_id); - } - this.metrics.closed_sessions.increment(1); - // This can either be an incoming or outgoing connection which was - // closed. So we update both metrics - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - if let Some(reason) = reason { - this.disconnect_metrics.increment(reason); - } - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - this.event_listeners - .notify(NetworkEvent::SessionClosed { peer_id, reason }); - } - SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { - trace!( - target: "net", - ?remote_addr, - ?error, - "Incoming pending session failed" - ); - - if let Some(ref err) = error { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_dropped(remote_addr, err); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_gracefully_closed(); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error, - } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing pending session failed" - ); - - if let Some(ref err) = error { - this.swarm.state_mut().peers_mut().on_pending_session_dropped( - &remote_addr, - &peer_id, - err, - ); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_pending_session_gracefully_closed(&peer_id); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - %error, - "Outgoing connection error" - ); - - this.swarm.state_mut().peers_mut().on_outgoing_connection_failure( - &remote_addr, - &peer_id, - &error, - ); - - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::BadMessage { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadMessage, - ); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::ProtocolBreach { peer_id } => { - 
this.swarm.state_mut().peers_mut().apply_reputation_change(
-                            &peer_id,
-                            ReputationChangeKind::BadProtocol,
-                        );
-                    }
-                }
-            }
+                Poll::Ready(Some(event)) => this.on_swarm_event(event),
             }
 
             // ensure we still have enough budget for another iteration
@@ -935,6 +961,11 @@ where
             }
         }
 
+        poll_durations.acc_swarm =
+            start_network_handle.elapsed() - poll_durations.acc_network_handle;
+
+        this.update_poll_metrics(start, poll_durations);
+
         Poll::Pending
     }
 }
@@ -979,3 +1010,9 @@ pub enum NetworkEvent {
 pub enum DiscoveredEvent {
     EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
 }
+
+#[derive(Debug, Default)]
+struct NetworkManagerPollDurations {
+    acc_network_handle: Duration,
+    acc_swarm: Duration,
+}
diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs
index 45e2490717..6a85d47c6f 100644
--- a/crates/net/network/src/metrics.rs
+++ b/crates/net/network/src/metrics.rs
@@ -45,6 +45,30 @@ pub struct NetworkMetrics {
     /// Number of Eth Requests dropped due to channel being at full capacity
     pub(crate) total_dropped_eth_requests_at_full_capacity: Counter,
+
+    /* ================ POLL DURATION ================ */
+
+    /* -- Total poll duration of `NetworkManager` future -- */
+    /// Duration in seconds of a call to
+    /// [`NetworkManager`](crate::NetworkManager)'s poll function.
+    ///
+    /// The true duration of this call; it should be the sum of the accumulated durations of
+    /// polling the nested items.
+    pub(crate) duration_poll_network_manager: Gauge,
+
+    /* -- Poll duration of items nested in `NetworkManager` future -- */
+    /// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which
+    /// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in
+    /// one call to poll the [`NetworkManager`](crate::NetworkManager) future.
+    ///
+    /// Duration in seconds.
+    // todo: find out how many components hold the network handle.
+    pub(crate) duration_poll_network_handle: Gauge,
+    /// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
+    /// [`NetworkManager`](crate::NetworkManager) future.
+    ///
+    /// Duration in seconds.
+    pub(crate) duration_poll_swarm: Gauge,
 }
 
 /// Metrics for SessionManager
@@ -119,7 +143,7 @@ pub struct TransactionsManagerMetrics {
     /// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function.
     ///
     /// Updating metrics could take time, so the true duration of this call could
-    /// be longer than the sum of the accumulated durations of polling nested streams.
+    /// be longer than the sum of the accumulated durations of polling nested items.
     pub(crate) duration_poll_tx_manager: Gauge,
 
     /* -- Poll duration of items nested in `TransactionsManager` future -- */
@@ -167,6 +191,19 @@ pub struct TransactionsManagerMetrics {
     pub(crate) acc_duration_poll_commands: Gauge,
 }
 
+/// Measures the duration of executing the given code block. The duration is added to the given
+/// accumulator value passed as a mutable reference.
+#[macro_export]
+macro_rules! 
duration_metered_exec { + ($code:block, $acc:ident) => { + let start = Instant::now(); + + $code; + + *$acc += start.elapsed(); + }; +} + /// Metrics for Disconnection types /// /// These are just counters, and ideally we would implement these metrics on a peer-by-peer basis, diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4b0bc01558..700da8f2bd 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -29,6 +29,7 @@ use crate::{ cache::LruCache, + duration_metered_exec, manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, @@ -369,7 +370,7 @@ where // update metrics for whole poll function metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64()); - // update poll metrics for nested streams + // update metrics for nested expressions metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64()); metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64()); metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64()); @@ -1163,18 +1164,6 @@ where } } -/// Measures the duration of executing the given code block. The duration is added to the given -/// accumulator value passed as a mutable reference. -macro_rules! duration_metered_exec { - ($code:block, $acc:ident) => { - let start = Instant::now(); - - $code; - - *$acc += start.elapsed(); - }; -} - #[derive(Debug, Default)] struct TxManagerPollDurations { acc_network_events: Duration,
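
Note for reviewers: below is a minimal, self-contained sketch of the call pattern for the now-exported `duration_metered_exec!` macro, mirroring how `transactions/mod.rs` uses it (bind a `&mut Duration` to an identifier, then pass that identifier next to the block to time). Only the macro body is taken from the diff; the accumulator names and the timed blocks are hypothetical.

```rust
use std::time::{Duration, Instant};

// Local re-statement of `duration_metered_exec!` for illustration only; in-tree it now
// lives in `crates/net/network/src/metrics.rs` behind `#[macro_export]`. The expansion
// uses `Instant` unqualified, so callers need `std::time::Instant` in scope.
macro_rules! duration_metered_exec {
    ($code:block, $acc:ident) => {
        let start = Instant::now();

        $code;

        *$acc += start.elapsed();
    };
}

fn main() {
    // Hypothetical accumulators standing in for fields of the `*PollDurations` structs.
    let mut acc_first = Duration::ZERO;
    let mut acc_second = Duration::ZERO;

    // Bind a `&mut Duration` to an identifier and hand it to the macro with the block.
    let acc = &mut acc_first;
    duration_metered_exec!(
        {
            let _sum: u64 = (0..1_000u64).sum();
        },
        acc
    );

    let acc = &mut acc_second;
    duration_metered_exec!(
        {
            std::thread::sleep(Duration::from_millis(5));
        },
        acc
    );

    // At the end of a poll, the accumulated values are exported as gauges via
    // `as_secs_f64()`; printing stands in for that here.
    println!("first: {acc_first:?}, second: {acc_second:?}");
}
```

Because the expansion references `Instant` unqualified, a crate that imports the exported macro also needs `std::time::Instant` in scope at each call site, which `transactions/mod.rs` already has.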
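On the manager side, the new instrumentation decomposes one call to `poll` into a directly timed handle-channel section and a swarm section derived by subtraction, then exports everything as seconds, as `update_poll_metrics` does. The sketch below reproduces only that timing arithmetic; the gauge struct, the sleeps standing in for real work, and `poll_once` are hypothetical stand-ins, not reth APIs.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the gauges on `NetworkMetrics`; the real fields are
// `duration_poll_network_manager`, `duration_poll_network_handle` and
// `duration_poll_swarm`, set as seconds via `as_secs_f64()`.
#[derive(Default)]
struct PollGauges {
    network_manager_secs: f64,
    network_handle_secs: f64,
    swarm_secs: f64,
}

#[derive(Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}

fn poll_once(gauges: &mut PollGauges) {
    let start = Instant::now();
    let mut poll_durations = NetworkManagerPollDurations::default();

    // ... block import polling would happen here ...

    // Handle-channel section: timed directly.
    let start_network_handle = Instant::now();
    std::thread::sleep(Duration::from_millis(2)); // stand-in for draining `from_handle_rx`
    poll_durations.acc_network_handle = start_network_handle.elapsed();

    // Swarm section: not timed with its own `Instant`; it is everything that elapsed
    // since `start_network_handle` minus the handle portion, as in the diff.
    std::thread::sleep(Duration::from_millis(3)); // stand-in for the budgeted swarm loop
    poll_durations.acc_swarm =
        start_network_handle.elapsed() - poll_durations.acc_network_handle;

    // Equivalent of `update_poll_metrics`.
    gauges.network_manager_secs = start.elapsed().as_secs_f64();
    gauges.network_handle_secs = poll_durations.acc_network_handle.as_secs_f64();
    gauges.swarm_secs = poll_durations.acc_swarm.as_secs_f64();
}

fn main() {
    let mut gauges = PollGauges::default();
    poll_once(&mut gauges);
    println!(
        "manager: {:.4}s, handle: {:.4}s, swarm: {:.4}s",
        gauges.network_manager_secs, gauges.network_handle_secs, gauges.swarm_secs
    );
}
```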