diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index a3f9d160e4..5501c22561 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -279,31 +279,6 @@ where return Poll::Ready(None) } - // poll the pinger to determine if we should send a ping - match this.pinger.poll_ping(cx) { - Poll::Pending => {} - Poll::Ready(Ok(PingerEvent::Ping)) => { - // encode the ping message - let mut ping_bytes = BytesMut::new(); - P2PMessage::Ping.encode(&mut ping_bytes); - - // check if the buffer is full - if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { - return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull))) - } - - // if the sink is not ready, buffer the message - this.outgoing_messages.push_back(ping_bytes.into()); - } - _ => { - // encode the disconnect message - this.start_disconnect(DisconnectReason::PingTimeout)?; - - // End the stream after ping related error - return Poll::Ready(None) - } - } - // we should loop here to ensure we don't return Poll::Pending if we have a message to // return behind any pings we need to respond to while let Poll::Ready(res) = this.inner.poll_next_unpin(cx) { @@ -416,6 +391,31 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut(); + // poll the pinger to determine if we should send a ping + match this.pinger.poll_ping(cx) { + Poll::Pending => {} + Poll::Ready(Ok(PingerEvent::Ping)) => { + // encode the ping message + let mut ping_bytes = BytesMut::new(); + P2PMessage::Ping.encode(&mut ping_bytes); + + // check if the buffer is full + if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { + return Poll::Ready(Err(P2PStreamError::SendBufferFull)) + } + + // if the sink is not ready, buffer the message + this.outgoing_messages.push_back(ping_bytes.into()); + } + _ => { + // encode the disconnect message + this.start_disconnect(DisconnectReason::PingTimeout)?; + + // End the stream after ping related error + return Poll::Ready(Ok(())) + } + } + match this.inner.poll_ready_unpin(cx) { Poll::Pending => {} Poll::Ready(Err(err)) => return Poll::Ready(Err(P2PStreamError::Io(err))),