feat(txpool): add update_basefee function (#2447)

This commit is contained in:
Matthias Seitz
2023-04-30 11:05:51 +02:00
committed by GitHub
parent 7fa5dc632f
commit a3e627e801
9 changed files with 315 additions and 46 deletions

View File

@@ -103,6 +103,7 @@ use reth_primitives::{Address, TxHash, U256};
use reth_provider::StateProviderFactory;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::Receiver;
use tracing::{instrument, trace};
mod config;
pub mod error;
@@ -165,6 +166,13 @@ where
self.inner().config()
}
/// Sets the current block info for the pool.
#[instrument(skip(self), target = "txpool")]
pub fn set_block_info(&self, info: BlockInfo) {
trace!(target: "txpool", "updating pool block info");
self.pool.set_block_info(info)
}
/// Returns future that validates all transaction in the given iterator.
async fn validate_all(
&self,

View File

@@ -2,10 +2,10 @@
use crate::{
traits::{CanonicalStateUpdate, ChangedAccount},
Pool, TransactionOrdering, TransactionPool, TransactionValidator,
BlockInfo, Pool, TransactionOrdering, TransactionPool, TransactionValidator,
};
use futures_util::{Stream, StreamExt};
use reth_primitives::{Address, BlockHash, FromRecoveredTransaction};
use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction};
use reth_provider::{BlockProvider, CanonStateNotification, PostState, StateProviderFactory};
use std::{
borrow::Borrow,
@@ -27,7 +27,16 @@ pub async fn maintain_transaction_pool<Client, V, T, St>(
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
St: Stream<Item = CanonStateNotification> + Unpin,
{
// TODO set current head for the pool
// ensure the pool points to latest state
if let Ok(Some(latest)) = client.block(BlockNumberOrTag::Latest.into()) {
let latest = latest.seal_slow();
let info = BlockInfo {
last_seen_block_hash: latest.hash,
last_seen_block_number: latest.number,
pending_basefee: latest.next_block_base_fee().unwrap_or_default() as u128,
};
pool.set_block_info(info);
}
// keeps track of any dirty accounts that we failed to fetch the state for and need to retry
let mut dirty = HashSet::new();

View File

@@ -150,6 +150,10 @@ where
pub(crate) fn block_info(&self) -> BlockInfo {
self.pool.read().block_info()
}
/// Returns the currently tracked block
pub(crate) fn set_block_info(&self, info: BlockInfo) {
self.pool.write().set_block_info(info)
}
/// Returns the internal `SenderId` for this address
pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId {

View File

@@ -13,6 +13,7 @@ use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc};
///
/// Note: This type is generic over [ParkedPool] which enforces that the underlying transaction type
/// is [ValidPoolTransaction] wrapped in an [Arc].
#[derive(Clone)]
pub(crate) struct ParkedPool<T: ParkedOrd> {
/// Keeps track of transactions inserted in the pool.
///
@@ -101,6 +102,41 @@ impl<T: ParkedOrd> ParkedPool<T> {
}
}
impl<T: PoolTransaction> ParkedPool<BasefeeOrd<T>> {
/// Removes all transactions and their dependent transaction from the subpool that no longer
/// satisfy the given basefee.
///
/// Note: the transactions are not returned in a particular order.
pub(crate) fn enforce_basefee(&mut self, basefee: u128) -> Vec<Arc<ValidPoolTransaction<T>>> {
let mut to_remove = Vec::new();
{
let mut iter = self.by_id.iter().peekable();
while let Some((id, tx)) = iter.next() {
if tx.transaction.transaction.max_fee_per_gas() < basefee {
// still parked -> skip descendant transactions
'this: while let Some((peek, _)) = iter.peek() {
if peek.sender != id.sender {
break 'this
}
iter.next();
}
} else {
to_remove.push(*id);
}
}
}
let mut removed = Vec::with_capacity(to_remove.len());
for id in to_remove {
removed.push(self.remove_transaction(&id).expect("transaction exists"));
}
removed
}
}
impl<T: ParkedOrd> Default for ParkedPool<T> {
fn default() -> Self {
Self {
@@ -225,12 +261,7 @@ impl_ord_wrapper!(BasefeeOrd);
impl<T: PoolTransaction> Ord for BasefeeOrd<T> {
fn cmp(&self, other: &Self) -> Ordering {
match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) {
(Some(fee), Some(other)) => fee.cmp(&other),
(None, Some(_)) => Ordering::Less,
(Some(_), None) => Ordering::Greater,
_ => Ordering::Equal,
}
self.0.transaction.max_fee_per_gas().cmp(&other.0.transaction.max_fee_per_gas())
}
}
@@ -256,3 +287,63 @@ impl<T: PoolTransaction> Ord for QueuedOrd<T> {
other.timestamp.cmp(&self.timestamp))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{MockTransaction, MockTransactionFactory};
#[test]
fn test_enforce_parked_basefee() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx.clone());
assert!(pool.by_id.contains_key(tx.id()));
assert_eq!(pool.len(), 1);
let removed = pool.enforce_basefee(u128::MAX);
assert!(removed.is_empty());
let removed = pool.enforce_basefee(tx.max_fee_per_gas() - 1);
assert_eq!(removed.len(), 1);
assert!(pool.is_empty());
}
#[test]
fn test_enforce_parked_basefee_descendant() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let t = MockTransaction::eip1559().inc_price_by(10);
let root_tx = f.validated_arc(t.clone());
pool.add_transaction(root_tx.clone());
let descendant_tx = f.validated_arc(t.inc_nonce().inc_price());
pool.add_transaction(descendant_tx.clone());
assert!(pool.by_id.contains_key(root_tx.id()));
assert!(pool.by_id.contains_key(descendant_tx.id()));
assert_eq!(pool.len(), 2);
let removed = pool.enforce_basefee(u128::MAX);
assert!(removed.is_empty());
// two dependent tx in the pool with decreasing fee
{
let mut pool2 = pool.clone();
let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas());
assert_eq!(removed.len(), 1);
assert_eq!(pool2.len(), 1);
// descendant got popped
assert!(pool2.by_id.contains_key(root_tx.id()));
assert!(!pool2.by_id.contains_key(descendant_tx.id()));
}
// remove root transaction via root tx fee
let removed = pool.enforce_basefee(root_tx.max_fee_per_gas());
assert_eq!(removed.len(), 2);
assert!(pool.is_empty());
}
}

View File

@@ -20,6 +20,7 @@ use std::{
///
/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
/// is also pending, then this will be moved to the `independent` queue.
#[derive(Clone)]
pub(crate) struct PendingPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: T,
@@ -83,6 +84,44 @@ impl<T: TransactionOrdering> PendingPool<T> {
}
}
/// Removes all transactions and their dependent transaction from the subpool that no longer
/// satisfy the given basefee (`tx.fee < basefee`)
///
/// Note: the transactions are not returned in a particular order.
pub(crate) fn enforce_basefee(
&mut self,
basefee: u128,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut to_remove = Vec::new();
{
let mut iter = self.by_id.iter().peekable();
while let Some((id, tx)) = iter.next() {
if tx.transaction.transaction.max_fee_per_gas() < basefee {
// this transaction no longer satisfies the basefee: remove it and all its
// descendants
to_remove.push(*id);
'this: while let Some((peek, _)) = iter.peek() {
if peek.sender != id.sender {
break 'this
}
to_remove.push(**peek);
iter.next();
}
}
}
}
dbg!(&to_remove);
let mut removed = Vec::with_capacity(to_remove.len());
for id in to_remove {
removed.push(self.remove_transaction(&id).expect("transaction exists"));
}
removed
}
/// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
///
/// Note: for a transaction with nonce higher than the current on chain nonce this will always
@@ -127,7 +166,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Removes a _mined_ transaction from the pool.
///
/// If the transactions has a descendant transaction it will advance it to the best queue.
/// If the transaction has a descendant transaction it will advance it to the best queue.
pub(crate) fn prune_transaction(
&mut self,
id: &TransactionId,
@@ -147,9 +186,8 @@ impl<T: TransactionOrdering> PendingPool<T> {
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let tx = self.by_id.remove(id)?;
// keep track of size
self.size_of -= tx.transaction.transaction.size();
self.all.remove(&tx.transaction);
self.size_of -= tx.transaction.transaction.size();
self.independent_transactions.remove(&tx.transaction);
Some(tx.transaction.transaction.clone())
}
@@ -178,7 +216,6 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Whether the pool is empty
#[cfg(test)]
#[allow(unused)]
pub(crate) fn is_empty(&self) -> bool {
self.by_id.is_empty()
}
@@ -247,3 +284,65 @@ impl<T: TransactionOrdering> Ord for PendingTransactionRef<T> {
.then_with(|| other.submission_id.cmp(&self.submission_id))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{MockOrdering, MockTransaction, MockTransactionFactory};
#[test]
fn test_enforce_basefee() {
let mut f = MockTransactionFactory::default();
let mut pool = PendingPool::new(MockOrdering::default());
let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx.clone());
assert!(pool.by_id.contains_key(tx.id()));
assert_eq!(pool.len(), 1);
let removed = pool.enforce_basefee(0);
assert!(removed.is_empty());
let removed = pool.enforce_basefee(tx.max_fee_per_gas() + 1);
assert_eq!(removed.len(), 1);
assert!(pool.is_empty());
}
#[test]
fn test_enforce_basefee_descendant() {
let mut f = MockTransactionFactory::default();
let mut pool = PendingPool::new(MockOrdering::default());
let t = MockTransaction::eip1559().inc_price_by(10);
let root_tx = f.validated_arc(t.clone());
pool.add_transaction(root_tx.clone());
let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
pool.add_transaction(descendant_tx.clone());
assert!(pool.by_id.contains_key(root_tx.id()));
assert!(pool.by_id.contains_key(descendant_tx.id()));
assert_eq!(pool.len(), 2);
assert_eq!(pool.independent_transactions.len(), 1);
let removed = pool.enforce_basefee(0);
assert!(removed.is_empty());
// two dependent tx in the pool with decreasing fee
{
let mut pool2 = pool.clone();
let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas() + 1);
assert_eq!(removed.len(), 1);
assert_eq!(pool2.len(), 1);
// descendant got popped
assert!(pool2.by_id.contains_key(root_tx.id()));
assert!(!pool2.by_id.contains_key(descendant_tx.id()));
}
// remove root transaction via fee
let removed = pool.enforce_basefee(root_tx.max_fee_per_gas() + 1);
assert_eq!(removed.len(), 2);
assert!(pool.is_empty());
}
}

View File

@@ -128,6 +128,40 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
/// Updates the tracked basefee
///
/// Depending on the change in direction of the basefee, this will promote or demote
/// transactions from the basefee pool.
fn update_basefee(&mut self, pending_basefee: u128) {
match pending_basefee.cmp(&self.all_transactions.pending_basefee) {
Ordering::Equal => {
// fee unchanged, nothing to update
}
Ordering::Greater => {
// increased base fee: recheck pending pool and remove all that are no longer valid
for tx in self.pending_pool.enforce_basefee(pending_basefee) {
self.basefee_pool.add_transaction(tx);
}
}
Ordering::Less => {
// decreased base fee: recheck basefee pool and promote all that are now valid
for tx in self.basefee_pool.enforce_basefee(pending_basefee) {
self.pending_pool.add_transaction(tx);
}
}
}
}
/// Sets the current block info for the pool.
///
/// This will also apply updates to the pool based on the new base fee
pub(crate) fn set_block_info(&mut self, info: BlockInfo) {
let BlockInfo { last_seen_block_hash, last_seen_block_number, pending_basefee } = info;
self.all_transactions.last_seen_block_hash = last_seen_block_hash;
self.all_transactions.last_seen_block_number = last_seen_block_number;
self.update_basefee(pending_basefee)
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn best_transactions(&self) -> BestTransactions<T> {
self.pending_pool.best()
@@ -638,7 +672,6 @@ impl<T: PoolTransaction> AllTransactions<T> {
// The `unique_sender` loop will process the first transaction of all senders, update its
// state and internally update all consecutive transactions
'transactions: while let Some((id, tx)) = iter.next() {
// Advances the iterator to the next sender
macro_rules! next_sender {
($iter:ident) => {
'this: while let Some((peek, _)) = iter.peek() {
@@ -649,7 +682,6 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
};
}
// tracks the balance if the sender was changed in the block
let mut changed_balance = None;
@@ -770,14 +802,12 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Rechecks the transaction's dynamic fee condition.
fn update_tx_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction<T>) {
// Recheck dynamic fee condition.
if let Some(fee_cap) = tx.transaction.max_fee_per_gas() {
match fee_cap.cmp(pending_block_base_fee) {
Ordering::Greater | Ordering::Equal => {
tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
Ordering::Less => {
tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK);
}
match tx.transaction.max_fee_per_gas().cmp(pending_block_base_fee) {
Ordering::Greater | Ordering::Equal => {
tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
Ordering::Less => {
tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK);
}
}
}
@@ -932,15 +962,12 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
// Check dynamic fee
if let Some(fee_cap) = transaction.max_fee_per_gas() {
if fee_cap < self.minimal_protocol_basefee {
return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap })
}
if fee_cap >= self.pending_basefee {
state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
} else {
// legacy transactions always satisfy the condition
let fee_cap = transaction.max_fee_per_gas();
if fee_cap < self.minimal_protocol_basefee {
return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap })
}
if fee_cap >= self.pending_basefee {
state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}

View File

@@ -251,8 +251,25 @@ impl MockTransaction {
/// Returns a new transaction with a higher gas price +1
pub fn inc_price(&self) -> Self {
self.inc_price_by(1)
}
/// Returns a new transaction with a higher gas price
pub fn inc_price_by(&self, value: u128) -> Self {
let mut next = self.clone();
let gas = self.get_gas_price().checked_add(1).unwrap();
let gas = self.get_gas_price().checked_add(value).unwrap();
next.with_gas_price(gas)
}
/// Returns a new transaction with a lower gas price -1
pub fn decr_price(&self) -> Self {
self.decr_price_by(1)
}
/// Returns a new transaction with a lower gas price
pub fn decr_price_by(&self, value: u128) -> Self {
let mut next = self.clone();
let gas = self.get_gas_price().checked_sub(value).unwrap();
next.with_gas_price(gas)
}
@@ -320,10 +337,10 @@ impl PoolTransaction for MockTransaction {
self.get_gas_limit()
}
fn max_fee_per_gas(&self) -> Option<u128> {
fn max_fee_per_gas(&self) -> u128 {
match self {
MockTransaction::Legacy { .. } => None,
MockTransaction::Eip1559 { max_fee_per_gas, .. } => Some(*max_fee_per_gas),
MockTransaction::Legacy { gas_price, .. } => *gas_price,
MockTransaction::Eip1559 { max_fee_per_gas, .. } => *max_fee_per_gas,
}
}
@@ -453,6 +470,9 @@ impl MockTransactionFactory {
pub fn validated(&mut self, transaction: MockTransaction) -> MockValidTx {
self.validated_with_origin(TransactionOrigin::External, transaction)
}
pub fn validated_arc(&mut self, transaction: MockTransaction) -> Arc<MockValidTx> {
Arc::new(self.validated(transaction))
}
/// Converts the transaction into a validated transaction
pub fn validated_with_origin(
@@ -482,7 +502,7 @@ impl MockTransactionFactory {
}
}
#[derive(Default)]
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct MockOrdering;

View File

@@ -335,8 +335,10 @@ pub trait PoolTransaction:
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
///
/// This will return `None` for non-EIP1559 transactions
fn max_fee_per_gas(&self) -> Option<u128>;
/// For legacy transactions this is gas_price.
///
/// This is also commonly referred to as the "Gas Fee Cap" (`GasFeeCap`).
fn max_fee_per_gas(&self) -> u128;
/// Returns the EIP-1559 Priority fee the caller is paying to the block author.
///
@@ -425,12 +427,14 @@ impl PoolTransaction for PooledTransaction {
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
///
/// This will return `None` for non-EIP1559 transactions
fn max_fee_per_gas(&self) -> Option<u128> {
/// For legacy transactions this is gas_price.
///
/// This is also commonly referred to as the "Gas Fee Cap" (`GasFeeCap`).
fn max_fee_per_gas(&self) -> u128 {
match &self.transaction.transaction {
Transaction::Legacy(_) => None,
Transaction::Eip2930(_) => None,
Transaction::Eip1559(tx) => Some(tx.max_fee_per_gas),
Transaction::Legacy(tx) => tx.gas_price,
Transaction::Eip2930(tx) => tx.gas_price,
Transaction::Eip1559(tx) => tx.max_fee_per_gas,
}
}

View File

@@ -206,7 +206,7 @@ where
}
// Ensure max_priority_fee_per_gas (if EIP1559) is less than max_fee_per_gas if any.
if transaction.max_priority_fee_per_gas() > transaction.max_fee_per_gas() {
if transaction.max_priority_fee_per_gas() > Some(transaction.max_fee_per_gas()) {
return TransactionValidationOutcome::Invalid(
transaction,
InvalidTransactionError::TipAboveFeeCap.into(),
@@ -335,10 +335,17 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
}
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
pub fn max_fee_per_gas(&self) -> Option<u128> {
///
/// For legacy transactions this is gas_price.
pub fn max_fee_per_gas(&self) -> u128 {
self.transaction.max_fee_per_gas()
}
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
pub fn effective_gas_price(&self) -> u128 {
self.transaction.effective_gas_price()
}
/// Amount of gas that should be used in executing this transaction. This is paid up-front.
pub fn gas_limit(&self) -> u64 {
self.transaction.gas_limit()