diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 2683ded205..f548e826f5 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -3,6 +3,7 @@ use crate::{ message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, session::{ + config::INITIAL_REQUEST_TIMEOUT, handle::{ActiveSessionMessage, SessionCommand}, SessionId, }, @@ -75,6 +76,17 @@ pub(crate) struct ActiveSession { pub(crate) timeout_interval: Interval, } +/// Constants for timeout updating + +/// Minimum timeout value +const MINIMUM_TIMEOUT: Duration = Duration::from_millis(1); +/// Maximum timeout value +const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT; +/// How much the new measurements affect the current timeout (X percent) +const SAMPLE_IMPACT: f64 = 0.1; +/// Amount of RTTs before timeout +const TIMEOUT_SCALING: u32 = 3; + impl ActiveSession { /// Returns `true` if the session is currently in the process of disconnecting fn is_disconnecting(&self) -> bool { @@ -94,7 +106,7 @@ impl ActiveSession { fn on_incoming(&mut self, msg: EthMessage) -> Option<(EthStreamError, EthMessage)> { /// A macro that handles an incoming request /// This creates a new channel and tries to send the sender half to the session while - /// storing to receiver half internally so the pending response can be polled. + /// storing the receiver half internally so the pending response can be polled. macro_rules! on_request { ($req:ident, $resp_item:ident, $req_item:ident) => { let RequestPair { request_id, message: request } = $req; @@ -118,18 +130,19 @@ impl ActiveSession { /// Processes a response received from the peer macro_rules! on_response { - ($this:ident, $resp:ident, $item:ident) => { + ($resp:ident, $item:ident) => { let RequestPair { request_id, message } = $resp; #[allow(clippy::collapsible_match)] if let Some(req) = self.inflight_requests.remove(&request_id) { if let PeerRequest::$item { response, .. } = req.request { let _ = response.send(Ok(message)); + self.update_request_timeout(req.timestamp, Instant::now()) } else { req.request.send_bad_response(); - $this.on_bad_message(); + self.on_bad_message(); } } else { - $this.on_bad_message() + self.on_bad_message() } }; } @@ -159,31 +172,31 @@ impl ActiveSession { on_request!(req, BlockHeaders, GetBlockHeaders); } EthMessage::BlockHeaders(resp) => { - on_response!(self, resp, GetBlockHeaders); + on_response!(resp, GetBlockHeaders); } EthMessage::GetBlockBodies(req) => { on_request!(req, BlockBodies, GetBlockBodies); } EthMessage::BlockBodies(resp) => { - on_response!(self, resp, GetBlockBodies); + on_response!(resp, GetBlockBodies); } EthMessage::GetPooledTransactions(req) => { on_request!(req, PooledTransactions, GetPooledTransactions); } EthMessage::PooledTransactions(resp) => { - on_response!(self, resp, GetPooledTransactions); + on_response!(resp, GetPooledTransactions); } EthMessage::GetNodeData(req) => { on_request!(req, NodeData, GetNodeData); } EthMessage::NodeData(resp) => { - on_response!(self, resp, GetNodeData); + on_response!(resp, GetNodeData); } EthMessage::GetReceipts(req) => { on_request!(req, Receipts, GetReceipts); } EthMessage::Receipts(resp) => { - on_response!(self, resp, GetReceipts); + on_response!(resp, GetReceipts); } }; @@ -195,7 +208,7 @@ impl ActiveSession { let request_id = self.next_id(); let msg = request.create_request_message(request_id); self.queued_outgoing.push_back(msg.into()); - let req = InflightRequest { request, deadline }; + let req = InflightRequest { request, timestamp: Instant::now(), deadline }; self.inflight_requests.insert(request_id, req); } @@ -334,9 +347,29 @@ impl ActiveSession { for id in timedout { warn!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request"); let req = self.inflight_requests.remove(&id).expect("exists; qed"); + self.update_request_timeout(req.timestamp, req.deadline); req.request.send_err_response(RequestError::Timeout); } } + + /// Updates the request timeout with a request's timestamps + fn update_request_timeout(&mut self, sent: Instant, received: Instant) { + let elapsed = received.saturating_duration_since(sent); + + self.request_timeout = calculate_new_timeout(self.request_timeout, elapsed); + self.timeout_interval = tokio::time::interval(self.request_timeout); + } +} + +/// Calculates a new timeout using an updated estimation of the RTT +#[inline] +fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> Duration { + let new_timeout = estimated_rtt.mul_f64(SAMPLE_IMPACT) * TIMEOUT_SCALING; + + // this dampens sudden changes by taking a weighted mean of the old and new values + let smoothened_timeout = current_timeout.mul_f64(1.0 - SAMPLE_IMPACT) + new_timeout; + + smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT) } impl Future for ActiveSession { @@ -490,7 +523,11 @@ pub(crate) struct ReceivedRequest { /// A request that waits for a response from the peer pub(crate) struct InflightRequest { + /// Request sent to peer request: PeerRequest, + /// Instant when the request was sent + timestamp: Instant, + /// Time limit for the response deadline: Instant, } @@ -519,7 +556,8 @@ mod tests { #![allow(dead_code)] use super::*; use crate::session::{ - config::REQUEST_TIMEOUT, handle::PendingSessionEvent, start_pending_incoming_session, + config::INITIAL_REQUEST_TIMEOUT, handle::PendingSessionEvent, + start_pending_incoming_session, }; use reth_ecies::util::pk2id; use reth_eth_wire::{ @@ -643,8 +681,8 @@ mod tests { conn, queued_outgoing: Default::default(), received_requests: Default::default(), - timeout_interval: tokio::time::interval(REQUEST_TIMEOUT), - request_timeout: REQUEST_TIMEOUT, + timeout_interval: tokio::time::interval(INITIAL_REQUEST_TIMEOUT), + request_timeout: INITIAL_REQUEST_TIMEOUT, } } _ => { @@ -790,4 +828,20 @@ mod tests { rx.await.unwrap(); } + + #[test] + fn timeout_calculation_sanity_tests() { + let rtt = Duration::from_millis(200); + // timeout for an RTT of `rtt` + let timeout = rtt * TIMEOUT_SCALING; + + // if rtt hasn't changed, timeout shouldn't change + assert!(calculate_new_timeout(timeout, rtt) == timeout); + + // if rtt changed, the new timeout should change less than it + assert!(calculate_new_timeout(timeout, rtt / 2) < timeout); + assert!(calculate_new_timeout(timeout, rtt / 2) > timeout / 2); + assert!(calculate_new_timeout(timeout, rtt * 2) > timeout); + assert!(calculate_new_timeout(timeout, rtt * 2) < timeout * 2); + } } diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs index 6401a5a6eb..6a5a1ba4dc 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network/src/session/config.rs @@ -6,7 +6,7 @@ use std::time::Duration; /// Default request timeout for a single request. /// /// This represents the time we wait for a response until we consider it timed out. -pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); +pub const INITIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(20); /// Configuration options when creating a [SessionManager](crate::session::SessionManager). pub struct SessionsConfig { @@ -34,7 +34,7 @@ impl Default for SessionsConfig { // `poll`. session_event_buffer: 128, limits: Default::default(), - request_timeout: REQUEST_TIMEOUT, + request_timeout: INITIAL_REQUEST_TIMEOUT, } } }