diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 2faf4c2e8a..55b70877f5 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -56,7 +56,11 @@ const GRACE_PERIOD: Duration = Duration::from_secs(2); /// [`MAX_P2P_CAPACITY`] is the maximum number of messages that can be buffered to be sent in the /// `p2p` stream. -const MAX_P2P_CAPACITY: usize = 64; +/// +/// Note: this default is rather low because it is expected that the [P2PStream] wraps an +/// [ECIESStream](reth_ecies::stream::ECIESStream) which internally already buffers a few MB of +/// encoded data. +const MAX_P2P_CAPACITY: usize = 2; /// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the /// `Hello` handshake is completed. @@ -212,6 +216,10 @@ pub struct P2PStream { /// Outgoing messages buffered for sending to the underlying stream. outgoing_messages: VecDeque, + /// Maximum number of messages that we can buffer here before the [Sink] impl returns + /// [Poll::Pending]. + outgoing_message_buffer_capacity: usize, + /// Whether this stream is currently in the process of disconnecting by sending a disconnect /// message. disconnecting: bool, @@ -229,10 +237,20 @@ impl P2PStream { pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT), shared_capability: capability, outgoing_messages: VecDeque::new(), + outgoing_message_buffer_capacity: MAX_P2P_CAPACITY, disconnecting: false, } } + /// Sets a custom outgoing message buffer capacity. + /// + /// # Panics + /// + /// If the provided capacity is `0`. + pub fn set_outgoing_message_buffer_capacity(&mut self, capacity: usize) { + self.outgoing_message_buffer_capacity = capacity; + } + /// Returns the shared capability for this stream. pub fn shared_capability(&self) -> &SharedCapability { &self.shared_capability @@ -243,6 +261,11 @@ impl P2PStream { self.disconnecting } + /// Returns `true` if the stream has outgoing capacity. + fn has_outgoing_capacity(&self) -> bool { + self.outgoing_messages.len() < self.outgoing_message_buffer_capacity + } + /// Queues in a _snappy_ encoded [`P2PMessage::Pong`] message. fn send_pong(&mut self) { let pong = P2PMessage::Pong; @@ -366,10 +389,6 @@ where let id = *bytes.first().ok_or(P2PStreamError::EmptyProtocolMessage)?; match id { _ if id == P2PMessageID::Ping as u8 => { - if this.outgoing_messages.len() > MAX_P2P_CAPACITY { - return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull))) - } - tracing::trace!("Received Ping, Sending Pong"); this.send_pong(); } @@ -467,7 +486,7 @@ where } } - if self.outgoing_messages.len() < MAX_P2P_CAPACITY { + if self.has_outgoing_capacity() { // still has capacity Poll::Ready(Ok(())) } else { @@ -476,13 +495,13 @@ where } fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { - let this = self.project(); - // ensure we have free capacity - if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { + if !self.has_outgoing_capacity() { return Err(P2PStreamError::SendBufferFull) } + let this = self.project(); + let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1)); let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..]).map_err(|err| {