feat(txpool): add missing txpool update checks (#2366)

This commit is contained in:
Matthias Seitz
2023-04-27 16:04:18 +02:00
committed by GitHub
parent 9b5a84acc8
commit 0d20d34eaf
5 changed files with 185 additions and 106 deletions

View File

@@ -65,24 +65,31 @@
//! transactions are _currently_ waiting for state changes that eventually move them into
//! category (2.) and become pending.
#![allow(dead_code)] // TODO(mattsse): remove once remaining checks implemented
use crate::{
error::{PoolError, PoolResult},
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool},
pool::{
listener::PoolEventBroadcast,
state::SubPool,
txpool::{SenderInfo, TxPool},
},
traits::{
BlockInfo, NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions,
TransactionOrigin,
},
validate::{TransactionValidationOutcome, ValidPoolTransaction},
CanonicalStateUpdate, PoolConfig, TransactionOrdering, TransactionValidator,
CanonicalStateUpdate, ChangedAccount, PoolConfig, TransactionOrdering, TransactionValidator,
};
use best::BestTransactions;
pub use events::TransactionEvent;
use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash, H256};
use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
use std::{
collections::{HashMap, HashSet},
fmt,
sync::Arc,
time::Instant,
};
use tokio::sync::mpsc;
use tracing::warn;
@@ -149,6 +156,22 @@ where
self.identifiers.write().sender_id_or_create(addr)
}
/// Converts the changed accounts to a map of sender ids to sender info (internal identifier
/// used for accounts)
fn changed_senders(
&self,
accs: impl Iterator<Item = ChangedAccount>,
) -> HashMap<SenderId, SenderInfo> {
let mut identifiers = self.identifiers.write();
accs.into_iter()
.map(|acc| {
let ChangedAccount { address, nonce, balance } = acc;
let sender_id = identifiers.sender_id_or_create(address);
(sender_id, SenderInfo { state_nonce: nonce, balance })
})
.collect()
}
/// Get the config the pool was configured with.
pub fn config(&self) -> &PoolConfig {
&self.config
@@ -190,7 +213,24 @@ where
/// Updates the entire pool after a new block was executed.
pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate) {
let outcome = self.pool.write().on_canonical_state_change(update);
let CanonicalStateUpdate {
hash,
number,
pending_block_base_fee,
changed_accounts,
mined_transactions,
} = update;
let changed_senders = self.changed_senders(changed_accounts.into_iter());
let block_info = BlockInfo {
last_seen_block_hash: hash,
last_seen_block_number: number,
pending_basefee: pending_block_base_fee,
};
let outcome = self.pool.write().on_canonical_state_change(
block_info,
mined_transactions,
changed_senders,
);
self.notify_on_new_state(outcome);
}
@@ -439,13 +479,6 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
discarded: Vec<TxHash>,
}
impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Create a new, empty transaction.
fn new(transaction: Arc<ValidPoolTransaction<T>>) -> Self {
Self { transaction, promoted: Default::default(), discarded: Default::default() }
}
}
/// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {

View File

@@ -95,6 +95,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
/// Whether the pool is empty
#[cfg(test)]
#[allow(unused)]
pub(crate) fn is_empty(&self) -> bool {
self.by_id.is_empty()
}

View File

@@ -3,7 +3,7 @@ use crate::{
pool::{best::BestTransactions, size::SizeTracker},
TransactionOrdering, ValidPoolTransaction,
};
use reth_primitives::TxHash;
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
@@ -128,7 +128,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Removes a _mined_ transaction from the pool.
///
/// If the transactions has a descendant transaction it will advance it to the best queue.
pub(crate) fn remove_mined(
pub(crate) fn prune_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
@@ -160,11 +160,6 @@ impl<T: TransactionOrdering> PendingPool<T> {
id
}
/// Returns the transaction for the id if it's in the pool but not yet mined.
pub(crate) fn get(&self, id: &TransactionId) -> Option<Arc<PendingTransaction<T>>> {
self.by_id.get(id).cloned()
}
/// Removes the worst transaction from this pool.
pub(crate) fn pop_worst(&mut self) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let worst = self.all.iter().next_back().map(|tx| *tx.transaction.id())?;
@@ -182,6 +177,8 @@ impl<T: TransactionOrdering> PendingPool<T> {
}
/// Whether the pool is empty
#[cfg(test)]
#[allow(unused)]
pub(crate) fn is_empty(&self) -> bool {
self.by_id.is_empty()
}
@@ -193,15 +190,6 @@ pub(crate) struct PendingTransaction<T: TransactionOrdering> {
pub(crate) transaction: PendingTransactionRef<T>,
}
// == impl PendingTransaction ===
impl<T: TransactionOrdering> PendingTransaction<T> {
/// Returns all ids this transaction satisfies.
pub(crate) fn id(&self) -> &TransactionId {
&self.transaction.transaction.transaction_id
}
}
impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
fn clone(&self) -> Self {
Self { transaction: self.transaction.clone() }
@@ -223,11 +211,6 @@ impl<T: TransactionOrdering> PendingTransactionRef<T> {
pub(crate) fn unlocks(&self) -> TransactionId {
self.transaction.transaction_id.descendant()
}
/// The hash for this transaction
pub(crate) fn hash(&self) -> &TxHash {
self.transaction.hash()
}
}
impl<T: TransactionOrdering> Clone for PendingTransactionRef<T> {

View File

@@ -41,12 +41,6 @@ impl TxState {
*self >= TxState::PENDING_POOL_BITS
}
/// Returns `true` if the `ENOUGH_FEE_CAP_BLOCK` bit is set.
#[inline]
pub(crate) fn has_enough_fee_cap(&self) -> bool {
self.intersects(TxState::ENOUGH_FEE_CAP_BLOCK)
}
/// Returns `true` if the transaction has a nonce gap.
#[inline]
pub(crate) fn has_nonce_gap(&self) -> bool {

View File

@@ -13,8 +13,7 @@ use crate::{
AddedPendingTransaction, AddedTransaction, OnNewCanonicalStateOutcome,
},
traits::{BlockInfo, PoolSize},
CanonicalStateUpdate, ChangedAccount, PoolConfig, PoolResult, PoolTransaction,
TransactionOrdering, ValidPoolTransaction, U256,
PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256,
};
use fnv::FnvHashMap;
use reth_primitives::{constants::MIN_PROTOCOL_BASE_FEE, TxHash, H256};
@@ -129,14 +128,6 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
/// Updates the pool based on the changed base fee.
///
/// This enforces the dynamic fee requirement.
pub(crate) fn update_base_fee(&mut self, _new_base_fee: U256) {
// TODO update according to the changed base_fee
todo!()
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn best_transactions(&self) -> BestTransactions<T> {
self.pending_pool.best()
@@ -177,29 +168,32 @@ impl<T: TransactionOrdering> TxPool<T> {
/// sender allowance.
pub(crate) fn on_canonical_state_change(
&mut self,
event: CanonicalStateUpdate,
block_info: BlockInfo,
mined_transactions: Vec<TxHash>,
changed_senders: HashMap<SenderId, SenderInfo>,
) -> OnNewCanonicalStateOutcome {
// track changed accounts
self.sender_info.extend(changed_senders.clone());
// update block info
let block_hash = block_info.last_seen_block_hash;
self.all_transactions.set_block_info(block_info);
// Remove all transaction that were included in the block
for tx_hash in &event.mined_transactions {
if self.remove_transaction_by_hash(tx_hash).is_some() {
for tx_hash in mined_transactions.iter() {
if self.prune_transaction_by_hash(tx_hash).is_some() {
// Update removed transactions metric
self.metrics.removed_transactions.increment(1);
}
}
// Apply the state changes to the total set of transactions which triggers sub-pool updates.
let updates =
self.all_transactions.update(event.pending_block_base_fee, &event.changed_accounts);
let updates = self.all_transactions.update(changed_senders);
// Process the sub-pool updates
let UpdateOutcome { promoted, discarded } = self.process_updates(updates);
OnNewCanonicalStateOutcome {
block_hash: event.hash,
mined: event.mined_transactions,
promoted,
discarded,
}
OnNewCanonicalStateOutcome { block_hash, mined: mined_transactions, promoted, discarded }
}
/// Adds the transaction into the pool.
@@ -262,7 +256,7 @@ impl<T: TransactionOrdering> TxPool<T> {
// Update invalid transactions metric
self.metrics.invalid_transactions.increment(1);
match e {
InsertErr::Underpriced { existing, .. } => {
InsertErr::Underpriced { existing, transaction: _ } => {
Err(PoolError::ReplacementUnderpriced(existing))
}
InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap } => Err(
@@ -321,6 +315,9 @@ impl<T: TransactionOrdering> TxPool<T> {
}
/// Removes and returns all matching transactions from the pool.
///
/// Note: this does not advance any descendants of the removed transactions and does not apply
/// any additional updates.
pub(crate) fn remove_transactions(
&mut self,
hashes: impl IntoIterator<Item = TxHash>,
@@ -350,6 +347,19 @@ impl<T: TransactionOrdering> TxPool<T> {
self.remove_from_subpool(pool, tx.id())
}
/// This removes the transaction from the pool and advances any descendant state inside the
/// subpool.
///
/// This is intended to be used when a transaction is included in a block,
/// [Self::on_canonical_state_change]
fn prune_transaction_by_hash(
&mut self,
tx_hash: &H256,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let (tx, pool) = self.all_transactions.remove_transaction_by_hash(tx_hash)?;
self.prune_from_subpool(pool, tx.id())
}
/// Removes the transaction from the given pool.
///
/// Caution: this only removes the tx from the sub-pool and not from the pool itself
@@ -365,6 +375,20 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
/// Removes the transaction from the given pool and advance sub-pool internal state, with the
/// expectation that the given transaction is included in a block.
fn prune_from_subpool(
&mut self,
pool: SubPool,
tx: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
match pool {
SubPool::Queued => self.queued_pool.remove_transaction(tx),
SubPool::Pending => self.pending_pool.prune_transaction(tx),
SubPool::BaseFee => self.basefee_pool.remove_transaction(tx),
}
}
/// Removes _only_ the descendants of the given transaction from the entire pool.
///
/// All removed transactions are added to the `removed` vec.
@@ -571,6 +595,15 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
}
/// Updates the block specific info
fn set_block_info(&mut self, block_info: BlockInfo) {
let BlockInfo { last_seen_block_hash, last_seen_block_number, pending_basefee } =
block_info;
self.last_seen_block_number = last_seen_block_number;
self.last_seen_block_hash = last_seen_block_hash;
self.pending_basefee = pending_basefee;
}
/// Rechecks all transactions in the pool against the changes.
///
/// Possible changes are:
@@ -588,14 +621,10 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// that got transaction included in the block.
pub(crate) fn update(
&mut self,
pending_block_base_fee: u128,
_changed_accounts: &[ChangedAccount],
changed_accounts: HashMap<SenderId, SenderInfo>,
) -> Vec<PoolUpdate> {
// update new basefee
self.pending_basefee = pending_block_base_fee;
// TODO(mattsse): probably good idea to allocate some capacity here.
let mut updates = Vec::new();
// pre-allocate a few updates
let mut updates = Vec::with_capacity(64);
let mut iter = self.txs.iter_mut().peekable();
@@ -608,7 +637,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
// The `unique_sender` loop will process the first transaction of all senders, update its
// state and internally update all consecutive transactions
'unique_sender: while let Some((id, tx)) = iter.next() {
'transactions: while let Some((id, tx)) = iter.next() {
// Advances the iterator to the next sender
macro_rules! next_sender {
($iter:ident) => {
@@ -620,36 +649,85 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
};
}
// If there's a nonce gap, we can shortcircuit, because there's nothing to update.
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue
// tracks the balance if the sender was changed in the block
let mut changed_balance = None;
// check if this is a changed account
if let Some(info) = changed_accounts.get(&id.sender) {
// discard all transactions with a nonce lower than the current state nonce
if id.nonce < info.state_nonce {
updates.push(PoolUpdate {
id: *tx.transaction.id(),
hash: *tx.transaction.hash(),
current: tx.subpool,
destination: Destination::Discard,
});
continue 'transactions
}
let ancestor = TransactionId::ancestor(id.nonce, info.state_nonce, id.sender);
// If there's no ancestor then this is the next transaction.
if ancestor.is_none() {
tx.state.insert(TxState::NO_NONCE_GAPS);
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
tx.cumulative_cost = U256::ZERO;
if tx.transaction.cost > info.balance {
// sender lacks sufficient funds to pay for this transaction
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
}
changed_balance = Some(info.balance);
}
// TODO(mattsse): if account has balance changes or mined transactions the balance needs
// to be checked here
// If there's a nonce gap, we can shortcircuit, because there's nothing to update yet.
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue 'transactions
}
// Since this is the first transaction of the sender, it has no parked ancestors
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
// Update the first transaction of this sender.
Self::update_base_fee(&pending_block_base_fee, tx);
Self::update_tx_base_fee(&self.pending_basefee, tx);
// Track if the transaction's sub-pool changed.
Self::record_subpool_update(&mut updates, tx);
// Track blocking transactions.
let mut has_parked_ancestor = !tx.state.is_pending();
let mut cumulative_cost = tx.next_cumulative_cost();
// Update all consecutive transaction of this sender
while let Some((peek, ref mut tx)) = iter.peek_mut() {
if peek.sender != id.sender {
// Found the next sender
continue 'unique_sender
continue 'transactions
}
// can short circuit
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue 'unique_sender
continue 'transactions
}
// update cumulative cost
tx.cumulative_cost = cumulative_cost;
// Update for next transaction
cumulative_cost = tx.next_cumulative_cost();
// If the account changed in the block, check the balance.
if let Some(changed_balance) = changed_balance {
if cumulative_cost > changed_balance {
// sender lacks sufficient funds to pay for this transaction
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
}
// Update ancestor condition.
@@ -661,7 +739,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
has_parked_ancestor = !tx.state.is_pending();
// Update and record sub-pool changes.
Self::update_base_fee(&pending_block_base_fee, tx);
Self::update_tx_base_fee(&self.pending_basefee, tx);
Self::record_subpool_update(&mut updates, tx);
// Advance iterator
@@ -690,7 +768,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
/// Rechecks the transaction's dynamic fee condition.
fn update_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction<T>) {
fn update_tx_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction<T>) {
// Recheck dynamic fee condition.
if let Some(fee_cap) = tx.transaction.max_fee_per_gas() {
match fee_cap.cmp(pending_block_base_fee) {
@@ -738,22 +816,12 @@ impl<T: PoolTransaction> AllTransactions<T> {
self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
pub(crate) fn descendant_txs_exclusive_mut<'a, 'b: 'a>(
&'a mut self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + '_ {
self.txs
.range_mut((Excluded(id), Unbounded))
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the
/// first value.
#[cfg(test)]
#[allow(unused)]
pub(crate) fn descendant_txs<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
@@ -854,11 +922,11 @@ impl<T: PoolTransaction> AllTransactions<T> {
let mut cumulative_cost = U256::ZERO;
let mut updates = Vec::new();
let predecessor =
let ancestor =
TransactionId::ancestor(transaction.transaction.nonce(), on_chain_nonce, tx_id.sender);
// If there's no predecessor then this is the next transaction.
if predecessor.is_none() {
// If there's no ancestor tx then this is the next transaction.
if ancestor.is_none() {
state.insert(TxState::NO_NONCE_GAPS);
state.insert(TxState::NO_PARKED_ANCESTORS);
}
@@ -926,7 +994,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
// We need to find out if the next transaction of the sender is considered pending
//
let mut has_parked_ancestor = if predecessor.is_none() {
let mut has_parked_ancestor = if ancestor.is_none() {
// the new transaction is the next one
false
} else {
@@ -1046,7 +1114,11 @@ pub(crate) type InsertResult<T> = Result<InsertOk<T>, InsertErr<T>>;
#[derive(Debug)]
pub(crate) enum InsertErr<T: PoolTransaction> {
/// Attempted to replace existing transaction, but was underpriced
Underpriced { transaction: Arc<ValidPoolTransaction<T>>, existing: TxHash },
Underpriced {
#[allow(unused)]
transaction: Arc<ValidPoolTransaction<T>>,
existing: TxHash,
},
/// The transactions feeCap is lower than the chain's minimum fee requirement.
///
/// See also [`MIN_PROTOCOL_BASE_FEE`]
@@ -1071,6 +1143,7 @@ pub(crate) struct InsertOk<T: PoolTransaction> {
/// Where to move the transaction to.
move_to: SubPool,
/// Current state of the inserted tx.
#[allow(unused)]
state: TxState,
/// The transaction that was replaced by this.
replaced_tx: Option<(Arc<ValidPoolTransaction<T>>, SubPool)>,
@@ -1143,21 +1216,16 @@ impl<T: PoolTransaction> fmt::Debug for PruneResult<T> {
/// Stores relevant context about a sender.
#[derive(Debug, Clone, Default)]
struct SenderInfo {
pub(crate) struct SenderInfo {
/// current nonce of the sender.
state_nonce: u64,
pub(crate) state_nonce: u64,
/// Balance of the sender at the current point.
balance: U256,
pub(crate) balance: U256,
}
// === impl SenderInfo ===
impl SenderInfo {
/// Creates a new entry for an incoming, not yet tracked sender.
fn new_incoming(state_nonce: u64, balance: U256) -> Self {
Self { state_nonce, balance }
}
/// Updates the info with the new values.
fn update(&mut self, state_nonce: u64, balance: U256) {
*self = Self { state_nonce, balance };