diff --git a/crates/net/eth-wire-types/src/message.rs b/crates/net/eth-wire-types/src/message.rs index 2dbcbf8c26..00817cbe37 100644 --- a/crates/net/eth-wire-types/src/message.rs +++ b/crates/net/eth-wire-types/src/message.rs @@ -254,6 +254,30 @@ impl EthMessage { Self::Receipts(_) => EthMessageID::Receipts, } } + + /// Returns true if the message variant is a request. + pub const fn is_request(&self) -> bool { + matches!( + self, + Self::GetBlockBodies(_) | + Self::GetBlockHeaders(_) | + Self::GetReceipts(_) | + Self::GetPooledTransactions(_) | + Self::GetNodeData(_) + ) + } + + /// Returns true if the message variant is a response to a request. + pub const fn is_response(&self) -> bool { + matches!( + self, + Self::PooledTransactions(_) | + Self::Receipts(_) | + Self::BlockHeaders(_) | + Self::BlockBodies(_) | + Self::NodeData(_) + ) + } } impl Encodable for EthMessage { diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index f273e56f03..e0d40dfb95 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -44,7 +44,7 @@ const MAX_HEADERS_SERVE: usize = 1024; /// `SOFT_RESPONSE_LIMIT`. const MAX_BODIES_SERVE: usize = 1024; -/// Maximum size of replies to data retrievals. +/// Maximum size of replies to data retrievals: 2MB const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024; /// Manages eth related requests on top of the p2p network. @@ -167,7 +167,7 @@ where for hash in request.0 { if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() { - let (_, body) = block.split(); + let body = block.into_body(); total_bytes += body.length(); bodies.push(body); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 475ede1baf..6781c2032a 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -54,6 +54,19 @@ const SAMPLE_IMPACT: f64 = 0.1; /// Amount of RTTs before timeout const TIMEOUT_SCALING: u32 = 3; +/// Restricts the number of queued outgoing messages for larger responses: +/// - Block Bodies +/// - Receipts +/// - Headers +/// - `PooledTransactions` +/// +/// With proper softlimits in place (2MB) this targets 10MB (4+1 * 2MB) of outgoing response data. +/// +/// This parameter serves as backpressure for reading additional requests from the remote. +/// Once we've queued up more responses than this, the session should priorotize message flushing +/// before reading any more messages from the remote peer, throttling the peer. +const MAX_QUEUED_OUTGOING_RESPONSES: usize = 4; + /// The type that advances an established session by listening for incoming messages (from local /// node or read from connection) and emitting events back to the /// [`SessionManager`](super::SessionManager). @@ -122,6 +135,11 @@ impl ActiveSession { self.queued_outgoing.shrink_to_fit(); } + /// Returns how many responses we've currently queued up. + fn queued_response_count(&self) -> usize { + self.queued_outgoing.messages.iter().filter(|m| m.is_response()).count() + } + /// Handle a message read from the connection. /// /// Returns an error if the message is considered to be in violation of the protocol. @@ -596,6 +614,29 @@ impl Future for ActiveSession { }; } + // check whether we should throttle incoming messages + if this.received_requests_from_remote.len() > MAX_QUEUED_OUTGOING_RESPONSES { + // we're currently waiting for the responses to the peer's requests which aren't + // queued as outgoing yet + // + // Note: we don't need to register the waker here because we polled the requests + // above + break 'receive + } + + // we also need to check if we have multiple responses queued up + if this.queued_outgoing.messages.len() > MAX_QUEUED_OUTGOING_RESPONSES && + this.queued_response_count() > MAX_QUEUED_OUTGOING_RESPONSES + { + // if we've queued up more responses than allowed, we don't poll for new + // messages and break the receive loop early + // + // Note: we don't need to register the waker here because we still have + // queued messages and the sink impl registered the waker because we've + // already advanced it to `Pending` earlier + break 'receive + } + match this.conn.poll_next_unpin(cx) { Poll::Pending => break, Poll::Ready(None) => { @@ -740,6 +781,16 @@ pub(crate) enum OutgoingMessage { Raw(RawCapabilityMessage), } +impl OutgoingMessage { + /// Returns true if this is a response. + const fn is_response(&self) -> bool { + match self { + Self::Eth(msg) => msg.is_response(), + _ => false, + } + } +} + impl From> for OutgoingMessage { fn from(value: EthMessage) -> Self { Self::Eth(value)