diff --git a/crates/transaction-pool/src/error.rs b/crates/transaction-pool/src/error.rs index 8a4f971b32..25226b8f26 100644 --- a/crates/transaction-pool/src/error.rs +++ b/crates/transaction-pool/src/error.rs @@ -1,6 +1,6 @@ //! Transaction pool errors -use reth_primitives::{Address, BlockID, TxHash, U256}; +use reth_primitives::{Address, TxHash, U256}; /// Transaction pool result type. pub type PoolResult = Result; diff --git a/crates/transaction-pool/src/identifier.rs b/crates/transaction-pool/src/identifier.rs index df31fc7eb7..07098b68a1 100644 --- a/crates/transaction-pool/src/identifier.rs +++ b/crates/transaction-pool/src/identifier.rs @@ -1,6 +1,6 @@ use fnv::FnvHashMap; use reth_primitives::Address; -use std::{collections::HashMap, ops::Bound}; +use std::collections::HashMap; /// An internal mapping of addresses. /// @@ -17,6 +17,7 @@ pub struct SenderIdentifiers { impl SenderIdentifiers { /// Returns the address for the given identifier. + #[allow(unused)] pub fn address(&self, id: &SenderId) -> Option<&Address> { self.sender_to_address.get(id) } @@ -56,8 +57,9 @@ pub struct SenderId(u64); impl SenderId { /// Returns a `Bound` for `TransactionId` starting with nonce `0` - pub(crate) fn start_bound(self) -> Bound { - Bound::Included(TransactionId::new(self, 0)) + #[cfg(test)] + pub(crate) fn start_bound(self) -> std::ops::Bound { + std::ops::Bound::Included(TransactionId::new(self, 0)) } } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 78ab42c4d0..f0b9a9ef80 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -1,6 +1,5 @@ #![warn(missing_docs)] // unreachable_pub, missing_debug_implementations -#![allow(unused)] // TODO(mattsse) remove after progress was made #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( no_crate_inject, @@ -86,11 +85,11 @@ pub use crate::{ }; use crate::{ error::PoolResult, - pool::{OnNewBlockOutcome, PoolInner}, - traits::{NewTransactionEvent, PoolStatus, TransactionOrigin}, + pool::PoolInner, + traits::{NewTransactionEvent, PoolSize, TransactionOrigin}, validate::ValidPoolTransaction, }; -use reth_primitives::{BlockID, TxHash, U256, U64}; +use reth_primitives::{TxHash, U256}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::Receiver; @@ -128,6 +127,11 @@ where &self.pool } + /// Get the config the pool was configured with. + pub fn config(&self) -> &PoolConfig { + self.inner().config() + } + /// Returns future that validates all transaction in the given iterator. async fn validate_all( &self, @@ -178,8 +182,8 @@ where { type Transaction = T::Transaction; - fn status(&self) -> PoolStatus { - self.pool.status() + fn status(&self) -> PoolSize { + self.pool.size() } fn on_new_block(&self, event: OnNewBlockEvent) { diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 3491288652..6fa38a1475 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -2,25 +2,28 @@ use crate::{pool::events::TransactionEvent, traits::PropagateKind}; use reth_primitives::{rpc::TxHash, H256}; -use std::{collections::HashMap, hash}; +use std::collections::HashMap; use tokio::sync::mpsc::UnboundedSender; -type EventSink = UnboundedSender; +type EventBroadcast = UnboundedSender; -/// Transaction pool event listeners. +/// A type that broadcasts [`TransactionEvent`] to installed listeners. +/// +/// This is essentially a multi-producer, multi-consumer channel where each event is broadcasted to +/// all active receivers. #[derive(Default)] -pub(crate) struct PoolEventListener { +pub(crate) struct PoolEventBroadcast { /// All listeners for certain transaction events. - listeners: HashMap, + broadcasters: HashMap, } -impl PoolEventListener { - /// Calls the notification callback with the `PoolEventListenerSender` that belongs to the hash. - fn notify_with(&mut self, hash: &TxHash, callback: F) +impl PoolEventBroadcast { + /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash. + fn broadcast_with(&mut self, hash: &TxHash, callback: F) where - F: FnOnce(&mut PoolEventNotifier), + F: FnOnce(&mut PoolEventBroadcaster), { - let is_done = if let Some(sink) = self.listeners.get_mut(hash) { + let is_done = if let Some(sink) = self.broadcasters.get_mut(hash) { callback(sink); sink.is_done() } else { @@ -28,53 +31,55 @@ impl PoolEventListener { }; if is_done { - self.listeners.remove(hash); + self.broadcasters.remove(hash); } } /// Notify listeners about a transaction that was added to the pending queue. pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) { - self.notify_with(tx, |notifier| notifier.pending()); + self.broadcast_with(tx, |notifier| notifier.pending()); if let Some(replaced) = replaced { // notify listeners that this transaction was replaced - self.notify_with(replaced, |notifier| notifier.replaced(*tx)); + self.broadcast_with(replaced, |notifier| notifier.replaced(*tx)); } } /// Notify listeners about a transaction that was added to the queued pool. pub(crate) fn queued(&mut self, tx: &TxHash) { - self.notify_with(tx, |notifier| notifier.queued()); + self.broadcast_with(tx, |notifier| notifier.queued()); } /// Notify listeners about a transaction that was propagated. pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec) { - self.notify_with(tx, |notifier| notifier.propagated(peers)); + self.broadcast_with(tx, |notifier| notifier.propagated(peers)); } /// Notify listeners about a transaction that was discarded. pub(crate) fn discarded(&mut self, tx: &TxHash) { - self.notify_with(tx, |notifier| notifier.discarded()); + self.broadcast_with(tx, |notifier| notifier.discarded()); } /// Notify listeners that the transaction was mined pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) { - self.notify_with(tx, |notifier| notifier.mined(block_hash)); + self.broadcast_with(tx, |notifier| notifier.mined(block_hash)); } } -/// Sender half(s) of the event channels for a specific transaction +/// All Sender half(s) of the event channels for a specific transaction. +/// +/// This mimics [tokio::sync::broadcast] but uses separate channels. #[derive(Debug)] -struct PoolEventNotifier { +struct PoolEventBroadcaster { /// Tracks whether the transaction this notifier can stop because the transaction was /// completed, or removed. is_done: bool, /// Corresponding sender half(s) for event listener channel - senders: Vec, + senders: Vec, } -impl PoolEventNotifier { - fn notify(&mut self, event: TransactionEvent) { +impl PoolEventBroadcaster { + fn broadcast(&mut self, event: TransactionEvent) { self.senders.retain(|sender| sender.send(event.clone()).is_ok()) } @@ -84,34 +89,34 @@ impl PoolEventNotifier { /// Transaction was moved to the pending queue. fn pending(&mut self) { - self.notify(TransactionEvent::Pending) + self.broadcast(TransactionEvent::Pending) } /// Transaction was moved to the queued pool fn queued(&mut self) { - self.notify(TransactionEvent::Queued) + self.broadcast(TransactionEvent::Queued) } /// Transaction was replaced with the given transaction fn replaced(&mut self, hash: TxHash) { - self.notify(TransactionEvent::Replaced(hash)); + self.broadcast(TransactionEvent::Replaced(hash)); self.is_done = true; } /// Transaction was mined. fn mined(&mut self, block_hash: H256) { - self.notify(TransactionEvent::Mined(block_hash)); + self.broadcast(TransactionEvent::Mined(block_hash)); self.is_done = true; } /// Transaction was propagated. fn propagated(&mut self, peers: Vec) { - self.notify(TransactionEvent::Propagated(peers)); + self.broadcast(TransactionEvent::Propagated(peers)); } /// Transaction was replaced with the given transaction fn discarded(&mut self) { - self.notify(TransactionEvent::Discarded); + self.broadcast(TransactionEvent::Discarded); self.is_done = true; } } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 99f47a90e4..766a75e809 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -65,25 +65,23 @@ //! transactions are _currently_ waiting for state changes that eventually move them into //! category (2.) and become pending. +#![allow(dead_code)] // TODO(mattsse): remove once remaining checks implemented + use crate::{ error::{PoolError, PoolResult}, identifier::{SenderId, SenderIdentifiers, TransactionId}, - pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, + pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool}, traits::{ - NewTransactionEvent, PoolStatus, PoolTransaction, PropagatedTransactions, TransactionOrigin, + NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin, }, validate::{TransactionValidationOutcome, ValidPoolTransaction}, - OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, U256, + OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, }; use best::BestTransactions; pub use events::TransactionEvent; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash, H256}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Instant, -}; +use std::{collections::HashSet, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tracing::warn; @@ -109,7 +107,7 @@ pub struct PoolInner { /// Pool settings. config: PoolConfig, /// Manages listeners for transaction state change events. - event_listener: RwLock, + event_listener: RwLock, /// Listeners for new ready transactions. pending_transaction_listener: Mutex>>, /// Listeners for new transactions added to the pool. @@ -136,9 +134,9 @@ where } } - /// Returns stats about the pool. - pub(crate) fn status(&self) -> PoolStatus { - self.pool.read().status() + /// Returns stats about the size of the pool. + pub(crate) fn size(&self) -> PoolSize { + self.pool.read().size() } /// Returns the internal `SenderId` for this address @@ -146,13 +144,18 @@ where self.identifiers.write().sender_id_or_create(addr) } + /// Get the config the pool was configured with. + pub fn config(&self) -> &PoolConfig { + &self.config + } + /// Get the validator reference. pub fn validator(&self) -> &V { &self.validator } - /// Adds a new transaction listener to the pool that gets notified about every new _ready_ - /// transaction + /// Adds a new transaction listener to the pool that gets notified about every new _pending_ + /// transaction. pub fn add_pending_listener(&self) -> mpsc::Receiver { const TX_LISTENER_BUFFER_SIZE: usize = 2048; let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE); @@ -160,12 +163,6 @@ where rx } - /// Returns hashes of _all_ transactions in the pool. - pub(crate) fn pooled_transactions(&self) -> Vec { - let pool = self.pool.read(); - pool.all().hashes_iter().collect() - } - /// Adds a new transaction listener to the pool that gets notified about every new transaction pub fn add_transaction_listener(&self) -> mpsc::Receiver> { const TX_LISTENER_BUFFER_SIZE: usize = 1024; @@ -174,17 +171,18 @@ where rx } - /// Updates the entire pool after a new block was mined. + /// Returns hashes of _all_ transactions in the pool. + pub(crate) fn pooled_transactions(&self) -> Vec { + let pool = self.pool.read(); + pool.all().hashes_iter().collect() + } + + /// Updates the entire pool after a new block was executed. pub(crate) fn on_new_block(&self, block: OnNewBlockEvent) { let outcome = self.pool.write().on_new_block(block); self.notify_on_new_block(outcome); } - /// Resubmits transactions back into the pool. - pub fn resubmit(&self, _transactions: HashMap>) { - unimplemented!() - } - /// Add a single validated transaction into the pool. /// /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s) @@ -224,8 +222,9 @@ where Ok(hash) } - TransactionValidationOutcome::Invalid(_tx, err) => { - // TODO notify listeners about invalid + TransactionValidationOutcome::Invalid(tx, err) => { + let mut listener = self.event_listener.write(); + listener.discarded(tx.hash()); Err(err) } } @@ -291,7 +290,7 @@ where if matches!(err, mpsc::error::TrySendError::Full(_)) { warn!( target: "txpool", - "dropping full transaction listener", + "skipping transaction on full transaction listener", ); true } else { diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index 038bb44ac9..3540326729 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -83,6 +83,7 @@ impl ParkedPool { } /// Whether the pool is empty + #[cfg(test)] pub(crate) fn is_empty(&self) -> bool { self.by_id.is_empty() } diff --git a/crates/transaction-pool/src/pool/size.rs b/crates/transaction-pool/src/pool/size.rs index 2c8d207cce..0c4a63a134 100644 --- a/crates/transaction-pool/src/pool/size.rs +++ b/crates/transaction-pool/src/pool/size.rs @@ -12,13 +12,13 @@ pub struct SizeTracker(isize); impl AddAssign for SizeTracker { fn add_assign(&mut self, rhs: usize) { - self.0 += (rhs as isize) + self.0 += rhs as isize } } impl SubAssign for SizeTracker { fn sub_assign(&mut self, rhs: usize) { - self.0 -= (rhs as isize) + self.0 -= rhs as isize } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index b55de1f7a6..7dc6fc430f 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -11,7 +11,7 @@ use crate::{ update::{Destination, PoolUpdate}, AddedPendingTransaction, AddedTransaction, OnNewBlockOutcome, }, - traits::{PoolStatus, StateDiff}, + traits::{PoolSize, StateDiff}, OnNewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256, }; @@ -19,7 +19,7 @@ use fnv::FnvHashMap; use reth_primitives::{TxHash, H256}; use std::{ cmp::Ordering, - collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}, + collections::{btree_map::Entry, hash_map, BTreeMap, HashMap}, fmt, ops::Bound::{Excluded, Unbounded}, sync::Arc, @@ -107,9 +107,9 @@ impl TxPool { &self.all_transactions } - /// Returns stats about the pool. - pub(crate) fn status(&self) -> PoolStatus { - PoolStatus { + /// Returns stats about the size of pool. + pub(crate) fn size(&self) -> PoolSize { + PoolSize { pending: self.pending_pool.len(), pending_size: self.pending_pool.size(), basefee: self.basefee_pool.len(), @@ -210,7 +210,7 @@ impl TxPool { .or_default() .update(on_chain_nonce, on_chain_balance); - let hash = *tx.hash(); + let _hash = *tx.hash(); match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => { @@ -478,6 +478,7 @@ impl AllTransactions { } /// Returns the internal transaction with additional metadata + #[cfg(test)] pub(crate) fn get(&self, id: &TransactionId) -> Option<&PoolInternalTransaction> { self.txs.get(id) } @@ -518,7 +519,7 @@ impl AllTransactions { pub(crate) fn update( &mut self, pending_block_base_fee: U256, - state_diffs: &StateDiff, + _state_diffs: &StateDiff, ) -> Vec { // update new basefee self.pending_basefee = pending_block_base_fee; @@ -636,6 +637,7 @@ impl AllTransactions { /// Returns an iterator over all transactions for the given sender, starting with the lowest /// nonce #[cfg(test)] + #[allow(unused)] pub(crate) fn txs_iter( &self, sender: SenderId, @@ -648,6 +650,7 @@ impl AllTransactions { /// Returns a mutable iterator over all transactions for the given sender, starting with the /// lowest nonce #[cfg(test)] + #[allow(unused)] pub(crate) fn txs_iter_mut( &mut self, sender: SenderId, diff --git a/crates/transaction-pool/src/pool/update.rs b/crates/transaction-pool/src/pool/update.rs index f07972b168..6541e0a197 100644 --- a/crates/transaction-pool/src/pool/update.rs +++ b/crates/transaction-pool/src/pool/update.rs @@ -1,11 +1,6 @@ //! Support types for updating the pool. -use crate::{ - identifier::TransactionId, - pool::{state::SubPool, txpool::PoolInternalTransaction}, - PoolTransaction, -}; +use crate::{identifier::TransactionId, pool::state::SubPool}; use reth_primitives::TxHash; -use std::ops::{Deref, DerefMut}; /// A change of the transaction's location /// diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 2d8e85e0e9..f15092444e 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,4 +1,4 @@ -use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID}; +use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction}; use reth_primitives::{Address, FromRecoveredTransaction, PeerId, TxHash, H256, U256}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, fmt, sync::Arc}; @@ -15,7 +15,7 @@ pub trait TransactionPool: Send + Sync + 'static { type Transaction: PoolTransaction; /// Returns stats about the pool. - fn status(&self) -> PoolStatus; + fn status(&self) -> PoolSize; /// Event listener for when a new block was mined. /// @@ -271,7 +271,7 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + FromRecoveredTransaction { /// Represents the current status of the pool. #[derive(Debug, Clone)] -pub struct PoolStatus { +pub struct PoolSize { /// Number of transactions in the _pending_ sub-pool. pub pending: usize, /// Reported size of transactions in the _pending_ sub-pool. diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 20aa8e3e33..1d46c87905 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -5,7 +5,7 @@ use crate::{ identifier::{SenderId, TransactionId}, traits::{PoolTransaction, TransactionOrigin}, }; -use reth_primitives::{rpc::Address, BlockID, TxHash, U256}; +use reth_primitives::{rpc::Address, TxHash, U256}; use std::{fmt, time::Instant}; /// A Result type returned after checking a transaction's validity. @@ -103,7 +103,7 @@ impl ValidPoolTransaction { } /// Whether the transaction originated locally. - pub(crate) fn is_local(&self) -> bool { + pub fn is_local(&self) -> bool { self.origin.is_local() }