diff --git a/Cargo.lock b/Cargo.lock index c077783af8..3fececbd57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3465,7 +3465,7 @@ dependencies = [ "async-trait", "bitflags", "fnv", - "futures", + "futures-util", "linked-hash-map", "parking_lot 0.12.1", "paste", @@ -3473,6 +3473,7 @@ dependencies = [ "reth-primitives", "serde", "thiserror", + "tokio", "tracing", ] diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 9c8f079de7..3b797c9f4c 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -50,7 +50,7 @@ impl NetworkHandle { } /// Sends a [`NetworkHandleMessage`] to the manager - fn send_message(&self, msg: NetworkHandleMessage) { + pub(crate) fn send_message(&self, msg: NetworkHandleMessage) { let _ = self.inner.to_manager_tx.send(msg); } diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 9a0a07630b..d952439bec 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -1,24 +1,35 @@ //! Transaction management for the p2p network. -use crate::{cache::LruCache, manager::NetworkEvent, message::PeerRequestSender, NetworkHandle}; -use futures::stream::FuturesUnordered; -use reth_primitives::{PeerId, Transaction, H256}; -use reth_transaction_pool::TransactionPool; +use crate::{ + cache::LruCache, + manager::NetworkEvent, + message::{PeerRequest, PeerRequestSender}, + network::NetworkHandleMessage, + NetworkHandle, +}; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use reth_eth_wire::{GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions}; +use reth_interfaces::p2p::error::RequestResult; +use reth_primitives::{ + FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, +}; +use reth_transaction_pool::{error::PoolResult, TransactionPool}; use std::{ collections::{hash_map::Entry, HashMap}, future::Future, num::NonZeroUsize, pin::Pin, sync::Arc, + task::{Context, Poll}, }; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{mpsc, oneshot, oneshot::Sender}; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; /// Cache limit of transactions to keep track of for a single peer. -const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024; +const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10; /// The future for inserting a function into the pool -pub type PoolImportFuture = Pin + Send>>; +pub type PoolImportFuture = Pin> + Send + 'static>>; /// Api to interact with [`TransactionsManager`] task. pub struct TransactionsHandle { @@ -52,11 +63,13 @@ pub struct TransactionsManager { /// /// From which we get all new incoming transaction related messages. network_events: UnboundedReceiverStream, + /// All currently active requests for pooled transactions. + inflight_requests: Vec, /// All currently pending transactions grouped by peers. /// /// This way we can track incoming transactions and prevent multiple pool imports for the same /// transaction - transactions_by_peers: HashMap>, + transactions_by_peers: HashMap>, /// Transactions that are currently imported into the `Pool` pool_imports: FuturesUnordered, /// All the connected peers. @@ -65,28 +78,36 @@ pub struct TransactionsManager { command_tx: mpsc::UnboundedSender, /// Incoming commands from [`TransactionsHandle`]. command_rx: UnboundedReceiverStream, + /// Incoming commands from [`TransactionsHandle`]. + pending_transactions: ReceiverStream, } // === impl TransactionsManager === impl TransactionsManager where - Pool: TransactionPool, + Pool: TransactionPool + Clone, + ::Transaction: IntoRecoveredTransaction, { /// Sets up a new instance. pub fn new(network: NetworkHandle, pool: Pool) -> Self { let network_events = network.event_listener(); let (command_tx, command_rx) = mpsc::unbounded_channel(); + // install a listener for new transactions + let pending = pool.pending_transactions_listener(); + Self { pool, network, network_events: UnboundedReceiverStream::new(network_events), + inflight_requests: Default::default(), transactions_by_peers: Default::default(), pool_imports: Default::default(), peers: Default::default(), command_tx, command_rx: UnboundedReceiverStream::new(command_rx), + pending_transactions: ReceiverStream::new(pending), } } @@ -95,8 +116,63 @@ where TransactionsHandle { manager_tx: self.command_tx.clone() } } + /// Request handler for an incoming request for transactions + fn on_get_pooled_transactions( + &mut self, + peer_id: PeerId, + request: GetPooledTransactions, + response: Sender>, + ) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + let transactions = self + .pool + .get_all(request.0) + .into_iter() + .map(|tx| tx.transaction.to_recovered_transaction().into_signed()) + .collect::>(); + + // we sent a response at which point we assume that the peer is aware of the transaction + peer.transactions.extend(transactions.iter().map(|tx| tx.hash())); + + let resp = PooledTransactions(transactions); + let _ = response.send(Ok(resp)); + } + } + + /// Request handler for an incoming `NewPooledTransactionHashes` + fn on_new_pooled_transactions( + &mut self, + peer_id: PeerId, + msg: Arc, + ) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + let mut transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()).0; + + // keep track of the transactions the peer knows + peer.transactions.extend(transactions.clone()); + + self.pool.retain_unknown(&mut transactions); + + if transactions.is_empty() { + // nothing to request + return + } + + // request the missing transactions + let (response, rx) = oneshot::channel(); + let req = PeerRequest::GetPooledTransactions { + request: GetPooledTransactions(transactions), + response, + }; + + if peer.request_tx.try_send(req).is_ok() { + self.inflight_requests.push(GetPooledTxRequest { peer_id, response: rx }) + } + } + } + /// Handles a received event - async fn on_event(&mut self, event: NetworkEvent) { + fn on_event(&mut self, event: NetworkEvent) { match event { NetworkEvent::SessionClosed { peer_id } => { // remove the peer @@ -114,35 +190,140 @@ where }, ); - // TODO send `NewPooledTransactionHashes + // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the + // pool + let msg = Arc::new(NewPooledTransactionHashes(self.pool.pooled_transactions())); + self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { + peer_id, + msg, + }) } NetworkEvent::IncomingTransactions { peer_id, msg } => { let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()); - - if let Some(peer) = self.peers.get_mut(&peer_id) { - for tx in transactions.0 { - // track that the peer knows this transaction - peer.transactions.insert(tx.hash); - - match self.transactions_by_peers.entry(tx.hash) { - Entry::Occupied(mut entry) => { - // transaction was already inserted - entry.get_mut().push(peer_id); - } - Entry::Vacant(_) => { - // TODO import into the pool - } - } - } + self.import_transactions(peer_id, transactions.0); + } + NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg } => { + self.on_new_pooled_transactions(peer_id, msg) + } + NetworkEvent::GetPooledTransactions { peer_id, request, response } => { + if let Ok(response) = Arc::try_unwrap(response) { + // TODO(mattsse): there should be a dedicated channel for the transaction + // manager instead + self.on_get_pooled_transactions(peer_id, request, response) } } - NetworkEvent::IncomingPooledTransactionHashes { .. } => {} - NetworkEvent::GetPooledTransactions { .. } => {} } } - /// Executes an endless future - pub async fn run(self) {} + /// Starts the import process for the given transactions. + fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + for tx in transactions { + // recover transaction + let tx = if let Some(tx) = tx.into_ecrecovered() { + tx + } else { + // TODO: report peer? + continue + }; + + // track that the peer knows this transaction + peer.transactions.insert(tx.hash); + + match self.transactions_by_peers.entry(tx.hash) { + Entry::Occupied(mut entry) => { + // transaction was already inserted + entry.get_mut().push(peer_id); + } + Entry::Vacant(entry) => { + // this is a new transaction that should be imported into the pool + let pool_transaction = ::from_recovered_transaction(tx); + + let pool = self.pool.clone(); + let import = Box::pin(async move { + pool.add_external_transaction(pool_transaction).await + }); + + self.pool_imports.push(import); + entry.insert(vec![peer_id]); + } + } + } + } + } + + fn on_good_import(&mut self, hash: TxHash) { + if let Some(_peers) = self.transactions_by_peers.remove(&hash) { + // TODO report good peer? + } + } + + fn on_bad_import(&mut self, hash: TxHash) { + if let Some(_peers) = self.transactions_by_peers.remove(&hash) { + // TODO report bad peer? + } + } +} + +/// An endless future. +/// +/// This should be spawned or used as part of `tokio::select!`. +impl Future for TransactionsManager +where + Pool: TransactionPool + Clone + Unpin, + ::Transaction: IntoRecoveredTransaction, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Advance all imports + while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) { + match import_res { + Ok(hash) => { + this.on_good_import(hash); + } + Err(err) => { + this.on_bad_import(*err.hash()); + } + } + } + + // handle new transactions + while let Poll::Ready(Some(_hash)) = this.pending_transactions.poll_next_unpin(cx) { + // TODO(mattsse): propagate new transactions + } + + // Advance all requests. + // We remove each request one by one and add them back. + for idx in (0..this.inflight_requests.len()).rev() { + let mut req = this.inflight_requests.swap_remove(idx); + match req.response.poll_unpin(cx) { + Poll::Pending => { + this.inflight_requests.push(req); + } + Poll::Ready(Ok(Ok(txs))) => { + this.import_transactions(req.peer_id, txs.0); + } + Poll::Ready(Ok(Err(_))) => { + // TODO report bad peer + } + Poll::Ready(Err(_)) => { + // TODO report bad peer + } + } + } + + Poll::Pending + } +} + +/// An inflight request for `PooledTransactions` from a peer +#[allow(missing_docs)] +struct GetPooledTxRequest { + peer_id: PeerId, + response: oneshot::Receiver>, } /// Tracks a single peer diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index e810143dc1..a513e468e1 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -42,8 +42,8 @@ pub use log::Log; pub use receipt::Receipt; pub use storage::StorageEntry; pub use transaction::{ - AccessList, AccessListItem, FromRecoveredTransaction, Signature, Transaction, TransactionKind, - TransactionSigned, TransactionSignedEcRecovered, TxType, + AccessList, AccessListItem, FromRecoveredTransaction, IntoRecoveredTransaction, Signature, + Transaction, TransactionKind, TransactionSigned, TransactionSignedEcRecovered, TxType, }; /// A block hash. diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 6a6dd826b0..5915959872 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -571,6 +571,8 @@ impl TransactionSigned { } /// Recover signer from signature and hash. + /// + /// Returns `None` if the transaction's signature is invalid. pub fn recover_signer(&self) -> Option
{ let signature_hash = self.signature_hash(); self.signature.recover_signer(signature_hash) @@ -724,6 +726,22 @@ impl FromRecoveredTransaction for TransactionSignedEcRecovered { } } +/// The inverse of [`FromRecoveredTransaction`] that ensure the transaction can be sent over the +/// network +pub trait IntoRecoveredTransaction { + /// Converts to this type into a [`TransactionSignedEcRecovered`]. + /// + /// Note: this takes `&self` since indented usage is via `Arc`. + fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered; +} + +impl IntoRecoveredTransaction for TransactionSignedEcRecovered { + #[inline] + fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered { + self.clone() + } +} + #[cfg(test)] mod tests { use crate::{ diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index a739870e0a..eb1ee3276f 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -16,8 +16,9 @@ reth-primitives = { path = "../primitives" } # async/futures async-trait = "0.1" -futures = "0.3" +futures-util = "0.3" parking_lot = "0.12" +tokio = { version = "1", default-features = false, features = ["sync"] } # misc aquamarine = "0.1" # docs diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 8a80d6d478..a29554212f 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -87,9 +87,9 @@ use crate::{ traits::{NewTransactionEvent, PoolStatus, TransactionOrigin}, validate::ValidPoolTransaction, }; -use futures::channel::mpsc::Receiver; use reth_primitives::{BlockID, TxHash, U256, U64}; use std::{collections::HashMap, sync::Arc}; +use tokio::sync::mpsc::Receiver; mod config; pub mod error; @@ -131,11 +131,12 @@ where origin: TransactionOrigin, transactions: impl IntoIterator, ) -> PoolResult>> { - let outcome = - futures::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx))) - .await - .into_iter() - .collect::>(); + let outcome = futures_util::future::join_all( + transactions.into_iter().map(|tx| self.validate(origin, tx)), + ) + .await + .into_iter() + .collect::>(); Ok(outcome) } @@ -209,6 +210,10 @@ where self.pool.add_transaction_listener() } + fn pooled_transactions(&self) -> Vec { + self.pool.pooled_transactions() + } + fn best_transactions( &self, ) -> Box>>> { @@ -222,6 +227,10 @@ where todo!() } + fn retain_unknown(&self, hashes: &mut Vec) { + self.pool.retain_unknown(hashes) + } + fn get(&self, tx_hash: &TxHash) -> Option>> { self.inner().get(tx_hash) } diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index b9d2592134..05c84261f8 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -1,9 +1,9 @@ //! Listeners for the transaction-pool use crate::pool::events::TransactionEvent; -use futures::channel::mpsc::UnboundedSender; use reth_primitives::H256; use std::{collections::HashMap, hash}; +use tokio::sync::mpsc::UnboundedSender; type EventSink = UnboundedSender>; @@ -75,7 +75,7 @@ struct PoolEventNotifier { impl PoolEventNotifier { fn notify(&mut self, event: TransactionEvent) { - self.senders.retain(|sender| sender.unbounded_send(event.clone()).is_ok()) + self.senders.retain(|sender| sender.send(event.clone()).is_ok()) } fn is_done(&self) -> bool { diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index eb6dfda91a..6d8c3c4074 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -73,7 +73,6 @@ use crate::{ }; use best::BestTransactions; pub use events::TransactionEvent; -use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash, H256}; use std::{ @@ -81,6 +80,7 @@ use std::{ sync::Arc, time::Instant, }; +use tokio::sync::mpsc; use tracing::warn; mod best; @@ -107,9 +107,9 @@ pub struct PoolInner { /// Manages listeners for transaction state change events. event_listener: RwLock>, /// Listeners for new ready transactions. - pending_transaction_listener: Mutex>>, + pending_transaction_listener: Mutex>>, /// Listeners for new transactions added to the pool. - transaction_listener: Mutex>>>, + transaction_listener: Mutex>>>, } // === impl PoolInner === @@ -149,17 +149,23 @@ where /// Adds a new transaction listener to the pool that gets notified about every new _ready_ /// transaction - pub fn add_pending_listener(&self) -> Receiver { + pub fn add_pending_listener(&self) -> mpsc::Receiver { const TX_LISTENER_BUFFER_SIZE: usize = 2048; - let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); + let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE); self.pending_transaction_listener.lock().push(tx); rx } + /// Returns hashes of _all_ transactions in the pool. + pub(crate) fn pooled_transactions(&self) -> Vec { + let pool = self.pool.read(); + pool.all().hashes_iter().collect() + } + /// Adds a new transaction listener to the pool that gets notified about every new transaction - pub fn add_transaction_listener(&self) -> Receiver> { + pub fn add_transaction_listener(&self) -> mpsc::Receiver> { const TX_LISTENER_BUFFER_SIZE: usize = 1024; - let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); + let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE); self.transaction_listener.lock().push(tx); rx } @@ -256,8 +262,8 @@ where let mut transaction_listeners = self.pending_transaction_listener.lock(); transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) { Ok(()) => true, - Err(e) => { - if e.is_full() { + Err(err) => { + if matches!(err, mpsc::error::TrySendError::Full(_)) { warn!( target: "txpool", "[{:?}] dropping full ready transaction listener", @@ -277,8 +283,8 @@ where transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) { Ok(()) => true, - Err(e) => { - if e.is_full() { + Err(err) => { + if matches!(err, mpsc::error::TrySendError::Full(_)) { warn!( target: "txpool", "dropping full transaction listener", @@ -325,6 +331,12 @@ where self.pool.read().best_transactions() } + /// Removes all transactions transactions that are missing in the pool. + pub(crate) fn retain_unknown(&self, hashes: &mut Vec) { + let pool = self.pool.read(); + hashes.retain(|tx| !pool.contains(tx)) + } + /// Returns the transaction by hash. pub(crate) fn get( &self, diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 391496c576..0cbc6ed121 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -102,6 +102,11 @@ impl TxPool { } } + /// Returns access to the [`AllTransactions`] container. + pub(crate) fn all(&self) -> &AllTransactions { + &self.all_transactions + } + /// Returns stats about the pool. pub(crate) fn status(&self) -> PoolStatus { PoolStatus { @@ -417,10 +422,6 @@ impl TxPool { #[cfg(test)] #[allow(missing_docs)] impl TxPool { - pub(crate) fn all(&self) -> &AllTransactions { - &self.all_transactions - } - pub(crate) fn pending(&self) -> &PendingPool { &self.pending_pool } @@ -463,6 +464,11 @@ impl AllTransactions { Self { max_account_slots, ..Default::default() } } + /// Returns an iterator over all _unique_ hashes in the pool + pub(crate) fn hashes_iter(&self) -> impl Iterator + '_ { + self.by_hash.keys().copied() + } + /// Returns if the transaction for the given hash is already included in this pool pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool { self.by_hash.contains_key(tx_hash) diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index e07821eebf..47b4790646 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,7 +1,7 @@ use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID}; -use futures::{channel::mpsc::Receiver, future::Shared}; use reth_primitives::{Address, FromRecoveredTransaction, TxHash, H256, U256}; use std::{fmt, sync::Arc}; +use tokio::sync::mpsc::Receiver; /// General purpose abstraction fo a transaction-pool. /// @@ -27,6 +27,8 @@ pub trait TransactionPool: Send + Sync + 'static { /// /// This is intended to be used by the network to insert incoming transactions received over the /// p2p network. + /// + /// Consumer: P2P async fn add_external_transaction(&self, transaction: Self::Transaction) -> PoolResult { self.add_transaction(TransactionOrigin::External, transaction).await } @@ -59,6 +61,13 @@ pub trait TransactionPool: Send + Sync + 'static { /// Returns a new stream that yields new valid transactions added to the pool. fn transactions_listener(&self) -> Receiver>; + /// Returns hashes of all transactions in the pool. + /// + /// Note: This returns a `Vec` but should guarantee that all hashes are unique. + /// + /// Consumer: P2P + fn pooled_transactions(&self) -> Vec; + /// Returns an iterator that yields transactions that are ready for block production. /// /// Consumer: Block production @@ -76,6 +85,13 @@ pub trait TransactionPool: Send + Sync + 'static { tx_hashes: &[TxHash], ) -> Vec>>; + /// Retains only those hashes that are unknown to the pool. + /// In other words, removes all transactions from the given set that are currently present in + /// the pool. + /// + /// Consumer: P2P + fn retain_unknown(&self, hashes: &mut Vec); + /// Returns if the transaction for the given hash is already included in this pool. fn contains(&self, tx_hash: &TxHash) -> bool { self.get(tx_hash).is_some()