chore(net): improve naming for request handling (#1196)

This commit is contained in:
Matthias Seitz
2023-02-06 19:17:50 +01:00
committed by GitHub
parent c77de627d1
commit d99ba4b4e2
2 changed files with 17 additions and 16 deletions

View File

@@ -56,7 +56,8 @@ const TIMEOUT_SCALING: u32 = 3;
///
/// It listens for
/// - incoming commands from the [`SessionManager`](super::SessionManager)
/// - incoming requests via the request channel
/// - incoming _internal_ requests/broadcasts via the request/command channel
/// - incoming requests/broadcasts _from remote_ via the connection
/// - responses for handled ETH requests received from the remote peer.
#[allow(unused)]
pub(crate) struct ActiveSession {
@@ -79,11 +80,11 @@ pub(crate) struct ActiveSession {
/// A message that needs to be delivered to the session manager
pub(crate) pending_message_to_session: Option<ActiveSessionMessage>,
/// Incoming request to send to delegate to the remote peer.
pub(crate) request_tx: Fuse<ReceiverStream<PeerRequest>>,
pub(crate) internal_request_tx: Fuse<ReceiverStream<PeerRequest>>,
/// All requests sent to the remote peer we're waiting on a response
pub(crate) inflight_requests: FnvHashMap<u64, InflightRequest>,
/// All requests that were sent by the remote peer.
pub(crate) received_requests: Vec<ReceivedRequest>,
pub(crate) received_requests_from_remote: Vec<ReceivedRequest>,
/// Buffered messages that should be handled and sent to the peer.
pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
/// The maximum time we wait for a response from a peer.
@@ -124,7 +125,7 @@ impl ActiveSession {
rx: PeerResponse::$resp_item { response },
received: Instant::now(),
};
self.received_requests.push(received);
self.received_requests_from_remote.push(received);
self.try_emit_request(PeerMessage::EthRequest(PeerRequest::$req_item {
request,
response: tx,
@@ -214,7 +215,7 @@ impl ActiveSession {
}
/// Handle an internal peer request that will be sent to the remote.
fn on_peer_request(&mut self, request: PeerRequest, deadline: Instant) {
fn on_internal_peer_request(&mut self, request: PeerRequest, deadline: Instant) {
let request_id = self.next_id();
let msg = request.create_request_message(request_id);
self.queued_outgoing.push_back(msg.into());
@@ -240,7 +241,7 @@ impl ActiveSession {
}
PeerMessage::EthRequest(req) => {
let deadline = self.request_deadline();
self.on_peer_request(req, deadline);
self.on_internal_peer_request(req, deadline);
}
PeerMessage::SendTransactions(msg) => {
self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
@@ -474,19 +475,19 @@ impl Future for ActiveSession {
let deadline = this.request_deadline();
while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) {
while let Poll::Ready(Some(req)) = this.internal_request_tx.poll_next_unpin(cx) {
progress = true;
this.on_peer_request(req, deadline);
this.on_internal_peer_request(req, deadline);
}
// Advance all active requests.
// We remove each request one by one and add them back.
for idx in (0..this.received_requests.len()).rev() {
let mut req = this.received_requests.swap_remove(idx);
for idx in (0..this.received_requests_from_remote.len()).rev() {
let mut req = this.received_requests_from_remote.swap_remove(idx);
match req.rx.poll(cx) {
Poll::Pending => {
// not ready yet
this.received_requests.push(req);
this.received_requests_from_remote.push(req);
}
Poll::Ready(resp) => {
this.handle_outgoing_response(req.request_id, resp);
@@ -817,11 +818,11 @@ mod tests {
"network_active_session",
),
pending_message_to_session: None,
request_tx: ReceiverStream::new(messages_rx).fuse(),
internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
received_requests_from_remote: Default::default(),
internal_request_timeout_interval: tokio::time::interval(
INITIAL_REQUEST_TIMEOUT,
),
@@ -976,7 +977,7 @@ mod tests {
session.protocol_breach_request_timeout = drop_timeout;
let (tx, rx) = oneshot::channel();
let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx };
session.on_peer_request(req, Instant::now());
session.on_internal_peer_request(req, Instant::now());
tokio::spawn(session);
let err = rx.await.unwrap().unwrap_err();

View File

@@ -418,11 +418,11 @@ impl SessionManager {
commands_rx: ReceiverStream::new(commands_rx),
to_session: self.active_session_tx.clone(),
pending_message_to_session: None,
request_tx: ReceiverStream::new(messages_rx).fuse(),
internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
received_requests_from_remote: Default::default(),
internal_request_timeout_interval: tokio::time::interval(
self.initial_internal_request_timeout,
),