diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 4b40aa6743..bc06f1cc1b 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -56,7 +56,8 @@ const TIMEOUT_SCALING: u32 = 3; /// /// It listens for /// - incoming commands from the [`SessionManager`](super::SessionManager) -/// - incoming requests via the request channel +/// - incoming _internal_ requests/broadcasts via the request/command channel +/// - incoming requests/broadcasts _from remote_ via the connection /// - responses for handled ETH requests received from the remote peer. #[allow(unused)] pub(crate) struct ActiveSession { @@ -79,11 +80,11 @@ pub(crate) struct ActiveSession { /// A message that needs to be delivered to the session manager pub(crate) pending_message_to_session: Option, /// Incoming request to send to delegate to the remote peer. - pub(crate) request_tx: Fuse>, + pub(crate) internal_request_tx: Fuse>, /// All requests sent to the remote peer we're waiting on a response pub(crate) inflight_requests: FnvHashMap, /// All requests that were sent by the remote peer. - pub(crate) received_requests: Vec, + pub(crate) received_requests_from_remote: Vec, /// Buffered messages that should be handled and sent to the peer. pub(crate) queued_outgoing: VecDeque, /// The maximum time we wait for a response from a peer. @@ -124,7 +125,7 @@ impl ActiveSession { rx: PeerResponse::$resp_item { response }, received: Instant::now(), }; - self.received_requests.push(received); + self.received_requests_from_remote.push(received); self.try_emit_request(PeerMessage::EthRequest(PeerRequest::$req_item { request, response: tx, @@ -214,7 +215,7 @@ impl ActiveSession { } /// Handle an internal peer request that will be sent to the remote. - fn on_peer_request(&mut self, request: PeerRequest, deadline: Instant) { + fn on_internal_peer_request(&mut self, request: PeerRequest, deadline: Instant) { let request_id = self.next_id(); let msg = request.create_request_message(request_id); self.queued_outgoing.push_back(msg.into()); @@ -240,7 +241,7 @@ impl ActiveSession { } PeerMessage::EthRequest(req) => { let deadline = self.request_deadline(); - self.on_peer_request(req, deadline); + self.on_internal_peer_request(req, deadline); } PeerMessage::SendTransactions(msg) => { self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into()); @@ -474,19 +475,19 @@ impl Future for ActiveSession { let deadline = this.request_deadline(); - while let Poll::Ready(Some(req)) = this.request_tx.poll_next_unpin(cx) { + while let Poll::Ready(Some(req)) = this.internal_request_tx.poll_next_unpin(cx) { progress = true; - this.on_peer_request(req, deadline); + this.on_internal_peer_request(req, deadline); } // Advance all active requests. // We remove each request one by one and add them back. - for idx in (0..this.received_requests.len()).rev() { - let mut req = this.received_requests.swap_remove(idx); + for idx in (0..this.received_requests_from_remote.len()).rev() { + let mut req = this.received_requests_from_remote.swap_remove(idx); match req.rx.poll(cx) { Poll::Pending => { // not ready yet - this.received_requests.push(req); + this.received_requests_from_remote.push(req); } Poll::Ready(resp) => { this.handle_outgoing_response(req.request_id, resp); @@ -817,11 +818,11 @@ mod tests { "network_active_session", ), pending_message_to_session: None, - request_tx: ReceiverStream::new(messages_rx).fuse(), + internal_request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, queued_outgoing: Default::default(), - received_requests: Default::default(), + received_requests_from_remote: Default::default(), internal_request_timeout_interval: tokio::time::interval( INITIAL_REQUEST_TIMEOUT, ), @@ -976,7 +977,7 @@ mod tests { session.protocol_breach_request_timeout = drop_timeout; let (tx, rx) = oneshot::channel(); let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx }; - session.on_peer_request(req, Instant::now()); + session.on_internal_peer_request(req, Instant::now()); tokio::spawn(session); let err = rx.await.unwrap().unwrap_err(); diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 020e5a7788..fcd399f515 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -418,11 +418,11 @@ impl SessionManager { commands_rx: ReceiverStream::new(commands_rx), to_session: self.active_session_tx.clone(), pending_message_to_session: None, - request_tx: ReceiverStream::new(messages_rx).fuse(), + internal_request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, queued_outgoing: Default::default(), - received_requests: Default::default(), + received_requests_from_remote: Default::default(), internal_request_timeout_interval: tokio::time::interval( self.initial_internal_request_timeout, ),