diff --git a/crates/transaction-pool/src/blobstore/disk.rs b/crates/transaction-pool/src/blobstore/disk.rs
index 13736da657..69f7de7e4d 100644
--- a/crates/transaction-pool/src/blobstore/disk.rs
+++ b/crates/transaction-pool/src/blobstore/disk.rs
@@ -5,13 +5,17 @@ use alloy_rlp::{Decodable, Encodable};
 use parking_lot::{Mutex, RwLock};
 use reth_primitives::{BlobTransactionSidecar, TxHash, B256};
 use schnellru::{ByLength, LruMap};
-use std::{fmt, fs, io, path::PathBuf, sync::Arc};
+use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
 use tracing::{debug, trace};
 
 /// How many [BlobTransactionSidecar] to cache in memory.
 pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
 
 /// A blob store that stores blob data on disk.
+///
+/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
+/// it's expected that the maintenance task will call [BlobStore::cleanup] to remove the deleted
+/// blobs from disk.
 #[derive(Clone, Debug)]
 pub struct DiskFileBlobStore {
     inner: Arc<DiskFileBlobStoreInner>,
 }
@@ -58,19 +62,36 @@ impl BlobStore for DiskFileBlobStore {
     }
 
     fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
-        self.inner.delete_one(tx)?;
+        self.inner.txs_to_delete.write().insert(tx);
         Ok(())
     }
 
     fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
-        if txs.is_empty() {
-            return Ok(())
-        }
-
-        self.inner.delete_many(txs)?;
+        self.inner.txs_to_delete.write().extend(txs);
         Ok(())
     }
 
+    fn cleanup(&self) {
+        let txs_to_delete = {
+            let mut txs_to_delete = self.inner.txs_to_delete.write();
+            std::mem::take(&mut *txs_to_delete)
+        };
+        self.inner.size_tracker.sub_len(txs_to_delete.len());
+        let mut subsize = 0;
+        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
+        for tx in txs_to_delete {
+            let path = self.inner.blob_disk_file(tx);
+            let _ = fs::metadata(&path).map(|meta| {
+                subsize += meta.len();
+            });
+            let _ = fs::remove_file(&path).map_err(|e| {
+                let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
+                debug!(target:"txpool::blob", ?err);
+            });
+        }
+        self.inner.size_tracker.sub_size(subsize as usize);
+    }
+
     fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
         self.inner.get_one(tx)
     }
@@ -84,14 +105,14 @@ impl BlobStore for DiskFileBlobStore {
         txs: Vec<B256>,
     ) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
         if txs.is_empty() {
-            return Ok(Vec::new())
+            return Ok(Vec::new());
         }
         self.inner.get_all(txs)
     }
 
     fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
         if txs.is_empty() {
-            return Ok(Vec::new())
+            return Ok(Vec::new());
         }
         self.inner.get_exact(txs)
     }
@@ -110,6 +131,7 @@ struct DiskFileBlobStoreInner {
     blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
     size_tracker: BlobStoreSize,
     file_lock: RwLock<()>,
+    txs_to_delete: RwLock<HashSet<B256>>,
 }
 
 impl DiskFileBlobStoreInner {
@@ -120,6 +142,7 @@ impl DiskFileBlobStoreInner {
             blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
             size_tracker: Default::default(),
             file_lock: Default::default(),
+            txs_to_delete: Default::default(),
         }
     }
 
@@ -190,7 +213,7 @@ impl DiskFileBlobStoreInner {
     /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
     fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
         if self.blob_cache.lock().get(&tx).is_some() {
-            return Ok(true)
+            return Ok(true);
         }
         // we only check if the file exists and assume it's valid
         Ok(self.blob_disk_file(tx).is_file())
     }
 
@@ -199,7 +222,7 @@ impl DiskFileBlobStoreInner {
     /// Retrieves the blob for the given transaction hash from the blob cache or disk.
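The disk store above now follows a deferred-deletion pattern: `delete` and `delete_all` only record hashes in `txs_to_delete`, and the files are actually removed later by `cleanup`. Below is a minimal, std-only sketch of that pattern for orientation; `PendingDeleteStore`, its field names, and the file naming are illustrative stand-ins and not part of the reth API.

use std::{collections::HashSet, fs, path::PathBuf, sync::RwLock};

/// Illustrative stand-in for `DiskFileBlobStoreInner`.
struct PendingDeleteStore {
    blob_dir: PathBuf,
    /// Hashes queued for deletion; nothing is removed from disk yet.
    txs_to_delete: RwLock<HashSet<[u8; 32]>>,
}

impl PendingDeleteStore {
    /// Mirrors the new `delete`: record the hash, do no disk I/O.
    fn delete(&self, tx: [u8; 32]) {
        self.txs_to_delete.write().unwrap().insert(tx);
    }

    /// Mirrors the new `cleanup`: drain the pending set, then remove the files,
    /// ignoring per-file errors much like the real store only logs them.
    fn cleanup(&self) {
        let txs = std::mem::take(&mut *self.txs_to_delete.write().unwrap());
        for tx in txs {
            let _ = fs::remove_file(self.blob_disk_file(&tx));
        }
    }

    fn blob_disk_file(&self, tx: &[u8; 32]) -> PathBuf {
        let name: String = tx.iter().map(|b| format!("{b:02x}")).collect();
        self.blob_dir.join(format!("{name}.blob"))
    }
}

With such a store, a maintenance loop calls `cleanup` periodically instead of hitting the file system on every delete, so repeated deletions are coalesced into one pass. The real implementation additionally updates its `size_tracker` via the new `sub_len`/`sub_size` helpers.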
     fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
         if let Some(blob) = self.blob_cache.lock().get(&tx) {
-            return Ok(Some(blob.clone()))
+            return Ok(Some(blob.clone()));
         }
         let blob = self.read_one(tx)?;
         if let Some(blob) = &blob {
@@ -273,42 +296,11 @@ impl DiskFileBlobStoreInner {
     fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
         trace!( target:"txpool::blob", "[{:?}] writing blob file", tx);
         let path = self.blob_disk_file(tx);
-        let _lock = self.file_lock.write();
-
-        fs::write(&path, data).map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
-        Ok(data.len())
-    }
-
-    /// Retries the blob data for the given transaction hash.
-    #[inline]
-    fn delete_one(&self, tx: B256) -> Result<(), DiskFileBlobStoreError> {
-        trace!( target:"txpool::blob", "[{:?}] deleting blob file", tx);
-        let path = self.blob_disk_file(tx);
-
-        let _lock = self.file_lock.write();
-        fs::remove_file(&path).map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
-
-        Ok(())
-    }
-
-    /// Retries the blob data for the given transaction hash.
-    #[inline]
-    fn delete_many(
-        &self,
-        txs: impl IntoIterator<Item = B256>,
-    ) -> Result<(), DiskFileBlobStoreError> {
-        let _lock = self.file_lock.write();
-        for tx in txs.into_iter() {
-            trace!( target:"txpool::blob", "[{:?}] deleting blob file", tx);
-            let path = self.blob_disk_file(tx);
-
-            let _ = fs::remove_file(&path).map_err(|e| {
-                let err = DiskFileBlobStoreError::WriteFile(tx, path, e);
-                debug!( target:"txpool::blob", ?err);
-            });
+        {
+            let _lock = self.file_lock.write();
+            fs::write(&path, data).map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
         }
-
-        Ok(())
+        Ok(data.len())
     }
 
     #[inline]
@@ -329,11 +321,11 @@ impl DiskFileBlobStoreInner {
             }
         }
         if cache_miss.is_empty() {
-            return Ok(res)
+            return Ok(res);
         }
         let from_disk = self.read_many_decoded(cache_miss);
         if from_disk.is_empty() {
-            return Ok(res)
+            return Ok(res);
         }
         let mut cache = self.blob_cache.lock();
         for (tx, data) in from_disk {
@@ -361,6 +353,7 @@ impl fmt::Debug for DiskFileBlobStoreInner {
         f.debug_struct("DiskFileBlobStoreInner")
             .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
+            .field("txs_to_delete", &self.txs_to_delete.try_read())
             .finish()
     }
 }
@@ -425,6 +418,7 @@ mod tests {
         strategy::{Strategy, ValueTree},
         test_runner::TestRunner,
     };
+    use std::sync::atomic::Ordering;
 
     fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
         let dir = tempfile::tempdir().unwrap();
@@ -459,7 +453,9 @@
         assert!(store.contains(all_hashes[0]).unwrap());
 
         store.delete_all(all_hashes.clone()).unwrap();
+        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
         store.clear_cache();
+        store.cleanup();
 
         assert!(store.get(blobs[0].0).unwrap().is_none());
 
@@ -468,5 +464,8 @@
         assert!(!store.contains(all_hashes[0]).unwrap());
         assert!(store.get_exact(all_hashes).is_err());
+
+        assert_eq!(store.data_size_hint(), Some(0));
+        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
     }
 }
diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs
index a06e45565c..7cf4cd894a 100644
--- a/crates/transaction-pool/src/blobstore/mem.rs
+++ b/crates/transaction-pool/src/blobstore/mem.rs
@@ -67,6 +67,8 @@ impl BlobStore for InMemoryBlobStore {
         Ok(())
     }
 
+    fn cleanup(&self) {}
+
     // Retrieves the decoded blob data for the given transaction hash.
     fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
         let store = self.inner.store.read();
diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs
index 091cd1ec66..aa412f2a9d 100644
--- a/crates/transaction-pool/src/blobstore/mod.rs
+++ b/crates/transaction-pool/src/blobstore/mod.rs
@@ -34,6 +34,12 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
     /// Deletes multiple blob sidecars from the store
     fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
 
+    /// A maintenance function that can be called periodically to clean up the blob store.
+    ///
+    /// This is intended to be called in the background to clean up any old or unused data, in case
+    /// the store uses deferred cleanup: [DiskFileBlobStore]
+    fn cleanup(&self);
+
     /// Retrieves the decoded blob data for the given transaction hash.
     fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
 
@@ -106,6 +112,11 @@ impl BlobStoreSize {
         self.num_blobs.fetch_add(add, Ordering::Relaxed);
     }
 
+    #[inline]
+    pub(crate) fn sub_len(&self, sub: usize) {
+        self.num_blobs.fetch_sub(sub, Ordering::Relaxed);
+    }
+
     #[inline]
     pub(crate) fn data_size(&self) -> usize {
         self.data_size.load(Ordering::Relaxed)
diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs
index 81b844d48c..16c949e9da 100644
--- a/crates/transaction-pool/src/blobstore/noop.rs
+++ b/crates/transaction-pool/src/blobstore/noop.rs
@@ -23,6 +23,8 @@ impl BlobStore for NoopBlobStore {
         Ok(())
     }
 
+    fn cleanup(&self) {}
+
     fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
         Ok(None)
     }
diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs
index a57709f136..ed007e5a96 100644
--- a/crates/transaction-pool/src/lib.rs
+++ b/crates/transaction-pool/src/lib.rs
@@ -548,6 +548,10 @@ where
     fn delete_blobs(&self, txs: Vec<B256>) {
         self.pool.delete_blobs(txs)
     }
+
+    fn cleanup_blobs(&self) {
+        self.pool.cleanup_blobs()
+    }
 }
 
 impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs
index fb3662dbb5..b74fcc5a5e 100644
--- a/crates/transaction-pool/src/maintain.rs
+++ b/crates/transaction-pool/src/maintain.rs
@@ -194,6 +194,12 @@ pub async fn maintain_transaction_pool(
                     pool.delete_blobs(blobs);
                 }
             }
+            // also do periodic cleanup of the blob store
+            let pool = pool.clone();
+            task_spawner.spawn_blocking(Box::pin(async move {
+                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
+                pool.cleanup_blobs();
+            }));
         }
 
         // outcomes of the futures we are waiting on
diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs
index 58ab12a753..0ed073477b 100644
--- a/crates/transaction-pool/src/pool/mod.rs
+++ b/crates/transaction-pool/src/pool/mod.rs
@@ -786,6 +786,12 @@ where
         self.update_blob_store_metrics();
     }
 
+    /// Cleans up the blob store
+    pub(crate) fn cleanup_blobs(&self) {
+        self.blob_store.cleanup();
+        self.update_blob_store_metrics();
+    }
+
     fn update_blob_store_metrics(&self) {
         if let Some(data_size) = self.blob_store.data_size_hint() {
             self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
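Taken together, the maintenance task above calls `cleanup_blobs` on the pool whenever a newly finalized block is seen, and the pool simply forwards to `BlobStore::cleanup` while refreshing its metrics. A hedged, std-only sketch of driving such a hook from a background loop follows; the real reth task is event-driven (per finalized block) and uses `task_spawner.spawn_blocking` rather than a timer, and `cleanup_blobs` here is just a closure standing in for `TransactionPoolExt::cleanup_blobs`.

use std::{thread, time::Duration};

/// Spawn a background loop that flushes deferred blob deletions on a fixed
/// interval. In reth the trigger is a newly finalized block rather than a
/// timer; this only shows where the new hook slots in.
fn spawn_blob_cleanup<F>(interval: Duration, cleanup_blobs: F) -> thread::JoinHandle<()>
where
    F: Fn() + Send + 'static,
{
    thread::spawn(move || loop {
        thread::sleep(interval);
        // drains everything queued via `delete`/`delete_all` since the last pass
        cleanup_blobs();
    })
}

With a real pool this would be invoked roughly as `spawn_blob_cleanup(Duration::from_secs(60), move || pool.cleanup_blobs())`, assuming the pool handle is `Clone + Send`, as reth's `Pool` is.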
diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs
index 7bd926f0f7..ed484974d4 100644
--- a/crates/transaction-pool/src/traits.rs
+++ b/crates/transaction-pool/src/traits.rs
@@ -396,6 +396,9 @@ pub trait TransactionPoolExt: TransactionPool {
     /// Deletes multiple blob sidecars from the blob store
     fn delete_blobs(&self, txs: Vec<B256>);
+
+    /// Maintenance function to cleanup blobs that are no longer needed.
+    fn cleanup_blobs(&self);
 }
 
 /// Determines what kind of new transactions should be emitted by a stream of transactions.
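For stores that delete eagerly, such as `InMemoryBlobStore` and `NoopBlobStore` above, the new `cleanup` hook has nothing left to do. The self-contained sketch below illustrates that split; `MiniBlobStore` and `EagerStore` are illustrative types, not reth APIs.

use std::{collections::HashMap, sync::Mutex};

/// Reduced version of the store trait: only the two methods relevant here.
trait MiniBlobStore {
    /// Remove the blob for `tx`, either eagerly or by queueing it for later.
    fn delete(&self, tx: [u8; 32]);
    /// Periodic maintenance hook; a no-op for stores that delete eagerly.
    fn cleanup(&self);
}

struct EagerStore {
    blobs: Mutex<HashMap<[u8; 32], Vec<u8>>>,
}

impl MiniBlobStore for EagerStore {
    fn delete(&self, tx: [u8; 32]) {
        // removal happens immediately, so nothing accumulates for `cleanup`
        self.blobs.lock().unwrap().remove(&tx);
    }

    fn cleanup(&self) {}
}

The disk store is the odd one out: its `delete` only queues the hash, so the periodic `cleanup_blobs` call is what actually reclaims disk space and keeps `data_size_hint` accurate.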