mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 17:18:08 -05:00
get queued and pending transactions by sender in tx pool (#6256)
This commit is contained in:
@@ -30,13 +30,12 @@ impl SenderIdentifiers {
|
||||
|
||||
/// Returns the existing `SendId` or assigns a new one if it's missing
|
||||
pub(crate) fn sender_id_or_create(&mut self, addr: Address) -> SenderId {
|
||||
if let Some(id) = self.sender_id(&addr) {
|
||||
return id
|
||||
}
|
||||
let id = self.next_id();
|
||||
self.address_to_id.insert(addr, id);
|
||||
self.sender_to_address.insert(id, addr);
|
||||
id
|
||||
self.sender_id(&addr).unwrap_or_else(|| {
|
||||
let id = self.next_id();
|
||||
self.address_to_id.insert(addr, id);
|
||||
self.sender_to_address.insert(id, addr);
|
||||
id
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a new address
|
||||
|
||||
@@ -91,7 +91,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
|
||||
self.by_id
|
||||
.range((sender.start_bound(), Unbounded))
|
||||
.take_while(move |(other, _)| sender == other.sender)
|
||||
.map(|(_, tx)| *tx.transaction.id())
|
||||
.map(|(tx_id, _)| *tx_id)
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use crate::{
|
||||
identifier::TransactionId,
|
||||
pool::{best::BestTransactions, size::SizeTracker},
|
||||
identifier::{SenderId, TransactionId},
|
||||
pool::{
|
||||
best::{BestTransactions, BestTransactionsWithBasefee},
|
||||
size::SizeTracker,
|
||||
},
|
||||
Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
|
||||
};
|
||||
|
||||
use crate::pool::best::BestTransactionsWithBasefee;
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
ops::Bound::Unbounded,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
@@ -489,6 +491,15 @@ impl<T: TransactionOrdering> PendingPool<T> {
|
||||
self.by_id.contains_key(id)
|
||||
}
|
||||
|
||||
/// Get transactions by sender
|
||||
pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
|
||||
self.by_id
|
||||
.range((sender.start_bound(), Unbounded))
|
||||
.take_while(move |(other, _)| sender == other.sender)
|
||||
.map(|(tx_id, _)| *tx_id)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Retrieves a transaction with the given ID from the pool, if it exists.
|
||||
fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
|
||||
self.by_id.get(id)
|
||||
|
||||
@@ -357,9 +357,15 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
|
||||
/// Returns all transactions from parked pools
|
||||
pub(crate) fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let mut queued = self.basefee_pool.all().collect::<Vec<_>>();
|
||||
queued.extend(self.queued_pool.all());
|
||||
queued
|
||||
self.basefee_pool.all().chain(self.queued_pool.all()).collect()
|
||||
}
|
||||
|
||||
/// Returns queued and pending transactions for the specified sender
|
||||
pub fn queued_and_pending_txs_by_sender(
|
||||
&self,
|
||||
sender: SenderId,
|
||||
) -> (Vec<TransactionId>, Vec<TransactionId>) {
|
||||
(self.queued_pool.get_txs_by_sender(sender), self.pending_pool.get_txs_by_sender(sender))
|
||||
}
|
||||
|
||||
/// Returns `true` if the transaction with the given hash is already included in this pool.
|
||||
@@ -583,8 +589,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
/// This will move/discard the given transaction according to the `PoolUpdate`
|
||||
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome<T::Transaction> {
|
||||
let mut outcome = UpdateOutcome::default();
|
||||
for update in updates {
|
||||
let PoolUpdate { id, hash, current, destination } = update;
|
||||
for PoolUpdate { id, hash, current, destination } in updates {
|
||||
match destination {
|
||||
Destination::Discard => {
|
||||
// remove the transaction from the pool and subpool
|
||||
@@ -594,7 +599,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
self.metrics.removed_transactions.increment(1);
|
||||
}
|
||||
Destination::Pool(move_to) => {
|
||||
debug_assert!(!move_to.eq(¤t), "destination must be different");
|
||||
debug_assert_ne!(&move_to, ¤t, "destination must be different");
|
||||
let moved = self.move_transaction(current, move_to, &id);
|
||||
if matches!(move_to, SubPool::Pending) {
|
||||
if let Some(tx) = moved {
|
||||
|
||||
Reference in New Issue
Block a user