Files
reth/crates/optimism/txpool/src/maintain.rs
2025-03-26 11:04:01 +00:00

78 lines
2.8 KiB
Rust

//! Support for maintaining the state of the transaction pool
use crate::{conditional::MaybeConditionalTransaction, interop::MaybeInteropTransaction};
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives_traits::NodePrimitives;
use reth_transaction_pool::TransactionPool;
/// Transaction pool maintenance metrics
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
struct MaintainPoolMetrics {
/// Counter indicating the number of conditional transactions removed from
/// the pool because of exceeded block attributes.
removed_tx_conditional: Counter,
}
impl MaintainPoolMetrics {
#[inline]
fn inc_removed_tx_conditional(&self, count: usize) {
self.removed_tx_conditional.increment(count as u64);
}
}
/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<N, Pool, St>(
pool: Pool,
events: St,
) -> BoxFuture<'static, ()>
where
N: NodePrimitives,
Pool: TransactionPool + 'static,
Pool::Transaction: MaybeConditionalTransaction + MaybeInteropTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
async move {
maintain_transaction_pool(pool, events).await;
}
.boxed()
}
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
where
N: NodePrimitives,
Pool: TransactionPool,
Pool::Transaction: MaybeConditionalTransaction + MaybeInteropTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
let metrics = MaintainPoolMetrics::default();
loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
if new.is_empty() {
continue;
}
let block_attr = BlockConditionalAttributes {
number: new.tip().number(),
timestamp: new.tip().timestamp(),
};
let mut to_remove = Vec::new();
for tx in &pool.pooled_transactions() {
if tx.transaction.has_exceeded_block_attributes(&block_attr) {
to_remove.push(*tx.hash());
}
}
if !to_remove.is_empty() {
metrics.inc_removed_tx_conditional(to_remove.len());
let _ = pool.remove_transactions(to_remove);
}
}
}
}