From e53ed8ffc27b9e4b13672f539b93c4cf2ea87921 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 29 Nov 2022 11:43:45 +0100 Subject: [PATCH] refactor(net): use shared objects on a per peer basis (#270) --- crates/net/eth-wire/src/types/broadcast.rs | 11 ++++++++++ crates/net/eth-wire/src/types/message.rs | 21 ++++++++---------- crates/net/network/src/manager.rs | 14 ++++++------ crates/net/network/src/message.rs | 15 +++++++------ crates/net/network/src/network.rs | 15 +++++++------ crates/net/network/src/session/active.rs | 20 +++++++++-------- crates/net/network/src/state.rs | 6 +++--- crates/net/network/src/transactions.rs | 25 ++++++++++------------ 8 files changed, 71 insertions(+), 56 deletions(-) diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index a80dc23789..fe49e91d08 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -1,6 +1,7 @@ //! Types for broadcasting new data. use reth_primitives::{Header, TransactionSigned, H256, U128}; use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper}; +use std::sync::Arc; /// This informs peers of new blocks that have appeared on the network. #[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)] @@ -86,6 +87,16 @@ impl From for Vec { } } +/// Same as [`Transactions`] but this is intended as egress message send from local to _many_ peers. +/// +/// The list of transactions is constructed on per-peers basis, but the underlying transaction +/// objects are shared. +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)] +pub struct SharedTransactions( + /// New transactions for the peer to include in its mempool. + pub Vec>, +); + /// This informs peers of transaction hashes for transactions that have appeared on the network, /// but have not been included in a block. #[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)] diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs index eb773c8091..a88b0fd566 100644 --- a/crates/net/eth-wire/src/types/message.rs +++ b/crates/net/eth-wire/src/types/message.rs @@ -4,6 +4,7 @@ use super::{ GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Status, Transactions, }; +use crate::SharedTransactions; use bytes::{Buf, BufMut}; use reth_rlp::{length_of_length, Decodable, Encodable, Header}; use std::{fmt::Debug, sync::Arc}; @@ -225,13 +226,17 @@ impl Encodable for EthMessage { } } -/// Represents broadcast messages of [`EthMessage`] that can be sent to multiple peers +/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to +/// multiple peers. +/// +/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should +/// never receive a hash of an object (block, transaction) it has already seen. +/// +/// Note: This is only useful for outgoing messages. #[derive(Clone, Debug, PartialEq, Eq)] pub enum EthBroadcastMessage { - NewBlockHashes(Arc), NewBlock(Arc), - Transactions(Arc), - NewPooledTransactionHashes(Arc), + Transactions(SharedTransactions), } // === impl EthBroadcastMessage === @@ -240,12 +245,8 @@ impl EthBroadcastMessage { /// Returns the message's ID. pub fn message_id(&self) -> EthMessageID { match self { - EthBroadcastMessage::NewBlockHashes(_) => EthMessageID::NewBlockHashes, EthBroadcastMessage::NewBlock(_) => EthMessageID::NewBlock, EthBroadcastMessage::Transactions(_) => EthMessageID::Transactions, - EthBroadcastMessage::NewPooledTransactionHashes(_) => { - EthMessageID::NewPooledTransactionHashes - } } } } @@ -253,19 +254,15 @@ impl EthBroadcastMessage { impl Encodable for EthBroadcastMessage { fn encode(&self, out: &mut dyn BufMut) { match self { - EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out), EthBroadcastMessage::NewBlock(new_block) => new_block.encode(out), EthBroadcastMessage::Transactions(transactions) => transactions.encode(out), - EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.encode(out), } } fn length(&self) -> usize { match self { - EthBroadcastMessage::NewBlockHashes(new_block_hashes) => new_block_hashes.length(), EthBroadcastMessage::NewBlock(new_block) => new_block.length(), EthBroadcastMessage::Transactions(transactions) => transactions.length(), - EthBroadcastMessage::NewPooledTransactionHashes(hashes) => hashes.length(), } } } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 02d723c851..6cd0ca71e0 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -266,12 +266,11 @@ where } } - /// Handles a received Message from the peer. + /// Handles a received Message from the peer's session. fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { match msg { PeerMessage::NewBlockHashes(hashes) => { self.within_pow_or_disconnect(peer_id, |this| { - let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone()); // update peer's state, to track what blocks this peer has seen this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0) }) @@ -289,14 +288,17 @@ where msg, }); } - PeerMessage::Transactions(msg) => { + PeerMessage::EthRequest(req) => { + self.on_eth_request(peer_id, req); + } + PeerMessage::ReceivedTransaction(msg) => { self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions { peer_id, msg, }); } - PeerMessage::EthRequest(req) => { - self.on_eth_request(peer_id, req); + PeerMessage::SendTransactions(_) => { + unreachable!("Not emitted by session") } PeerMessage::Other(_) => {} } @@ -320,7 +322,7 @@ where self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request)) } NetworkHandleMessage::SendTransaction { peer_id, msg } => { - self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::Transactions(msg)) + self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg)) } NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self .swarm diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index c97238421d..35b0dd11b9 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -8,7 +8,7 @@ use reth_eth_wire::{ capability::CapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, - Transactions, + SharedTransactions, Transactions, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256}; @@ -36,17 +36,20 @@ impl NewBlockMessage { } } -/// Represents all messages that can be sent to a peer session +/// All Bi-directional eth-message variants that can be sent to a session or received from a +/// session. #[derive(Debug)] pub enum PeerMessage { /// Announce new block hashes - NewBlockHashes(Arc), + NewBlockHashes(NewBlockHashes), /// Broadcast new block. NewBlock(NewBlockMessage), - /// Broadcast transactions. - Transactions(Arc), + /// Received transactions _from_ the peer + ReceivedTransaction(Transactions), + /// Broadcast transactions _from_ local _to_ a peer. + SendTransactions(SharedTransactions), /// Send new pooled transactions - PooledTransactions(Arc), + PooledTransactions(NewPooledTransactionHashes), /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index f6f78b8c62..3c2b0926bb 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -5,8 +5,8 @@ use crate::{ peers::{PeersHandle, ReputationChangeKind}, }; use parking_lot::Mutex; -use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions}; -use reth_primitives::{PeerId, H256}; +use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, SharedTransactions}; +use reth_primitives::{PeerId, TransactionSigned, H256}; use std::{ net::SocketAddr, sync::{ @@ -107,8 +107,11 @@ impl NetworkHandle { } /// Send full transactions to the peer - pub fn send_transactions(&self, peer_id: PeerId, msg: Arc) { - self.send_message(NetworkHandleMessage::SendTransaction { peer_id, msg }) + pub fn send_transactions(&self, peer_id: PeerId, msg: Vec>) { + self.send_message(NetworkHandleMessage::SendTransaction { + peer_id, + msg: SharedTransactions(msg), + }) } } @@ -139,9 +142,9 @@ pub(crate) enum NetworkHandleMessage { /// Broadcast event to announce a new block to all nodes. AnnounceBlock(NewBlock, H256), /// Sends the list of transactions to the given peer. - SendTransaction { peer_id: PeerId, msg: Arc }, + SendTransaction { peer_id: PeerId, msg: SharedTransactions }, /// Sends the list of transactions hashes to the given peer. - SendPooledTransactionHashes { peer_id: PeerId, msg: Arc }, + SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, /// Send an `eth` protocol request to the peer. EthRequest { /// The peer to send the request to. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 74ee2ca793..8f770d2f8a 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -137,7 +137,7 @@ impl ActiveSession { return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake)) } EthMessage::NewBlockHashes(msg) => { - self.emit_message(PeerMessage::NewBlockHashes(Arc::new(msg))); + self.emit_message(PeerMessage::NewBlockHashes(msg)); } EthMessage::NewBlock(msg) => { let block = @@ -145,10 +145,10 @@ impl ActiveSession { self.emit_message(PeerMessage::NewBlock(block)); } EthMessage::Transactions(msg) => { - self.emit_message(PeerMessage::Transactions(Arc::new(msg))); + self.emit_message(PeerMessage::ReceivedTransaction(msg)); } EthMessage::NewPooledTransactionHashes(msg) => { - self.emit_message(PeerMessage::PooledTransactions(Arc::new(msg))); + self.emit_message(PeerMessage::PooledTransactions(msg)); } EthMessage::GetBlockHeaders(req) => { on_request!(req, BlockHeaders, GetBlockHeaders); @@ -198,22 +198,24 @@ impl ActiveSession { fn on_peer_message(&mut self, msg: PeerMessage) { match msg { PeerMessage::NewBlockHashes(msg) => { - self.queued_outgoing.push_back(EthBroadcastMessage::NewBlockHashes(msg).into()); + self.queued_outgoing.push_back(EthMessage::NewBlockHashes(msg).into()); } PeerMessage::NewBlock(msg) => { self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into()); } - PeerMessage::Transactions(msg) => { - self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into()); - } PeerMessage::PooledTransactions(msg) => { - self.queued_outgoing - .push_back(EthBroadcastMessage::NewPooledTransactionHashes(msg).into()); + self.queued_outgoing.push_back(EthMessage::NewPooledTransactionHashes(msg).into()); } PeerMessage::EthRequest(req) => { let deadline = self.request_deadline(); self.on_peer_request(req, deadline); } + PeerMessage::SendTransactions(msg) => { + self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into()); + } + PeerMessage::ReceivedTransaction(_) => { + unreachable!("Not emitted by network") + } PeerMessage::Other(_) => {} } } diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 8983b99363..93b58e2f18 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -173,7 +173,7 @@ where /// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet. pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) { let number = msg.block.block.header.number; - let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }])); + let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]); for (peer_id, peer) in self.connected_peers.iter_mut() { if peer.blocks.contains(&msg.hash) { // skip peers which already reported the block @@ -186,7 +186,7 @@ where self.queued_messages.push_back(StateAction::NewBlockHashes { peer_id: *peer_id, - hashes: Arc::clone(&hashes), + hashes: hashes.clone(), }); } } @@ -409,7 +409,7 @@ pub(crate) enum StateAction { /// Target of the message peer_id: PeerId, /// `NewBlockHashes` message to send to the peer. - hashes: Arc, + hashes: NewBlockHashes, }, /// Create a new connection to the given node. Connect { remote_addr: SocketAddr, peer_id: PeerId }, diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 977dc126eb..adfb05f826 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -172,7 +172,9 @@ where self.pool .get_all(hashes) .into_iter() - .map(|tx| (*tx.hash(), tx.transaction.to_recovered_transaction().into_signed())) + .map(|tx| { + (*tx.hash(), Arc::new(tx.transaction.to_recovered_transaction().into_signed())) + }) .collect(), ); @@ -182,7 +184,7 @@ where fn propagate_transactions( &mut self, - txs: Vec<(TxHash, TransactionSigned)>, + txs: Vec<(TxHash, Arc)>, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); @@ -192,7 +194,7 @@ where if !full.is_empty() { // TODO select peer for full or hash - self.network.send_transactions(*peer_id, Arc::new(Transactions(full))); + self.network.send_transactions(*peer_id, full); for hash in hashes { propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id)); @@ -204,13 +206,9 @@ where } /// Request handler for an incoming `NewPooledTransactionHashes` - fn on_new_pooled_transactions( - &mut self, - peer_id: PeerId, - msg: Arc, - ) { + fn on_new_pooled_transactions(&mut self, peer_id: PeerId, msg: NewPooledTransactionHashes) { if let Some(peer) = self.peers.get_mut(&peer_id) { - let mut transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()).0; + let mut transactions = msg.0; // keep track of the transactions the peer knows peer.transactions.extend(transactions.clone()); @@ -239,8 +237,7 @@ where fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { - let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()); - self.import_transactions(peer_id, transactions.0); + self.import_transactions(peer_id, msg.0); } NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => { self.on_new_pooled_transactions(peer_id, msg) @@ -272,7 +269,7 @@ where // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the // pool - let msg = Arc::new(NewPooledTransactionHashes(self.pool.pooled_transactions())); + let msg = NewPooledTransactionHashes(self.pool.pooled_transactions()); self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg, @@ -434,9 +431,9 @@ enum TransactionsCommand { #[derive(Debug)] pub enum NetworkTransactionEvent { /// Received list of transactions to the given peer. - IncomingTransactions { peer_id: PeerId, msg: Arc }, + IncomingTransactions { peer_id: PeerId, msg: Transactions }, /// Received list of transactions hashes to the given peer. - IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc }, + IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, /// Incoming `GetPooledTransactions` request from a peer. GetPooledTransactions { peer_id: PeerId,