From b5300aafec51d8f1c11e7aa6dfd5915f86db9d8e Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 28 Nov 2022 23:41:39 +0100 Subject: [PATCH] feat(net): add request timeout (#273) --- crates/net/network/src/fetch/mod.rs | 7 +-- crates/net/network/src/session/active.rs | 62 ++++++++++++++++++++---- crates/net/network/src/session/config.rs | 9 +++- crates/net/network/src/session/mod.rs | 7 ++- 4 files changed, 68 insertions(+), 17 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 12d92145eb..acd2e6e629 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -11,7 +11,6 @@ use reth_primitives::{Header, PeerId, H256, U256}; use std::{ collections::{HashMap, VecDeque}, task::{Context, Poll}, - time::Instant, }; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -160,10 +159,9 @@ impl StateFetcher { peer.state = req.peer_state(); } - let started = Instant::now(); match req { DownloadRequest::GetBlockHeaders { request, response } => { - let inflight = Request { request, response, started }; + let inflight = Request { request, response }; self.inflight_headers_requests.insert(peer_id, inflight); unimplemented!("unify start types"); @@ -177,7 +175,7 @@ impl StateFetcher { // }) } DownloadRequest::GetBlockBodies { request, response } => { - let inflight = Request { request: request.clone(), response, started }; + let inflight = Request { request: request.clone(), response }; self.inflight_bodies_requests.insert(peer_id, inflight); BlockRequest::GetBlockBodies(GetBlockBodies(request)) } @@ -314,7 +312,6 @@ impl PeerState { struct Request { request: Req, response: oneshot::Sender, - started: Instant, } /// A message to update the status. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 53f4eea2d2..74ee2ca793 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -16,6 +16,7 @@ use reth_eth_wire::{ message::{EthBroadcastMessage, RequestPair}, DisconnectReason, EthMessage, EthStream, P2PStream, }; +use reth_interfaces::p2p::error::RequestError; use reth_primitives::PeerId; use std::{ collections::VecDeque, @@ -24,11 +25,12 @@ use std::{ pin::Pin, sync::Arc, task::{ready, Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use tokio::{ net::TcpStream, sync::{mpsc, oneshot}, + time::Interval, }; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, warn}; @@ -60,11 +62,15 @@ pub(crate) struct ActiveSession { /// Incoming request to send to delegate to the remote peer. pub(crate) request_tx: Fuse>, /// All requests sent to the remote peer we're waiting on a response - pub(crate) inflight_requests: FnvHashMap, + pub(crate) inflight_requests: FnvHashMap, /// All requests that were sent by the remote peer. pub(crate) received_requests: Vec, /// Buffered messages that should be handled and sent to the peer. pub(crate) queued_outgoing: VecDeque, + /// The maximum time we wait for a response from a peer. + pub(crate) request_timeout: Duration, + /// Interval when to check for timed out requests. + pub(crate) timeout_interval: Interval, } impl ActiveSession { @@ -113,11 +119,11 @@ impl ActiveSession { ($this:ident, $resp:ident, $item:ident) => { let RequestPair { request_id, message } = $resp; #[allow(clippy::collapsible_match)] - if let Some(resp) = self.inflight_requests.remove(&request_id) { - if let PeerRequest::$item { response, .. } = resp { + if let Some(req) = self.inflight_requests.remove(&request_id) { + if let PeerRequest::$item { response, .. } = req.request { let _ = response.send(Ok(message)); } else { - resp.send_bad_response(); + req.request.send_bad_response(); $this.on_bad_message(); } } else { @@ -180,10 +186,11 @@ impl ActiveSession { } /// Handle an incoming peer request. - fn on_peer_request(&mut self, req: PeerRequest) { + fn on_peer_request(&mut self, request: PeerRequest, deadline: Instant) { let request_id = self.next_id(); - let msg = req.create_request_message(request_id); + let msg = request.create_request_message(request_id); self.queued_outgoing.push_back(msg.into()); + let req = InflightRequest { request, deadline }; self.inflight_requests.insert(request_id, req); } @@ -204,12 +211,18 @@ impl ActiveSession { .push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into()); } PeerMessage::EthRequest(req) => { - self.on_peer_request(req); + let deadline = self.request_deadline(); + self.on_peer_request(req, deadline); } PeerMessage::Other(_) => {} } } + /// Returns the deadline timestamp at which the request times out + fn request_deadline(&self) -> Instant { + Instant::now() + self.request_timeout + } + /// Handle a Response to the peer fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) { match resp.try_into_message(id) { @@ -272,6 +285,20 @@ impl ActiveSession { fn start_disconnect(&mut self, reason: DisconnectReason) { self.conn.inner_mut().start_disconnect(reason); } + + /// Removes all timed out requests + fn evict_timed_out_requests(&mut self, now: Instant) { + let mut timedout = Vec::new(); + for (id, req) in self.inflight_requests.iter() { + if now > req.deadline { + timedout.push(*id) + } + } + for id in timedout { + let req = self.inflight_requests.remove(&id).expect("exists; qed"); + req.request.send_err_response(RequestError::Timeout); + } + } } impl Future for ActiveSession { @@ -315,9 +342,11 @@ impl Future for ActiveSession { } } + let deadline = this.request_deadline(); + while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) { progress = true; - this.on_peer_request(req); + this.on_peer_request(req, deadline); } // Advance all active requests. @@ -385,6 +414,9 @@ impl Future for ActiveSession { } if !progress { + // check for timed out requests + this.evict_timed_out_requests(Instant::now()); + return Poll::Pending } } @@ -401,6 +433,12 @@ pub(crate) struct ReceivedRequest { received: Instant, } +/// A request that waits for a response from the peer +pub(crate) struct InflightRequest { + request: PeerRequest, + deadline: Instant, +} + /// Outgoing messages that can be sent over the wire. pub(crate) enum OutgoingMessage { /// A message that is owned. @@ -424,7 +462,9 @@ impl From for OutgoingMessage { #[cfg(test)] mod tests { use super::*; - use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session}; + use crate::session::{ + config::REQUEST_TIMEOUT, handle::PendingSessionEvent, start_pending_incoming_session, + }; use reth_ecies::util::pk2id; use reth_eth_wire::{ EthVersion, HelloMessage, NewPooledTransactionHashes, ProtocolVersion, Status, @@ -543,6 +583,8 @@ mod tests { conn, queued_outgoing: Default::default(), received_requests: Default::default(), + timeout_interval: tokio::time::interval(REQUEST_TIMEOUT), + request_timeout: REQUEST_TIMEOUT, } } _ => { diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs index f94a03e154..f6ce7b7570 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network/src/session/config.rs @@ -1,6 +1,10 @@ //! Configuration types for [`SessionsManager`] use crate::session::{Direction, ExceedsSessionLimit}; +use std::time::Duration; + +/// Default request timeout. +pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500u64); /// Configuration options when creating a [`SessionsManager`]. pub struct SessionsConfig { @@ -10,8 +14,10 @@ pub struct SessionsConfig { pub session_event_buffer: usize, /// Limits to enforce. /// - /// By default, no limits will be enforced + /// By default, no limits will be enforced. pub limits: SessionLimits, + /// The maximum time we wait for a response from a peer. + pub request_timeout: Duration, } impl Default for SessionsConfig { @@ -26,6 +32,7 @@ impl Default for SessionsConfig { // `poll`. session_event_buffer: 64, limits: Default::default(), + request_timeout: REQUEST_TIMEOUT, } } } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 05e1546fb5..78bf017499 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -28,7 +28,7 @@ use std::{ net::SocketAddr, sync::Arc, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use tokio::{ net::TcpStream, @@ -55,6 +55,8 @@ pub(crate) struct SessionManager { next_id: usize, /// Keeps track of all sessions counter: SessionCounter, + /// The maximum time we wait for a response from a peer. + request_timeout: Duration, /// The secret key used for authenticating sessions. secret_key: SecretKey, /// The node id of node @@ -114,6 +116,7 @@ impl SessionManager { Self { next_id: 0, counter: SessionCounter::new(config.limits), + request_timeout: config.request_timeout, secret_key, peer_id, status, @@ -333,6 +336,8 @@ impl SessionManager { conn, queued_outgoing: Default::default(), received_requests: Default::default(), + timeout_interval: tokio::time::interval(self.request_timeout), + request_timeout: self.request_timeout, }; self.spawn(session);