From 0beaf85f4bfbed26956fae488268db3ec626c9ba Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 24 Aug 2023 17:44:25 +0200 Subject: [PATCH] feat: remove finalized blobs (#4342) --- crates/transaction-pool/src/blobstore/mod.rs | 2 +- .../transaction-pool/src/blobstore/tracker.rs | 11 ++-- crates/transaction-pool/src/maintain.rs | 52 ++++++++++++++++++- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index bf0db1046a..786bbcd4f4 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -4,7 +4,7 @@ pub use mem::InMemoryBlobStore; pub use noop::NoopBlobStore; use reth_primitives::{BlobTransactionSidecar, H256}; use std::fmt; -pub use tracker::BlobStoreCanonTracker; +pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates}; mod mem; mod noop; diff --git a/crates/transaction-pool/src/blobstore/tracker.rs b/crates/transaction-pool/src/blobstore/tracker.rs index 0d1f783331..20461e1126 100644 --- a/crates/transaction-pool/src/blobstore/tracker.rs +++ b/crates/transaction-pool/src/blobstore/tracker.rs @@ -13,7 +13,7 @@ pub struct BlobStoreCanonTracker { impl BlobStoreCanonTracker { /// Adds a block to the blob store maintenance. - pub(crate) fn add_block( + pub fn add_block( &mut self, block_number: BlockNumber, blob_txs: impl IntoIterator, @@ -22,7 +22,7 @@ impl BlobStoreCanonTracker { } /// Adds all blocks to the tracked list of blocks. - pub(crate) fn add_blocks( + pub fn add_blocks( &mut self, blocks: impl IntoIterator)>, ) { @@ -32,7 +32,7 @@ impl BlobStoreCanonTracker { } /// Adds all blob transactions from the given chain to the tracker. - pub(crate) fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) { + pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) { let blob_txs = blocks.iter().map(|(num, blocks)| { let iter = blocks.body.iter().filter(|tx| tx.transaction.is_eip4844()).map(|tx| tx.hash); @@ -42,8 +42,7 @@ impl BlobStoreCanonTracker { } /// Invoked when a block is finalized. - #[allow(unused)] - pub(crate) fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates { + pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates { let mut finalized = Vec::new(); while let Some(entry) = self.blob_txs_in_blocks.first_entry() { if *entry.key() <= number { @@ -63,7 +62,7 @@ impl BlobStoreCanonTracker { /// Updates that should be applied to the blob store. #[derive(Debug, Eq, PartialEq)] -pub(crate) enum BlobStoreUpdates { +pub enum BlobStoreUpdates { /// No updates. None, /// Delete the given finalized transactions from the blob store. diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 2e9936fe95..fd3c83a485 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -1,7 +1,7 @@ //! Support for maintaining the state of the transaction pool use crate::{ - blobstore::BlobStoreCanonTracker, + blobstore::{BlobStoreCanonTracker, BlobStoreUpdates}, metrics::MaintainPoolMetrics, traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt}, BlockInfo, TransactionPool, @@ -10,7 +10,9 @@ use futures_util::{ future::{BoxFuture, Fuse, FusedFuture}, FutureExt, Stream, StreamExt, }; -use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction}; +use reth_primitives::{ + Address, BlockHash, BlockNumber, BlockNumberOrTag, FromRecoveredTransaction, +}; use reth_provider::{ BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, PostState, StateProviderFactory, }; @@ -97,6 +99,10 @@ pub async fn maintain_transaction_pool( // keeps track of mined blob transaction so we can clean finalized transactions let mut blob_store_tracker = BlobStoreCanonTracker::default(); + // keeps track of the latest finalized block + let mut last_finalized_block = + FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten()); + // keeps track of any dirty accounts that we know of are out of sync with the pool let mut dirty_addresses = HashSet::new(); @@ -154,6 +160,19 @@ pub async fn maintain_transaction_pool( task_spawner.spawn_blocking(fut); } + // check if we have a new finalized block + if let Some(finalized) = + last_finalized_block.update(client.finalized_block_number().ok().flatten()) + { + match blob_store_tracker.on_finalized_block(finalized) { + BlobStoreUpdates::None => {} + BlobStoreUpdates::Finalized(blobs) => { + // remove all finalized blobs from the blob store + pool.delete_blobs(blobs); + } + } + } + // outcomes of the futures we are waiting on let mut event = None; let mut reloaded = None; @@ -360,6 +379,35 @@ pub async fn maintain_transaction_pool( } } +struct FinalizedBlockTracker { + last_finalized_block: Option, +} + +impl FinalizedBlockTracker { + fn new(last_finalized_block: Option) -> Self { + Self { last_finalized_block } + } + + /// Updates the tracked finalized block and returns the new finalized block if it changed + fn update(&mut self, finalized_block: Option) -> Option { + match (self.last_finalized_block, finalized_block) { + (Some(last), Some(finalized)) => { + self.last_finalized_block = Some(finalized); + if last < finalized { + Some(finalized) + } else { + None + } + } + (None, Some(finalized)) => { + self.last_finalized_block = Some(finalized); + Some(finalized) + } + _ => None, + } + } +} + /// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual /// state. #[derive(Debug, Eq, PartialEq)]