feat(net): add request timeout (#273)

This commit is contained in:
Matthias Seitz
2022-11-28 23:41:39 +01:00
committed by GitHub
parent 4d708ce8af
commit b5300aafec
4 changed files with 68 additions and 17 deletions

View File

@@ -11,7 +11,6 @@ use reth_primitives::{Header, PeerId, H256, U256};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
time::Instant,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -160,10 +159,9 @@ impl StateFetcher {
peer.state = req.peer_state();
}
let started = Instant::now();
match req {
DownloadRequest::GetBlockHeaders { request, response } => {
let inflight = Request { request, response, started };
let inflight = Request { request, response };
self.inflight_headers_requests.insert(peer_id, inflight);
unimplemented!("unify start types");
@@ -177,7 +175,7 @@ impl StateFetcher {
// })
}
DownloadRequest::GetBlockBodies { request, response } => {
let inflight = Request { request: request.clone(), response, started };
let inflight = Request { request: request.clone(), response };
self.inflight_bodies_requests.insert(peer_id, inflight);
BlockRequest::GetBlockBodies(GetBlockBodies(request))
}
@@ -314,7 +312,6 @@ impl PeerState {
struct Request<Req, Resp> {
request: Req,
response: oneshot::Sender<Resp>,
started: Instant,
}
/// A message to update the status.

View File

@@ -16,6 +16,7 @@ use reth_eth_wire::{
message::{EthBroadcastMessage, RequestPair},
DisconnectReason, EthMessage, EthStream, P2PStream,
};
use reth_interfaces::p2p::error::RequestError;
use reth_primitives::PeerId;
use std::{
collections::VecDeque,
@@ -24,11 +25,12 @@ use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Instant,
time::{Duration, Instant},
};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
time::Interval,
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, warn};
@@ -60,11 +62,15 @@ pub(crate) struct ActiveSession {
/// Incoming request to send to delegate to the remote peer.
pub(crate) request_tx: Fuse<ReceiverStream<PeerRequest>>,
/// All requests sent to the remote peer we're waiting on a response
pub(crate) inflight_requests: FnvHashMap<u64, PeerRequest>,
pub(crate) inflight_requests: FnvHashMap<u64, InflightRequest>,
/// All requests that were sent by the remote peer.
pub(crate) received_requests: Vec<ReceivedRequest>,
/// Buffered messages that should be handled and sent to the peer.
pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
/// The maximum time we wait for a response from a peer.
pub(crate) request_timeout: Duration,
/// Interval when to check for timed out requests.
pub(crate) timeout_interval: Interval,
}
impl ActiveSession {
@@ -113,11 +119,11 @@ impl ActiveSession {
($this:ident, $resp:ident, $item:ident) => {
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(resp) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = resp {
if let Some(req) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = req.request {
let _ = response.send(Ok(message));
} else {
resp.send_bad_response();
req.request.send_bad_response();
$this.on_bad_message();
}
} else {
@@ -180,10 +186,11 @@ impl ActiveSession {
}
/// Handle an incoming peer request.
fn on_peer_request(&mut self, req: PeerRequest) {
fn on_peer_request(&mut self, request: PeerRequest, deadline: Instant) {
let request_id = self.next_id();
let msg = req.create_request_message(request_id);
let msg = request.create_request_message(request_id);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest { request, deadline };
self.inflight_requests.insert(request_id, req);
}
@@ -204,12 +211,18 @@ impl ActiveSession {
.push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into());
}
PeerMessage::EthRequest(req) => {
self.on_peer_request(req);
let deadline = self.request_deadline();
self.on_peer_request(req, deadline);
}
PeerMessage::Other(_) => {}
}
}
/// Returns the deadline timestamp at which the request times out
fn request_deadline(&self) -> Instant {
Instant::now() + self.request_timeout
}
/// Handle a Response to the peer
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) {
match resp.try_into_message(id) {
@@ -272,6 +285,20 @@ impl ActiveSession {
fn start_disconnect(&mut self, reason: DisconnectReason) {
self.conn.inner_mut().start_disconnect(reason);
}
/// Removes all timed out requests
fn evict_timed_out_requests(&mut self, now: Instant) {
let mut timedout = Vec::new();
for (id, req) in self.inflight_requests.iter() {
if now > req.deadline {
timedout.push(*id)
}
}
for id in timedout {
let req = self.inflight_requests.remove(&id).expect("exists; qed");
req.request.send_err_response(RequestError::Timeout);
}
}
}
impl Future for ActiveSession {
@@ -315,9 +342,11 @@ impl Future for ActiveSession {
}
}
let deadline = this.request_deadline();
while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) {
progress = true;
this.on_peer_request(req);
this.on_peer_request(req, deadline);
}
// Advance all active requests.
@@ -385,6 +414,9 @@ impl Future for ActiveSession {
}
if !progress {
// check for timed out requests
this.evict_timed_out_requests(Instant::now());
return Poll::Pending
}
}
@@ -401,6 +433,12 @@ pub(crate) struct ReceivedRequest {
received: Instant,
}
/// A request that waits for a response from the peer
pub(crate) struct InflightRequest {
request: PeerRequest,
deadline: Instant,
}
/// Outgoing messages that can be sent over the wire.
pub(crate) enum OutgoingMessage {
/// A message that is owned.
@@ -424,7 +462,9 @@ impl From<EthBroadcastMessage> for OutgoingMessage {
#[cfg(test)]
mod tests {
use super::*;
use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session};
use crate::session::{
config::REQUEST_TIMEOUT, handle::PendingSessionEvent, start_pending_incoming_session,
};
use reth_ecies::util::pk2id;
use reth_eth_wire::{
EthVersion, HelloMessage, NewPooledTransactionHashes, ProtocolVersion, Status,
@@ -543,6 +583,8 @@ mod tests {
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
timeout_interval: tokio::time::interval(REQUEST_TIMEOUT),
request_timeout: REQUEST_TIMEOUT,
}
}
_ => {

View File

@@ -1,6 +1,10 @@
//! Configuration types for [`SessionsManager`]
use crate::session::{Direction, ExceedsSessionLimit};
use std::time::Duration;
/// Default request timeout.
pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500u64);
/// Configuration options when creating a [`SessionsManager`].
pub struct SessionsConfig {
@@ -10,8 +14,10 @@ pub struct SessionsConfig {
pub session_event_buffer: usize,
/// Limits to enforce.
///
/// By default, no limits will be enforced
/// By default, no limits will be enforced.
pub limits: SessionLimits,
/// The maximum time we wait for a response from a peer.
pub request_timeout: Duration,
}
impl Default for SessionsConfig {
@@ -26,6 +32,7 @@ impl Default for SessionsConfig {
// `poll`.
session_event_buffer: 64,
limits: Default::default(),
request_timeout: REQUEST_TIMEOUT,
}
}
}

View File

@@ -28,7 +28,7 @@ use std::{
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
time::Instant,
time::{Duration, Instant},
};
use tokio::{
net::TcpStream,
@@ -55,6 +55,8 @@ pub(crate) struct SessionManager {
next_id: usize,
/// Keeps track of all sessions
counter: SessionCounter,
/// The maximum time we wait for a response from a peer.
request_timeout: Duration,
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The node id of node
@@ -114,6 +116,7 @@ impl SessionManager {
Self {
next_id: 0,
counter: SessionCounter::new(config.limits),
request_timeout: config.request_timeout,
secret_key,
peer_id,
status,
@@ -333,6 +336,8 @@ impl SessionManager {
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
timeout_interval: tokio::time::interval(self.request_timeout),
request_timeout: self.request_timeout,
};
self.spawn(session);