//! Capability messaging //! //! An `RLPx` stream is multiplexed via the prepended message-id of a framed message. //! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockHeaders, EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, }; use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_primitives::{ BlockBody, Bytes, Header, PooledTransactionsElement, ReceiptWithBloom, B256, }; use std::{ fmt, sync::Arc, task::{ready, Context, Poll}, }; use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot}; /// Internal form of a `NewBlock` message #[derive(Debug, Clone)] pub struct NewBlockMessage { /// Hash of the block pub hash: B256, /// Raw received message pub block: Arc, } // === impl NewBlockMessage === impl NewBlockMessage { /// Returns the block number of the block pub fn number(&self) -> u64 { self.block.block.header.number } } /// All Bi-directional eth-message variants that can be sent to a session or received from a /// session. #[derive(Debug)] pub enum PeerMessage { /// Announce new block hashes NewBlockHashes(NewBlockHashes), /// Broadcast new block. NewBlock(NewBlockMessage), /// Received transactions _from_ the peer ReceivedTransaction(Transactions), /// Broadcast transactions _from_ local _to_ a peer. SendTransactions(SharedTransactions), /// Send new pooled transactions PooledTransactions(NewPooledTransactionHashes), /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message Other(RawCapabilityMessage), } /// Request Variants that only target block related data. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockRequest { /// Requests block headers from the peer. /// /// The response should be sent through the channel. GetBlockHeaders(GetBlockHeaders), /// Requests block bodies from the peer. /// /// The response should be sent through the channel. GetBlockBodies(GetBlockBodies), } /// Protocol related request messages that expect a response #[derive(Debug)] pub enum PeerRequest { /// Requests block headers from the peer. /// /// The response should be sent through the channel. GetBlockHeaders { /// The request for block headers. request: GetBlockHeaders, /// The channel to send the response for block headers. response: oneshot::Sender>, }, /// Requests block bodies from the peer. /// /// The response should be sent through the channel. GetBlockBodies { /// The request for block bodies. request: GetBlockBodies, /// The channel to send the response for block bodies. response: oneshot::Sender>, }, /// Requests pooled transactions from the peer. /// /// The response should be sent through the channel. GetPooledTransactions { /// The request for pooled transactions. request: GetPooledTransactions, /// The channel to send the response for pooled transactions. response: oneshot::Sender>, }, /// Requests `NodeData` from the peer. /// /// The response should be sent through the channel. GetNodeData { /// The request for `NodeData`. request: GetNodeData, /// The channel to send the response for `NodeData`. response: oneshot::Sender>, }, /// Requests receipts from the peer. /// /// The response should be sent through the channel. GetReceipts { /// The request for receipts. request: GetReceipts, /// The channel to send the response for receipts. response: oneshot::Sender>, }, } // === impl PeerRequest === impl PeerRequest { /// Invoked if we received a response which does not match the request pub(crate) fn send_bad_response(self) { self.send_err_response(RequestError::BadResponse) } /// Send an error back to the receiver. pub(crate) fn send_err_response(self, err: RequestError) { let _ = match self { Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(), Self::GetNodeData { response, .. } => response.send(Err(err)).ok(), Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), }; } /// Returns the [`EthMessage`] for this type pub fn create_request_message(&self, request_id: u64) -> EthMessage { match self { Self::GetBlockHeaders { request, .. } => { EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request }) } Self::GetBlockBodies { request, .. } => { EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() }) } Self::GetPooledTransactions { request, .. } => { EthMessage::GetPooledTransactions(RequestPair { request_id, message: request.clone(), }) } Self::GetNodeData { request, .. } => { EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() }) } Self::GetReceipts { request, .. } => { EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) } } } /// Consumes the type and returns the inner [`GetPooledTransactions`] variant. pub fn into_get_pooled_transactions(self) -> Option { match self { Self::GetPooledTransactions { request, .. } => Some(request), _ => None, } } } /// Corresponding variant for [`PeerRequest`]. #[derive(Debug)] pub enum PeerResponse { /// Represents a response to a request for block headers. BlockHeaders { /// The receiver channel for the response to a block headers request. response: oneshot::Receiver>, }, /// Represents a response to a request for block bodies. BlockBodies { /// The receiver channel for the response to a block bodies request. response: oneshot::Receiver>, }, /// Represents a response to a request for pooled transactions. PooledTransactions { /// The receiver channel for the response to a pooled transactions request. response: oneshot::Receiver>, }, /// Represents a response to a request for `NodeData`. NodeData { /// The receiver channel for the response to a `NodeData` request. response: oneshot::Receiver>, }, /// Represents a response to a request for receipts. Receipts { /// The receiver channel for the response to a receipts request. response: oneshot::Receiver>, }, } // === impl PeerResponse === impl PeerResponse { /// Polls the type to completion. pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { macro_rules! poll_request { ($response:ident, $item:ident, $cx:ident) => { match ready!($response.poll_unpin($cx)) { Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)), Err(err) => PeerResponseResult::$item(Err(err.into())), } }; } let res = match self { Self::BlockHeaders { response } => { poll_request!(response, BlockHeaders, cx) } Self::BlockBodies { response } => { poll_request!(response, BlockBodies, cx) } Self::PooledTransactions { response } => { poll_request!(response, PooledTransactions, cx) } Self::NodeData { response } => { poll_request!(response, NodeData, cx) } Self::Receipts { response } => { poll_request!(response, Receipts, cx) } }; Poll::Ready(res) } } /// All response variants for [`PeerResponse`] #[derive(Debug)] pub enum PeerResponseResult { /// Represents a result containing block headers or an error. BlockHeaders(RequestResult>), /// Represents a result containing block bodies or an error. BlockBodies(RequestResult>), /// Represents a result containing pooled transactions or an error. PooledTransactions(RequestResult>), /// Represents a result containing node data or an error. NodeData(RequestResult>), /// Represents a result containing receipts or an error. Receipts(RequestResult>>), } // === impl PeerResponseResult === impl PeerResponseResult { /// Converts this response into an [`EthMessage`] pub fn try_into_message(self, id: u64) -> RequestResult { macro_rules! to_message { ($response:ident, $item:ident, $request_id:ident) => { match $response { Ok(res) => { let request = RequestPair { request_id: $request_id, message: $item(res) }; Ok(EthMessage::$item(request)) } Err(err) => Err(err), } }; } match self { Self::BlockHeaders(resp) => { to_message!(resp, BlockHeaders, id) } Self::BlockBodies(resp) => { to_message!(resp, BlockBodies, id) } Self::PooledTransactions(resp) => { to_message!(resp, PooledTransactions, id) } Self::NodeData(resp) => { to_message!(resp, NodeData, id) } Self::Receipts(resp) => { to_message!(resp, Receipts, id) } } } /// Returns the `Err` value if the result is an error. pub fn err(&self) -> Option<&RequestError> { match self { Self::BlockHeaders(res) => res.as_ref().err(), Self::BlockBodies(res) => res.as_ref().err(), Self::PooledTransactions(res) => res.as_ref().err(), Self::NodeData(res) => res.as_ref().err(), Self::Receipts(res) => res.as_ref().err(), } } /// Returns whether this result is an error. pub fn is_err(&self) -> bool { self.err().is_some() } } /// A Cloneable connection for sending _requests_ directly to the session of a peer. #[derive(Clone)] pub struct PeerRequestSender { /// id of the remote node. pub(crate) peer_id: PeerId, /// The Sender half connected to a session. pub(crate) to_session_tx: mpsc::Sender, } // === impl PeerRequestSender === impl PeerRequestSender { /// Constructs a new sender instance that's wired to a session pub(crate) const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender) -> Self { Self { peer_id, to_session_tx } } /// Attempts to immediately send a message on this Sender pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError> { self.to_session_tx.try_send(req) } /// Returns the peer id of the remote peer. pub const fn peer_id(&self) -> &PeerId { &self.peer_id } } impl fmt::Debug for PeerRequestSender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive() } }