perf(net): backoff on incoming if channel at capacity (#1191)

This commit is contained in:
Matthias Seitz
2023-02-06 15:09:56 +01:00
committed by GitHub
parent 5e97b4e28e
commit 9572ba0b30
2 changed files with 191 additions and 67 deletions

View File

@@ -33,7 +33,7 @@ use std::{
};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
sync::{mpsc::error::TrySendError, oneshot},
time::Interval,
};
use tokio_stream::wrappers::ReceiverStream;
@@ -76,6 +76,8 @@ pub(crate) struct ActiveSession {
pub(crate) commands_rx: ReceiverStream<SessionCommand>,
/// Sink to send messages to the [`SessionManager`](super::SessionManager).
pub(crate) to_session: MeteredSender<ActiveSessionMessage>,
/// A message that needs to be delivered to the session manager
pub(crate) pending_message_to_session: Option<ActiveSessionMessage>,
/// Incoming request to send to delegate to the remote peer.
pub(crate) request_tx: Fuse<ReceiverStream<PeerRequest>>,
/// All requests sent to the remote peer we're waiting on a response
@@ -109,12 +111,12 @@ impl ActiveSession {
/// Handle a message read from the connection.
///
/// Returns an error if the message is considered to be in violation of the protocol.
fn on_incoming(&mut self, msg: EthMessage) -> Option<(EthStreamError, EthMessage)> {
fn on_incoming(&mut self, msg: EthMessage) -> OnIncomingMessageOutcome {
/// A macro that handles an incoming request
/// This creates a new channel and tries to send the sender half to the session while
/// storing the receiver half internally so the pending response can be polled.
macro_rules! on_request {
($req:ident, $resp_item:ident, $req_item:ident) => {
($req:ident, $resp_item:ident, $req_item:ident) => {{
let RequestPair { request_id, message: request } = $req;
let (tx, response) = oneshot::channel();
let received = ReceivedRequest {
@@ -122,98 +124,93 @@ impl ActiveSession {
rx: PeerResponse::$resp_item { response },
received: Instant::now(),
};
if self
.emit_request(PeerMessage::EthRequest(PeerRequest::$req_item {
request,
response: tx,
}))
.is_ok()
{
self.received_requests.push(received);
}
};
self.received_requests.push(received);
self.try_emit_request(PeerMessage::EthRequest(PeerRequest::$req_item {
request,
response: tx,
}))
.into()
}};
}
/// Processes a response received from the peer
macro_rules! on_response {
($resp:ident, $item:ident) => {
($resp:ident, $item:ident) => {{
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(req) = self.inflight_requests.remove(&request_id) {
match req.request {
RequestState::Waiting(PeerRequest::$item { response, .. }) => {
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now())
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(request) => {
request.send_bad_response();
}
RequestState::TimedOut => {
// request was already timed out internally
self.update_request_timeout(req.timestamp, Instant::now())
self.update_request_timeout(req.timestamp, Instant::now());
}
}
};
} else {
// we received a response to a request we never sent
self.on_bad_message();
}
};
OnIncomingMessageOutcome::Ok
}};
}
match msg {
msg @ EthMessage::Status(_) => {
return Some((
EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake),
msg,
))
}
message @ EthMessage::Status(_) => OnIncomingMessageOutcome::BadMessage {
error: EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake),
message,
},
EthMessage::NewBlockHashes(msg) => {
self.emit_broadcast(PeerMessage::NewBlockHashes(msg));
self.try_emit_broadcast(PeerMessage::NewBlockHashes(msg)).into()
}
EthMessage::NewBlock(msg) => {
let block =
NewBlockMessage { hash: msg.block.header.hash_slow(), block: Arc::new(*msg) };
self.emit_broadcast(PeerMessage::NewBlock(block));
self.try_emit_broadcast(PeerMessage::NewBlock(block)).into()
}
EthMessage::Transactions(msg) => {
self.emit_broadcast(PeerMessage::ReceivedTransaction(msg));
self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into()
}
EthMessage::NewPooledTransactionHashes(msg) => {
self.emit_broadcast(PeerMessage::PooledTransactions(msg));
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg)).into()
}
EthMessage::GetBlockHeaders(req) => {
on_request!(req, BlockHeaders, GetBlockHeaders);
on_request!(req, BlockHeaders, GetBlockHeaders)
}
EthMessage::BlockHeaders(resp) => {
on_response!(resp, GetBlockHeaders);
on_response!(resp, GetBlockHeaders)
}
EthMessage::GetBlockBodies(req) => {
on_request!(req, BlockBodies, GetBlockBodies);
on_request!(req, BlockBodies, GetBlockBodies)
}
EthMessage::BlockBodies(resp) => {
on_response!(resp, GetBlockBodies);
on_response!(resp, GetBlockBodies)
}
EthMessage::GetPooledTransactions(req) => {
on_request!(req, PooledTransactions, GetPooledTransactions);
on_request!(req, PooledTransactions, GetPooledTransactions)
}
EthMessage::PooledTransactions(resp) => {
on_response!(resp, GetPooledTransactions);
on_response!(resp, GetPooledTransactions)
}
EthMessage::GetNodeData(req) => {
on_request!(req, NodeData, GetNodeData);
on_request!(req, NodeData, GetNodeData)
}
EthMessage::NodeData(resp) => {
on_response!(resp, GetNodeData);
on_response!(resp, GetNodeData)
}
EthMessage::GetReceipts(req) => {
on_request!(req, Receipts, GetReceipts);
on_request!(req, Receipts, GetReceipts)
}
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts);
on_response!(resp, GetReceipts)
}
};
None
}
}
/// Handle an internal peer request that will be sent to the remote.
@@ -275,38 +272,57 @@ impl ActiveSession {
}
}
/// Send a message back to the [`SessionManager`](super::SessionManager)
fn emit_broadcast(&self, message: PeerMessage) {
let _ = self
/// Send a message back to the [`SessionManager`](super::SessionManager).
///
/// Returns the message if the bounded channel is currently unable to handle this message.
#[allow(clippy::result_large_err)]
fn try_emit_broadcast(&self, message: PeerMessage) -> Result<(), ActiveSessionMessage> {
match self
.to_session
.try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
.map_err(|err| {
{
Ok(_) => Ok(()),
Err(err) => {
trace!(
target : "net",
%err,
"dropping incoming broadcast",
"no capacity for incoming broadcast",
);
});
match err {
TrySendError::Full(msg) => Err(msg),
TrySendError::Closed(_) => Ok(()),
}
}
}
}
/// Send a message back to the [`SessionManager`](super::SessionManager)
/// covering both broadcasts and incoming requests
fn emit_request(
&self,
message: PeerMessage,
) -> Result<(), mpsc::error::TrySendError<ActiveSessionMessage>> {
self.to_session
// we want this message to always arrive, so we clone the sender
.clone()
/// covering both broadcasts and incoming requests.
///
/// Returns the message if the bounded channel is currently unable to handle this message.
#[allow(clippy::result_large_err)]
fn try_emit_request(&self, message: PeerMessage) -> Result<(), ActiveSessionMessage> {
match self
.to_session
.try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
.map_err(|err| {
warn!(
{
Ok(_) => Ok(()),
Err(err) => {
trace!(
target : "net",
%err,
"dropping incoming request",
"no capacity for incoming request",
);
err
})
match err {
TrySendError::Full(msg) => Err(msg),
TrySendError::Closed(_) => {
// Note: this would mean the `SessionManager` was dropped, which is already
// handled by checking if the command receiver channel has been closed.
Ok(())
}
}
}
}
}
/// Notify the manager that the peer sent a bad message
@@ -498,7 +514,27 @@ impl Future for ActiveSession {
}
}
loop {
// read incoming messages from the wire
'receive: loop {
// try to resend the pending message that we could not send because the channel was
// full.
if let Some(msg) = this.pending_message_to_session.take() {
match this.to_session.try_send(msg) {
Ok(_) => {}
Err(err) => {
match err {
TrySendError::Full(msg) => {
this.pending_message_to_session = Some(msg);
// ensure we're woken up again
cx.waker().wake_by_ref();
break 'receive
}
TrySendError::Closed(_) => {}
}
}
}
}
match this.conn.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
@@ -511,15 +547,25 @@ impl Future for ActiveSession {
}
}
Poll::Ready(Some(res)) => {
progress = true;
match res {
Ok(msg) => {
trace!(target: "net::session", msg_id=?msg.message_id(), remote_peer_id=?this.remote_peer_id, "received eth message");
// decode and handle message
if let Some((err, bad_protocol_msg)) = this.on_incoming(msg) {
error!(target: "net::session", ?err, msg=?bad_protocol_msg, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
this.close_on_error(err);
return Poll::Ready(())
match this.on_incoming(msg) {
OnIncomingMessageOutcome::Ok => {
// handled successfully
progress = true;
}
OnIncomingMessageOutcome::BadMessage { error, message } => {
error!(target: "net::session", ?error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
this.close_on_error(error);
return Poll::Ready(())
}
OnIncomingMessageOutcome::NoCapacity(msg) => {
// failed to send due to lack of capacity
this.pending_message_to_session = Some(msg);
continue 'receive
}
}
}
Err(err) => {
@@ -594,6 +640,25 @@ impl InflightRequest {
}
}
/// All outcome variants when handling an incoming message
enum OnIncomingMessageOutcome {
/// Message successfully handled.
Ok,
/// Message is considered to be in violation fo the protocol
BadMessage { error: EthStreamError, message: EthMessage },
/// Currently no capacity to handle the message
NoCapacity(ActiveSessionMessage),
}
impl From<Result<(), ActiveSessionMessage>> for OnIncomingMessageOutcome {
fn from(res: Result<(), ActiveSessionMessage>) -> Self {
match res {
Ok(_) => OnIncomingMessageOutcome::Ok,
Err(msg) => OnIncomingMessageOutcome::NoCapacity(msg),
}
}
}
enum RequestState {
/// Waiting for the response
Waiting(PeerRequest),
@@ -640,7 +705,7 @@ mod tests {
use reth_primitives::{ForkFilter, Hardfork, MAINNET};
use secp256k1::{SecretKey, SECP256K1};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::{net::TcpListener, sync::mpsc};
/// Returns a testing `HelloMessage` and new secretkey
fn eth_hello(server_key: &SecretKey) -> HelloMessage {
@@ -751,6 +816,7 @@ mod tests {
self.active_session_tx.clone(),
"network_active_session",
),
pending_message_to_session: None,
request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
@@ -951,6 +1017,63 @@ mod tests {
rx.await.unwrap();
}
// This tests that incoming messages are delivered when there's capacity.
#[tokio::test(flavor = "multi_thread")]
async fn test_send_at_capacity() {
let mut builder = SessionBuilder::default();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
client_stream
.send(EthMessage::NewPooledTransactionHashes(NewPooledTransactionHashes(vec![])))
.await
.unwrap();
let _ = tokio::time::timeout(Duration::from_secs(100), client_stream.next()).await;
});
tokio::task::spawn(fut);
let (incoming, _) = listener.accept().await.unwrap();
let session = builder.connect_incoming(incoming).await;
// fill the entire message buffer with an unrelated message
let mut num_fill_messages = 0;
loop {
if builder
.active_session_tx
.try_send(ActiveSessionMessage::ProtocolBreach { peer_id: PeerId::random() })
.is_err()
{
break
}
num_fill_messages += 1;
}
tokio::task::spawn(async move {
session.await;
});
tokio::time::sleep(Duration::from_millis(100)).await;
for _ in 0..num_fill_messages {
let message = builder.active_session_rx.next().await.unwrap();
match message {
ActiveSessionMessage::ProtocolBreach { .. } => {}
_ => unreachable!(),
}
}
let message = builder.active_session_rx.next().await.unwrap();
match message {
ActiveSessionMessage::ValidMessage {
message: PeerMessage::PooledTransactions(_),
..
} => {}
_ => unreachable!(),
}
}
#[test]
fn timeout_calculation_sanity_tests() {
let rtt = Duration::from_secs(5);

View File

@@ -417,6 +417,7 @@ impl SessionManager {
session_id,
commands_rx: ReceiverStream::new(commands_rx),
to_session: self.active_session_tx.clone(),
pending_message_to_session: None,
request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,