feat(net): adaptable request timeouts (#789)

This commit is contained in:
LambdaClass
2023-01-12 07:10:14 -03:00
committed by GitHub
parent 5b63437039
commit 4460dc7b25
2 changed files with 69 additions and 15 deletions

View File

@@ -3,6 +3,7 @@
use crate::{
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult},
session::{
config::INITIAL_REQUEST_TIMEOUT,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
},
@@ -75,6 +76,17 @@ pub(crate) struct ActiveSession {
pub(crate) timeout_interval: Interval,
}
/// Constants for timeout updating
/// Minimum timeout value
const MINIMUM_TIMEOUT: Duration = Duration::from_millis(1);
/// Maximum timeout value
const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
/// How much the new measurements affect the current timeout (X percent)
const SAMPLE_IMPACT: f64 = 0.1;
/// Amount of RTTs before timeout
const TIMEOUT_SCALING: u32 = 3;
impl ActiveSession {
/// Returns `true` if the session is currently in the process of disconnecting
fn is_disconnecting(&self) -> bool {
@@ -94,7 +106,7 @@ impl ActiveSession {
fn on_incoming(&mut self, msg: EthMessage) -> Option<(EthStreamError, EthMessage)> {
/// A macro that handles an incoming request
/// This creates a new channel and tries to send the sender half to the session while
/// storing to receiver half internally so the pending response can be polled.
/// storing the receiver half internally so the pending response can be polled.
macro_rules! on_request {
($req:ident, $resp_item:ident, $req_item:ident) => {
let RequestPair { request_id, message: request } = $req;
@@ -118,18 +130,19 @@ impl ActiveSession {
/// Processes a response received from the peer
macro_rules! on_response {
($this:ident, $resp:ident, $item:ident) => {
($resp:ident, $item:ident) => {
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(req) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = req.request {
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now())
} else {
req.request.send_bad_response();
$this.on_bad_message();
self.on_bad_message();
}
} else {
$this.on_bad_message()
self.on_bad_message()
}
};
}
@@ -159,31 +172,31 @@ impl ActiveSession {
on_request!(req, BlockHeaders, GetBlockHeaders);
}
EthMessage::BlockHeaders(resp) => {
on_response!(self, resp, GetBlockHeaders);
on_response!(resp, GetBlockHeaders);
}
EthMessage::GetBlockBodies(req) => {
on_request!(req, BlockBodies, GetBlockBodies);
}
EthMessage::BlockBodies(resp) => {
on_response!(self, resp, GetBlockBodies);
on_response!(resp, GetBlockBodies);
}
EthMessage::GetPooledTransactions(req) => {
on_request!(req, PooledTransactions, GetPooledTransactions);
}
EthMessage::PooledTransactions(resp) => {
on_response!(self, resp, GetPooledTransactions);
on_response!(resp, GetPooledTransactions);
}
EthMessage::GetNodeData(req) => {
on_request!(req, NodeData, GetNodeData);
}
EthMessage::NodeData(resp) => {
on_response!(self, resp, GetNodeData);
on_response!(resp, GetNodeData);
}
EthMessage::GetReceipts(req) => {
on_request!(req, Receipts, GetReceipts);
}
EthMessage::Receipts(resp) => {
on_response!(self, resp, GetReceipts);
on_response!(resp, GetReceipts);
}
};
@@ -195,7 +208,7 @@ impl ActiveSession {
let request_id = self.next_id();
let msg = request.create_request_message(request_id);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest { request, deadline };
let req = InflightRequest { request, timestamp: Instant::now(), deadline };
self.inflight_requests.insert(request_id, req);
}
@@ -334,9 +347,29 @@ impl ActiveSession {
for id in timedout {
warn!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
let req = self.inflight_requests.remove(&id).expect("exists; qed");
self.update_request_timeout(req.timestamp, req.deadline);
req.request.send_err_response(RequestError::Timeout);
}
}
/// Updates the request timeout with a request's timestamps
fn update_request_timeout(&mut self, sent: Instant, received: Instant) {
let elapsed = received.saturating_duration_since(sent);
self.request_timeout = calculate_new_timeout(self.request_timeout, elapsed);
self.timeout_interval = tokio::time::interval(self.request_timeout);
}
}
/// Calculates a new timeout using an updated estimation of the RTT
#[inline]
fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> Duration {
let new_timeout = estimated_rtt.mul_f64(SAMPLE_IMPACT) * TIMEOUT_SCALING;
// this dampens sudden changes by taking a weighted mean of the old and new values
let smoothened_timeout = current_timeout.mul_f64(1.0 - SAMPLE_IMPACT) + new_timeout;
smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT)
}
impl Future for ActiveSession {
@@ -490,7 +523,11 @@ pub(crate) struct ReceivedRequest {
/// A request that waits for a response from the peer
pub(crate) struct InflightRequest {
/// Request sent to peer
request: PeerRequest,
/// Instant when the request was sent
timestamp: Instant,
/// Time limit for the response
deadline: Instant,
}
@@ -519,7 +556,8 @@ mod tests {
#![allow(dead_code)]
use super::*;
use crate::session::{
config::REQUEST_TIMEOUT, handle::PendingSessionEvent, start_pending_incoming_session,
config::INITIAL_REQUEST_TIMEOUT, handle::PendingSessionEvent,
start_pending_incoming_session,
};
use reth_ecies::util::pk2id;
use reth_eth_wire::{
@@ -643,8 +681,8 @@ mod tests {
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
timeout_interval: tokio::time::interval(REQUEST_TIMEOUT),
request_timeout: REQUEST_TIMEOUT,
timeout_interval: tokio::time::interval(INITIAL_REQUEST_TIMEOUT),
request_timeout: INITIAL_REQUEST_TIMEOUT,
}
}
_ => {
@@ -790,4 +828,20 @@ mod tests {
rx.await.unwrap();
}
#[test]
fn timeout_calculation_sanity_tests() {
let rtt = Duration::from_millis(200);
// timeout for an RTT of `rtt`
let timeout = rtt * TIMEOUT_SCALING;
// if rtt hasn't changed, timeout shouldn't change
assert!(calculate_new_timeout(timeout, rtt) == timeout);
// if rtt changed, the new timeout should change less than it
assert!(calculate_new_timeout(timeout, rtt / 2) < timeout);
assert!(calculate_new_timeout(timeout, rtt / 2) > timeout / 2);
assert!(calculate_new_timeout(timeout, rtt * 2) > timeout);
assert!(calculate_new_timeout(timeout, rtt * 2) < timeout * 2);
}
}

View File

@@ -6,7 +6,7 @@ use std::time::Duration;
/// Default request timeout for a single request.
///
/// This represents the time we wait for a response until we consider it timed out.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
pub const INITIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// Configuration options when creating a [SessionManager](crate::session::SessionManager).
pub struct SessionsConfig {
@@ -34,7 +34,7 @@ impl Default for SessionsConfig {
// `poll`.
session_event_buffer: 128,
limits: Default::default(),
request_timeout: REQUEST_TIMEOUT,
request_timeout: INITIAL_REQUEST_TIMEOUT,
}
}
}