diff --git a/crates/transaction-pool/src/blobstore/disk.rs b/crates/transaction-pool/src/blobstore/disk.rs index 1dcc9abf13..43caf112b7 100644 --- a/crates/transaction-pool/src/blobstore/disk.rs +++ b/crates/transaction-pool/src/blobstore/disk.rs @@ -75,6 +75,10 @@ impl BlobStore for DiskFileBlobStore { self.inner.get_one(tx) } + fn contains(&self, tx: B256) -> Result { + self.inner.contains(tx) + } + fn get_all( &self, txs: Vec, @@ -183,6 +187,15 @@ impl DiskFileBlobStoreInner { Ok(()) } + /// Returns true if the blob for the given transaction hash is in the blob cache or on disk. + fn contains(&self, tx: B256) -> Result { + if self.blob_cache.lock().get(&tx).is_some() { + return Ok(true) + } + // we only check if the file exists and assume it's valid + Ok(self.blob_disk_file(tx).is_file()) + } + /// Retrieves the blob for the given transaction hash from the blob cache or disk. fn get_one(&self, tx: B256) -> Result, BlobStoreError> { if let Some(blob) = self.blob_cache.lock().get(&tx) { @@ -438,6 +451,7 @@ mod tests { assert!(blobs.contains(&(tx, blob)), "missing blob {:?}", tx); } + assert!(store.contains(all_hashes[0]).unwrap()); store.delete_all(all_hashes.clone()).unwrap(); store.clear_cache(); @@ -446,6 +460,7 @@ mod tests { let all = store.get_all(all_hashes.clone()).unwrap(); assert!(all.is_empty()); + assert!(!store.contains(all_hashes[0]).unwrap()); assert!(store.get_exact(all_hashes).is_err()); } } diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs index af2f4e746d..568fa5ec69 100644 --- a/crates/transaction-pool/src/blobstore/mem.rs +++ b/crates/transaction-pool/src/blobstore/mem.rs @@ -67,6 +67,11 @@ impl BlobStore for InMemoryBlobStore { Ok(store.get(&tx).cloned()) } + fn contains(&self, tx: B256) -> Result { + let store = self.inner.store.read(); + Ok(store.contains_key(&tx)) + } + fn get_all( &self, txs: Vec, diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index 0fffdb16d4..6dee69d4b3 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -34,6 +34,9 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static { /// Retrieves the decoded blob data for the given transaction hash. fn get(&self, tx: B256) -> Result, BlobStoreError>; + /// Checks if the given transaction hash is in the blob store. + fn contains(&self, tx: B256) -> Result; + /// Retrieves all decoded blob data for the given transaction hashes. /// /// This only returns the blobs that were found in the store. diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs index b3d4915dd1..81b844d48c 100644 --- a/crates/transaction-pool/src/blobstore/noop.rs +++ b/crates/transaction-pool/src/blobstore/noop.rs @@ -27,6 +27,10 @@ impl BlobStore for NoopBlobStore { Ok(None) } + fn contains(&self, _tx: B256) -> Result { + Ok(false) + } + fn get_all( &self, _txs: Vec, diff --git a/crates/transaction-pool/src/blobstore/tracker.rs b/crates/transaction-pool/src/blobstore/tracker.rs index a3dd30bca0..c9221002ad 100644 --- a/crates/transaction-pool/src/blobstore/tracker.rs +++ b/crates/transaction-pool/src/blobstore/tracker.rs @@ -22,6 +22,8 @@ impl BlobStoreCanonTracker { } /// Adds all blocks to the tracked list of blocks. + /// + /// Replaces any previously tracked blocks with the set of transactions. pub fn add_blocks( &mut self, blocks: impl IntoIterator)>, @@ -32,6 +34,9 @@ impl BlobStoreCanonTracker { } /// Adds all blob transactions from the given chain to the tracker. + /// + /// Note: In case this is a chain that's part of a reorg, this replaces previously tracked + /// blocks. pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) { let blob_txs = blocks.iter().map(|(num, blocks)| { let iter = @@ -42,10 +47,12 @@ impl BlobStoreCanonTracker { } /// Invoked when a block is finalized. - pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates { + /// + /// This returns all blob transactions that were included in blocks that are now finalized. + pub fn on_finalized_block(&mut self, finalized_block: BlockNumber) -> BlobStoreUpdates { let mut finalized = Vec::new(); while let Some(entry) = self.blob_txs_in_blocks.first_entry() { - if *entry.key() <= number { + if *entry.key() <= finalized_block { finalized.extend(entry.remove_entry().1); } else { break diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index f0ab1124bd..a1d8b4ba7a 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -351,6 +351,9 @@ where origin: TransactionOrigin, transactions: Vec, ) -> PoolResult>> { + if transactions.is_empty() { + return Ok(Vec::new()) + } let validated = self.validate_all(origin, transactions).await?; let transactions = diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 4da9987b72..19c3d6d513 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -304,11 +304,12 @@ pub async fn maintain_transaction_pool( // to be re-injected // // Note: we no longer know if the tx was local or external + // Because the transactions are not finalized, the corresponding blobs are still in + // blob store (if we previously received them from the network) metrics.inc_reinserted_transactions(pruned_old_transactions.len()); let _ = pool.add_external_transactions(pruned_old_transactions).await; - // keep track of mined blob transactions - // TODO(mattsse): handle reorged transactions + // keep track of new mined blob transactions blob_store_tracker.add_new_chain_blocks(&new_blocks); } CanonStateNotification::Commit { new } => { diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 51c6470bc1..c8ca6891d7 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -307,7 +307,11 @@ where ) } EthBlobTransactionSidecar::Missing => { - if let Ok(Some(_)) = self.blob_store.get(*transaction.hash()) { + // This can happen for re-injected blob transactions (on re-org), since the blob + // is stripped from the transaction and not included in a block. + // check if the blob is in the store, if it's included we previously validated + // it and inserted it + if let Ok(true) = self.blob_store.contains(*transaction.hash()) { // validated transaction is already in the store } else { return TransactionValidationOutcome::Invalid(