diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index e65297ea72..97afe6c79a 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -388,6 +388,30 @@ impl Transaction { Transaction::Eip4844(tx) => tx.size(), } } + + /// Returns true if the transaction is a legacy transaction. + #[inline] + pub fn is_legacy(&self) -> bool { + matches!(self, Transaction::Legacy(_)) + } + + /// Returns true if the transaction is an EIP-2930 transaction. + #[inline] + pub fn is_eip2930(&self) -> bool { + matches!(self, Transaction::Eip2930(_)) + } + + /// Returns true if the transaction is an EIP-1559 transaction. + #[inline] + pub fn is_eip1559(&self) -> bool { + matches!(self, Transaction::Eip1559(_)) + } + + /// Returns true if the transaction is an EIP-4844 transaction. + #[inline] + pub fn is_eip4844(&self) -> bool { + matches!(self, Transaction::Eip4844(_)) + } } impl Compact for Transaction { diff --git a/crates/transaction-pool/src/blobstore/maintain.rs b/crates/transaction-pool/src/blobstore/maintain.rs deleted file mode 100644 index cfc4c8fc68..0000000000 --- a/crates/transaction-pool/src/blobstore/maintain.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! Support for maintaining the blob pool. - -use crate::blobstore::BlobStore; -use reth_primitives::H256; -use std::collections::BTreeMap; - -/// The type that is used to maintain the blob store and discard finalized transactions. -#[derive(Debug)] -#[allow(unused)] -pub struct BlobStoreMaintainer { - /// The blob store that holds all the blob data. - store: S, - /// Keeps track of the blob transactions that are in blocks. - blob_txs_in_blocks: BTreeMap>, -} - -impl BlobStoreMaintainer { - /// Creates a new blob store maintenance instance. - pub fn new(store: S) -> Self { - Self { store, blob_txs_in_blocks: Default::default() } - } -} - -impl BlobStoreMaintainer { - /// Invoked when a block is finalized. - pub fn on_finalized(&mut self, _block_number: u64) {} -} diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs index 6d1dcb76aa..187b9026f0 100644 --- a/crates/transaction-pool/src/blobstore/mem.rs +++ b/crates/transaction-pool/src/blobstore/mem.rs @@ -16,16 +16,17 @@ pub struct InMemoryBlobStore { struct InMemoryBlobStoreInner { /// Storage for all blob data. store: RwLock>, - size: AtomicUsize, + data_size: AtomicUsize, + num_blobs: AtomicUsize, } impl InMemoryBlobStoreInner { fn add_size(&self, add: usize) { - self.size.fetch_add(add, std::sync::atomic::Ordering::Relaxed); + self.data_size.fetch_add(add, std::sync::atomic::Ordering::Relaxed); } fn sub_size(&self, sub: usize) { - self.size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed); + self.data_size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed); } fn update_size(&self, add: usize, sub: usize) { @@ -35,6 +36,10 @@ impl InMemoryBlobStoreInner { self.sub_size(sub - add); } } + + fn update_len(&self, len: usize) { + self.num_blobs.store(len, std::sync::atomic::Ordering::Relaxed); + } } impl BlobStore for InMemoryBlobStore { @@ -42,6 +47,7 @@ impl BlobStore for InMemoryBlobStore { let mut store = self.inner.store.write(); let (add, sub) = insert_size(&mut store, tx, data); self.inner.update_size(add, sub); + self.inner.update_len(store.len()); Ok(()) } @@ -58,6 +64,7 @@ impl BlobStore for InMemoryBlobStore { total_sub += sub; } self.inner.update_size(total_add, total_sub); + self.inner.update_len(store.len()); Ok(()) } @@ -65,6 +72,7 @@ impl BlobStore for InMemoryBlobStore { let mut store = self.inner.store.write(); let sub = remove_size(&mut store, &tx); self.inner.sub_size(sub); + self.inner.update_len(store.len()); Ok(()) } @@ -78,6 +86,7 @@ impl BlobStore for InMemoryBlobStore { total_sub += remove_size(&mut store, &tx); } self.inner.sub_size(total_sub); + self.inner.update_len(store.len()); Ok(()) } @@ -103,7 +112,11 @@ impl BlobStore for InMemoryBlobStore { } fn data_size_hint(&self) -> Option { - Some(self.inner.size.load(std::sync::atomic::Ordering::Relaxed)) + Some(self.inner.data_size.load(std::sync::atomic::Ordering::Relaxed)) + } + + fn blobs_len(&self) -> usize { + self.inner.num_blobs.load(std::sync::atomic::Ordering::Relaxed) } } diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index dcc6764389..bf0db1046a 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -1,14 +1,14 @@ //! Storage for blob data of EIP4844 transactions. -use reth_primitives::{BlobTransactionSidecar, H256}; -use std::fmt; -mod maintain; -mod mem; -mod noop; - -pub use maintain::BlobStoreMaintainer; pub use mem::InMemoryBlobStore; pub use noop::NoopBlobStore; +use reth_primitives::{BlobTransactionSidecar, H256}; +use std::fmt; +pub use tracker::BlobStoreCanonTracker; + +mod mem; +mod noop; +mod tracker; /// A blob store that can be used to store blob data of EIP4844 transactions. /// @@ -43,6 +43,9 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static { /// Data size of all transactions in the blob store. fn data_size_hint(&self) -> Option; + + /// How many blobs are in the blob store. + fn blobs_len(&self) -> usize; } /// Error variants that can occur when interacting with a blob store. diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs index d21bf59ef1..3cb30a22e9 100644 --- a/crates/transaction-pool/src/blobstore/noop.rs +++ b/crates/transaction-pool/src/blobstore/noop.rs @@ -37,4 +37,8 @@ impl BlobStore for NoopBlobStore { fn data_size_hint(&self) -> Option { Some(0) } + + fn blobs_len(&self) -> usize { + 0 + } } diff --git a/crates/transaction-pool/src/blobstore/tracker.rs b/crates/transaction-pool/src/blobstore/tracker.rs new file mode 100644 index 0000000000..0d1f783331 --- /dev/null +++ b/crates/transaction-pool/src/blobstore/tracker.rs @@ -0,0 +1,95 @@ +//! Support for maintaining the blob pool. + +use reth_primitives::{BlockNumber, H256}; +use reth_provider::chain::ChainBlocks; +use std::collections::BTreeMap; + +/// The type that is used to track canonical blob transactions. +#[derive(Debug, Default, Eq, PartialEq)] +pub struct BlobStoreCanonTracker { + /// Keeps track of the blob transactions included in blocks. + blob_txs_in_blocks: BTreeMap>, +} + +impl BlobStoreCanonTracker { + /// Adds a block to the blob store maintenance. + pub(crate) fn add_block( + &mut self, + block_number: BlockNumber, + blob_txs: impl IntoIterator, + ) { + self.blob_txs_in_blocks.insert(block_number, blob_txs.into_iter().collect()); + } + + /// Adds all blocks to the tracked list of blocks. + pub(crate) fn add_blocks( + &mut self, + blocks: impl IntoIterator)>, + ) { + for (block_number, blob_txs) in blocks { + self.add_block(block_number, blob_txs); + } + } + + /// Adds all blob transactions from the given chain to the tracker. + pub(crate) fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) { + let blob_txs = blocks.iter().map(|(num, blocks)| { + let iter = + blocks.body.iter().filter(|tx| tx.transaction.is_eip4844()).map(|tx| tx.hash); + (*num, iter) + }); + self.add_blocks(blob_txs); + } + + /// Invoked when a block is finalized. + #[allow(unused)] + pub(crate) fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates { + let mut finalized = Vec::new(); + while let Some(entry) = self.blob_txs_in_blocks.first_entry() { + if *entry.key() <= number { + finalized.extend(entry.remove_entry().1); + } else { + break + } + } + + if finalized.is_empty() { + BlobStoreUpdates::None + } else { + BlobStoreUpdates::Finalized(finalized) + } + } +} + +/// Updates that should be applied to the blob store. +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum BlobStoreUpdates { + /// No updates. + None, + /// Delete the given finalized transactions from the blob store. + Finalized(Vec), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_finalized_tracker() { + let mut tracker = BlobStoreCanonTracker::default(); + + let block1 = vec![H256::random()]; + let block2 = vec![H256::random()]; + let block3 = vec![H256::random()]; + tracker.add_block(1, block1.clone()); + tracker.add_block(2, block2.clone()); + tracker.add_block(3, block3.clone()); + + assert_eq!(tracker.on_finalized_block(0), BlobStoreUpdates::None); + assert_eq!(tracker.on_finalized_block(1), BlobStoreUpdates::Finalized(block1)); + assert_eq!( + tracker.on_finalized_block(3), + BlobStoreUpdates::Finalized(block2.into_iter().chain(block3).collect::>()) + ); + } +} diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index d7fc2f1de8..60f56cace2 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -493,6 +493,14 @@ where fn update_accounts(&self, accounts: Vec) { self.pool.update_accounts(accounts); } + + fn delete_blob(&self, tx: TxHash) { + self.pool.delete_blob(tx) + } + + fn delete_blobs(&self, txs: Vec) { + self.pool.delete_blobs(txs) + } } impl Clone for Pool { diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 05bb7824a0..c7c55a6196 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -1,6 +1,7 @@ //! Support for maintaining the state of the transaction pool use crate::{ + blobstore::BlobStoreCanonTracker, metrics::MaintainPoolMetrics, traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt}, BlockInfo, TransactionPool, @@ -93,6 +94,9 @@ pub async fn maintain_transaction_pool( pool.set_block_info(info); } + // keeps track of mined blob transaction so we can clean finalized transactions + let mut blob_store_tracker = BlobStoreCanonTracker::default(); + // keeps track of any dirty accounts that we know of are out of sync with the pool let mut dirty_addresses = HashSet::new(); @@ -283,6 +287,10 @@ pub async fn maintain_transaction_pool( // Note: we no longer know if the tx was local or external metrics.inc_reinserted_transactions(pruned_old_transactions.len()); let _ = pool.add_external_transactions(pruned_old_transactions).await; + + // keep track of mined blob transactions + // TODO(mattsse): handle reorged transactions + blob_store_tracker.add_new_chain_blocks(&new_blocks); } CanonStateNotification::Commit { new } => { let (blocks, state) = new.inner(); @@ -314,6 +322,10 @@ pub async fn maintain_transaction_pool( pending_basefee: pending_block_base_fee, }; pool.set_block_info(info); + + // keep track of mined blob transactions + blob_store_tracker.add_new_chain_blocks(&blocks); + continue } @@ -344,6 +356,9 @@ pub async fn maintain_transaction_pool( timestamp: tip.timestamp, }; pool.on_canonical_state_change(update); + + // keep track of mined blob transactions + blob_store_tracker.add_new_chain_blocks(&blocks); } } } diff --git a/crates/transaction-pool/src/metrics.rs b/crates/transaction-pool/src/metrics.rs index 85c3d707f6..966834c63b 100644 --- a/crates/transaction-pool/src/metrics.rs +++ b/crates/transaction-pool/src/metrics.rs @@ -46,6 +46,10 @@ pub struct BlobStoreMetrics { pub(crate) blobstore_failed_inserts: Counter, /// Number of failed deletes into the blobstore pub(crate) blobstore_failed_deletes: Counter, + /// The number of bytes the blobs in the blobstore take up + pub(crate) blobstore_byte_size: Gauge, + /// How many blobs are currently in the blobstore + pub(crate) blobstore_entries: Gauge, } /// Transaction pool maintenance metrics diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 3b633eec22..718fb40704 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -659,14 +659,33 @@ where warn!(target: "txpool", ?err, "[{:?}] failed to insert blob", hash); self.blob_store_metrics.blobstore_failed_inserts.increment(1); } + self.update_blob_store_metrics(); } /// Delete a blob from the blob store - fn delete_blob(&self, blob: TxHash) { + pub(crate) fn delete_blob(&self, blob: TxHash) { if let Err(err) = self.blob_store.delete(blob) { warn!(target: "txpool", ?err, "[{:?}] failed to delete blobs", blob); self.blob_store_metrics.blobstore_failed_deletes.increment(1); } + self.update_blob_store_metrics(); + } + + /// Delete all blobs from the blob store + pub(crate) fn delete_blobs(&self, txs: Vec) { + let num = txs.len(); + if let Err(err) = self.blob_store.delete_all(txs) { + warn!(target: "txpool", ?err,?num, "failed to delete blobs"); + self.blob_store_metrics.blobstore_failed_deletes.increment(num as u64); + } + self.update_blob_store_metrics(); + } + + fn update_blob_store_metrics(&self) { + if let Some(data_size) = self.blob_store.data_size_hint() { + self.blob_store_metrics.blobstore_byte_size.set(data_size as f64); + } + self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64); } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index eb41edd4b7..a9411dd212 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -299,6 +299,12 @@ pub trait TransactionPoolExt: TransactionPool { /// Updates the accounts in the pool fn update_accounts(&self, accounts: Vec); + + /// Deletes the blob sidecar for the given transaction from the blob store + fn delete_blob(&self, tx: H256); + + /// Deletes multiple blob sidecars from the blob store + fn delete_blobs(&self, txs: Vec); } /// Determines what kind of new pending transactions should be emitted by a stream of pending