From b6d9fe87b9bbd6ccac70ef87151a0919645102f7 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 25 Nov 2022 22:13:20 +0100 Subject: [PATCH] feat(net): propagate new transactions (#256) --- crates/net/network/src/network.rs | 5 ++ crates/net/network/src/transactions.rs | 62 +++++++++++++++++++- crates/transaction-pool/src/lib.rs | 9 ++- crates/transaction-pool/src/pool/events.rs | 3 + crates/transaction-pool/src/pool/listener.rs | 12 +++- crates/transaction-pool/src/pool/mod.rs | 11 +++- crates/transaction-pool/src/traits.rs | 46 ++++++++++++++- 7 files changed, 140 insertions(+), 8 deletions(-) diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 236223a2b2..f6f78b8c62 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -105,6 +105,11 @@ impl NetworkHandle { pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) { self.send_message(NetworkHandleMessage::EthRequest { peer_id, request }) } + + /// Send full transactions to the peer + pub fn send_transactions(&self, peer_id: PeerId, msg: Arc) { + self.send_message(NetworkHandleMessage::SendTransaction { peer_id, msg }) + } } struct NetworkInner { diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 3a0e2a7f94..9bc79cdb8d 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -16,7 +16,9 @@ use reth_interfaces::p2p::error::RequestResult; use reth_primitives::{ FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, }; -use reth_transaction_pool::{error::PoolResult, TransactionPool}; +use reth_transaction_pool::{ + error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool, +}; use std::{ collections::{hash_map::Entry, HashMap}, future::Future, @@ -27,6 +29,7 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; +use tracing::trace; /// Cache limit of transactions to keep track of for a single peer. const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10; @@ -151,6 +154,55 @@ where } } + /// Invoked when a new transaction is pending. + /// + /// When new transactions appear in the pool, we propagate them to the network using the + /// `Transactions` and `NewPooledTransactionHashes` messages. The Transactions message relays + /// complete transaction objects and is typically sent to a small, random fraction of connected + /// peers. + /// + /// All other peers receive a notification of the transaction hash and can request the + /// complete transaction object if it is unknown to them. The dissemination of complete + /// transactions to a fraction of peers usually ensures that all nodes receive the transaction + /// and won't need to request it. + fn on_new_transactions(&mut self, hashes: impl IntoIterator) { + trace!(target: "net::tx", "Start propagating transactions"); + + let propagated = self.propagate_transactions( + self.pool + .get_all(hashes) + .into_iter() + .map(|tx| (*tx.hash(), tx.transaction.to_recovered_transaction().into_signed())) + .collect(), + ); + + // notify pool so events get fired + self.pool.on_propagated(propagated); + } + + fn propagate_transactions( + &mut self, + txs: Vec<(TxHash, TransactionSigned)>, + ) -> PropagatedTransactions { + let mut propagated = PropagatedTransactions::default(); + + for (peer_id, peer) in self.peers.iter_mut() { + let (hashes, full): (Vec<_>, Vec<_>) = + txs.iter().filter(|(hash, _)| peer.transactions.insert(*hash)).cloned().unzip(); + + if !full.is_empty() { + // TODO select peer for full or hash + self.network.send_transactions(*peer_id, Arc::new(Transactions(full))); + + for hash in hashes { + propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id)); + } + } + } + + propagated + } + /// Request handler for an incoming `NewPooledTransactionHashes` fn on_new_pooled_transactions( &mut self, @@ -323,8 +375,12 @@ where } // handle new transactions - while let Poll::Ready(Some(_hash)) = this.pending_transactions.poll_next_unpin(cx) { - // TODO(mattsse): propagate new transactions + let mut new_txs = Vec::new(); + while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) { + new_txs.push(hash); + } + if !new_txs.is_empty() { + this.on_new_transactions(new_txs); } // Advance all requests. diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 01dc45dfb5..78ab42c4d0 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -78,7 +78,10 @@ pub use crate::{ config::PoolConfig, ordering::TransactionOrdering, - traits::{BestTransactions, OnNewBlockEvent, PoolTransaction, TransactionPool}, + traits::{ + BestTransactions, OnNewBlockEvent, PoolTransaction, PropagateKind, PropagatedTransactions, + TransactionPool, + }, validate::{TransactionValidationOutcome, TransactionValidator}, }; use crate::{ @@ -241,6 +244,10 @@ where ) -> Vec>> { self.inner().get_all(txs) } + + fn on_propagated(&self, txs: PropagatedTransactions) { + self.inner().on_propagated(txs) + } } impl Clone for Pool { diff --git a/crates/transaction-pool/src/pool/events.rs b/crates/transaction-pool/src/pool/events.rs index 7853c95b37..ddae684540 100644 --- a/crates/transaction-pool/src/pool/events.rs +++ b/crates/transaction-pool/src/pool/events.rs @@ -1,3 +1,4 @@ +use crate::traits::PropagateKind; use reth_primitives::{TxHash, H256}; use serde::{Deserialize, Serialize}; @@ -18,4 +19,6 @@ pub enum TransactionEvent { Discarded, /// Transaction became invalid indefinitely. Invalid, + /// Transaction was propagated to peers. + Propagated(Vec), } diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 7c423da0e3..3491288652 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -1,6 +1,6 @@ //! Listeners for the transaction-pool -use crate::pool::events::TransactionEvent; +use crate::{pool::events::TransactionEvent, traits::PropagateKind}; use reth_primitives::{rpc::TxHash, H256}; use std::{collections::HashMap, hash}; use tokio::sync::mpsc::UnboundedSender; @@ -47,6 +47,11 @@ impl PoolEventListener { self.notify_with(tx, |notifier| notifier.queued()); } + /// Notify listeners about a transaction that was propagated. + pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec) { + self.notify_with(tx, |notifier| notifier.propagated(peers)); + } + /// Notify listeners about a transaction that was discarded. pub(crate) fn discarded(&mut self, tx: &TxHash) { self.notify_with(tx, |notifier| notifier.discarded()); @@ -99,6 +104,11 @@ impl PoolEventNotifier { self.is_done = true; } + /// Transaction was propagated. + fn propagated(&mut self, peers: Vec) { + self.notify(TransactionEvent::Propagated(peers)); + } + /// Transaction was replaced with the given transaction fn discarded(&mut self) { self.notify(TransactionEvent::Discarded); diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 6d1bb206b0..99f47a90e4 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -69,7 +69,9 @@ use crate::{ error::{PoolError, PoolResult}, identifier::{SenderId, SenderIdentifiers, TransactionId}, pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, - traits::{NewTransactionEvent, PoolStatus, PoolTransaction, TransactionOrigin}, + traits::{ + NewTransactionEvent, PoolStatus, PoolTransaction, PropagatedTransactions, TransactionOrigin, + }, validate::{TransactionValidationOutcome, ValidPoolTransaction}, OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, U256, }; @@ -357,6 +359,13 @@ where self.pool.read().get_all(txs).collect() } + /// Notify about propagated transactions. + pub(crate) fn on_propagated(&self, txs: PropagatedTransactions) { + let mut listener = self.event_listener.write(); + + txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers)) + } + /// Number of transactions in the entire pool pub(crate) fn len(&self) -> usize { self.pool.read().len() diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 47b4790646..2d8e85e0e9 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,6 +1,7 @@ use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID}; -use reth_primitives::{Address, FromRecoveredTransaction, TxHash, H256, U256}; -use std::{fmt, sync::Arc}; +use reth_primitives::{Address, FromRecoveredTransaction, PeerId, TxHash, H256, U256}; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, fmt, sync::Arc}; use tokio::sync::mpsc::Receiver; /// General purpose abstraction fo a transaction-pool. @@ -109,6 +110,47 @@ pub trait TransactionPool: Send + Sync + 'static { &self, txs: impl IntoIterator, ) -> Vec>>; + + /// Notify the pool about transactions that are propagated to peers. + /// + /// Consumer: P2P + fn on_propagated(&self, txs: PropagatedTransactions); +} + +/// Represents a transaction that was propagated over the network. +#[derive(Debug, Clone, Eq, PartialEq, Default)] +pub struct PropagatedTransactions(pub HashMap>); + +/// Represents how a transaction was propagated over the network. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum PropagateKind { + /// The full transaction object was sent to the peer. + /// + /// This is equivalent to the `Transaction` message + Full(PeerId), + /// Only the Hash was propagated to the peer. + Hash(PeerId), +} + +// === impl PropagateKind === + +impl PropagateKind { + /// Returns the peer the transaction was sent to + pub fn peer(&self) -> &PeerId { + match self { + PropagateKind::Full(peer) => peer, + PropagateKind::Hash(peer) => peer, + } + } +} + +impl From for PeerId { + fn from(value: PropagateKind) -> Self { + match value { + PropagateKind::Full(peer) => peer, + PropagateKind::Hash(peer) => peer, + } + } } /// Represents a new transaction