diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index 4333cf1408..bda5f84c76 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -85,6 +85,8 @@ pub struct SessionManagerMetrics { pub(crate) total_dial_successes: Counter, /// Number of dropped outgoing peer messages. pub(crate) total_outgoing_peer_messages_dropped: Counter, + /// Number of queued outgoing messages + pub(crate) queued_outgoing_messages: Gauge, } /// Metrics for the [`TransactionsManager`](crate::transactions::TransactionsManager). diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index e83f5d9f12..10048823c5 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -12,6 +12,7 @@ use std::{ }; use futures::{stream::Fuse, SinkExt, StreamExt}; +use metrics::Gauge; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError, P2PStreamError}, message::{EthBroadcastMessage, RequestPair}, @@ -87,7 +88,7 @@ pub(crate) struct ActiveSession { /// All requests that were sent by the remote peer and we're waiting on an internal response pub(crate) received_requests_from_remote: Vec, /// Buffered messages that should be handled and sent to the peer. - pub(crate) queued_outgoing: VecDeque, + pub(crate) queued_outgoing: QueuedOutgoingMessages, /// The maximum time we wait for a response from a peer. pub(crate) internal_request_timeout: Arc, /// Interval when to check for timed out requests. @@ -757,6 +758,32 @@ fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT) } + +/// A helper struct that wraps the queue of outgoing messages and a metric to track their count +pub(crate) struct QueuedOutgoingMessages { + messages: VecDeque, + count: Gauge, +} + +impl QueuedOutgoingMessages { + pub(crate) const fn new(metric: Gauge) -> Self { + Self { messages: VecDeque::new(), count: metric } + } + + pub(crate) fn push_back(&mut self, message: OutgoingMessage) { + self.messages.push_back(message); + self.count.increment(1); + } + + pub(crate) fn pop_front(&mut self) -> Option { + self.messages.pop_front().inspect(|_| self.count.decrement(1)) + } + + pub(crate) fn shrink_to_fit(&mut self) { + self.messages.shrink_to_fit(); + } +} + #[cfg(test)] mod tests { use super::*; @@ -882,7 +909,7 @@ mod tests { internal_request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, - queued_outgoing: Default::default(), + queued_outgoing: QueuedOutgoingMessages::new(Gauge::noop()), received_requests_from_remote: Default::default(), internal_request_timeout_interval: tokio::time::interval( INITIAL_REQUEST_TIMEOUT, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 3522aa6a75..712f076b47 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -5,6 +5,7 @@ mod conn; mod counter; mod handle; +use active::QueuedOutgoingMessages; pub use conn::EthRlpxConnection; pub use handle::{ ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, @@ -495,7 +496,9 @@ impl SessionManager { internal_request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, - queued_outgoing: Default::default(), + queued_outgoing: QueuedOutgoingMessages::new( + self.metrics.queued_outgoing_messages.clone(), + ), received_requests_from_remote: Default::default(), internal_request_timeout_interval: tokio::time::interval( self.initial_internal_request_timeout,