feat: add blob store canon tracker (#4278)

This commit is contained in:
Matthias Seitz
2023-08-21 14:41:21 +02:00
committed by GitHub
parent 2523154260
commit 566e244e32
11 changed files with 203 additions and 39 deletions

View File

@@ -388,6 +388,30 @@ impl Transaction {
Transaction::Eip4844(tx) => tx.size(),
}
}
/// Returns true if the transaction is a legacy transaction.
#[inline]
pub fn is_legacy(&self) -> bool {
matches!(self, Transaction::Legacy(_))
}
/// Returns true if the transaction is an EIP-2930 transaction.
#[inline]
pub fn is_eip2930(&self) -> bool {
matches!(self, Transaction::Eip2930(_))
}
/// Returns true if the transaction is an EIP-1559 transaction.
#[inline]
pub fn is_eip1559(&self) -> bool {
matches!(self, Transaction::Eip1559(_))
}
/// Returns true if the transaction is an EIP-4844 transaction.
#[inline]
pub fn is_eip4844(&self) -> bool {
matches!(self, Transaction::Eip4844(_))
}
}
impl Compact for Transaction {

View File

@@ -1,27 +0,0 @@
//! Support for maintaining the blob pool.
use crate::blobstore::BlobStore;
use reth_primitives::H256;
use std::collections::BTreeMap;
/// The type that is used to maintain the blob store and discard finalized transactions.
#[derive(Debug)]
#[allow(unused)]
pub struct BlobStoreMaintainer<S> {
/// The blob store that holds all the blob data.
store: S,
/// Keeps track of the blob transactions that are in blocks.
blob_txs_in_blocks: BTreeMap<u64, Vec<H256>>,
}
impl<S> BlobStoreMaintainer<S> {
/// Creates a new blob store maintenance instance.
pub fn new(store: S) -> Self {
Self { store, blob_txs_in_blocks: Default::default() }
}
}
impl<S: BlobStore> BlobStoreMaintainer<S> {
/// Invoked when a block is finalized.
pub fn on_finalized(&mut self, _block_number: u64) {}
}

View File

@@ -16,16 +16,17 @@ pub struct InMemoryBlobStore {
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<H256, BlobTransactionSidecar>>,
size: AtomicUsize,
data_size: AtomicUsize,
num_blobs: AtomicUsize,
}
impl InMemoryBlobStoreInner {
fn add_size(&self, add: usize) {
self.size.fetch_add(add, std::sync::atomic::Ordering::Relaxed);
self.data_size.fetch_add(add, std::sync::atomic::Ordering::Relaxed);
}
fn sub_size(&self, sub: usize) {
self.size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed);
self.data_size.fetch_sub(sub, std::sync::atomic::Ordering::Relaxed);
}
fn update_size(&self, add: usize, sub: usize) {
@@ -35,6 +36,10 @@ impl InMemoryBlobStoreInner {
self.sub_size(sub - add);
}
}
fn update_len(&self, len: usize) {
self.num_blobs.store(len, std::sync::atomic::Ordering::Relaxed);
}
}
impl BlobStore for InMemoryBlobStore {
@@ -42,6 +47,7 @@ impl BlobStore for InMemoryBlobStore {
let mut store = self.inner.store.write();
let (add, sub) = insert_size(&mut store, tx, data);
self.inner.update_size(add, sub);
self.inner.update_len(store.len());
Ok(())
}
@@ -58,6 +64,7 @@ impl BlobStore for InMemoryBlobStore {
total_sub += sub;
}
self.inner.update_size(total_add, total_sub);
self.inner.update_len(store.len());
Ok(())
}
@@ -65,6 +72,7 @@ impl BlobStore for InMemoryBlobStore {
let mut store = self.inner.store.write();
let sub = remove_size(&mut store, &tx);
self.inner.sub_size(sub);
self.inner.update_len(store.len());
Ok(())
}
@@ -78,6 +86,7 @@ impl BlobStore for InMemoryBlobStore {
total_sub += remove_size(&mut store, &tx);
}
self.inner.sub_size(total_sub);
self.inner.update_len(store.len());
Ok(())
}
@@ -103,7 +112,11 @@ impl BlobStore for InMemoryBlobStore {
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size.load(std::sync::atomic::Ordering::Relaxed))
Some(self.inner.data_size.load(std::sync::atomic::Ordering::Relaxed))
}
fn blobs_len(&self) -> usize {
self.inner.num_blobs.load(std::sync::atomic::Ordering::Relaxed)
}
}

View File

@@ -1,14 +1,14 @@
//! Storage for blob data of EIP4844 transactions.
use reth_primitives::{BlobTransactionSidecar, H256};
use std::fmt;
mod maintain;
mod mem;
mod noop;
pub use maintain::BlobStoreMaintainer;
pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
use reth_primitives::{BlobTransactionSidecar, H256};
use std::fmt;
pub use tracker::BlobStoreCanonTracker;
mod mem;
mod noop;
mod tracker;
/// A blob store that can be used to store blob data of EIP4844 transactions.
///
@@ -43,6 +43,9 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Data size of all transactions in the blob store.
fn data_size_hint(&self) -> Option<usize>;
/// How many blobs are in the blob store.
fn blobs_len(&self) -> usize;
}
/// Error variants that can occur when interacting with a blob store.

View File

@@ -37,4 +37,8 @@ impl BlobStore for NoopBlobStore {
fn data_size_hint(&self) -> Option<usize> {
Some(0)
}
fn blobs_len(&self) -> usize {
0
}
}

View File

@@ -0,0 +1,95 @@
//! Support for maintaining the blob pool.
use reth_primitives::{BlockNumber, H256};
use reth_provider::chain::ChainBlocks;
use std::collections::BTreeMap;
/// The type that is used to track canonical blob transactions.
#[derive(Debug, Default, Eq, PartialEq)]
pub struct BlobStoreCanonTracker {
/// Keeps track of the blob transactions included in blocks.
blob_txs_in_blocks: BTreeMap<BlockNumber, Vec<H256>>,
}
impl BlobStoreCanonTracker {
/// Adds a block to the blob store maintenance.
pub(crate) fn add_block(
&mut self,
block_number: BlockNumber,
blob_txs: impl IntoIterator<Item = H256>,
) {
self.blob_txs_in_blocks.insert(block_number, blob_txs.into_iter().collect());
}
/// Adds all blocks to the tracked list of blocks.
pub(crate) fn add_blocks(
&mut self,
blocks: impl IntoIterator<Item = (BlockNumber, impl IntoIterator<Item = H256>)>,
) {
for (block_number, blob_txs) in blocks {
self.add_block(block_number, blob_txs);
}
}
/// Adds all blob transactions from the given chain to the tracker.
pub(crate) fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
let blob_txs = blocks.iter().map(|(num, blocks)| {
let iter =
blocks.body.iter().filter(|tx| tx.transaction.is_eip4844()).map(|tx| tx.hash);
(*num, iter)
});
self.add_blocks(blob_txs);
}
/// Invoked when a block is finalized.
#[allow(unused)]
pub(crate) fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
let mut finalized = Vec::new();
while let Some(entry) = self.blob_txs_in_blocks.first_entry() {
if *entry.key() <= number {
finalized.extend(entry.remove_entry().1);
} else {
break
}
}
if finalized.is_empty() {
BlobStoreUpdates::None
} else {
BlobStoreUpdates::Finalized(finalized)
}
}
}
/// Updates that should be applied to the blob store.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum BlobStoreUpdates {
/// No updates.
None,
/// Delete the given finalized transactions from the blob store.
Finalized(Vec<H256>),
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_finalized_tracker() {
let mut tracker = BlobStoreCanonTracker::default();
let block1 = vec![H256::random()];
let block2 = vec![H256::random()];
let block3 = vec![H256::random()];
tracker.add_block(1, block1.clone());
tracker.add_block(2, block2.clone());
tracker.add_block(3, block3.clone());
assert_eq!(tracker.on_finalized_block(0), BlobStoreUpdates::None);
assert_eq!(tracker.on_finalized_block(1), BlobStoreUpdates::Finalized(block1));
assert_eq!(
tracker.on_finalized_block(3),
BlobStoreUpdates::Finalized(block2.into_iter().chain(block3).collect::<Vec<_>>())
);
}
}

View File

@@ -493,6 +493,14 @@ where
fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
self.pool.update_accounts(accounts);
}
fn delete_blob(&self, tx: TxHash) {
self.pool.delete_blob(tx)
}
fn delete_blobs(&self, txs: Vec<TxHash>) {
self.pool.delete_blobs(txs)
}
}
impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {

View File

@@ -1,6 +1,7 @@
//! Support for maintaining the state of the transaction pool
use crate::{
blobstore::BlobStoreCanonTracker,
metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt},
BlockInfo, TransactionPool,
@@ -93,6 +94,9 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
pool.set_block_info(info);
}
// keeps track of mined blob transaction so we can clean finalized transactions
let mut blob_store_tracker = BlobStoreCanonTracker::default();
// keeps track of any dirty accounts that we know of are out of sync with the pool
let mut dirty_addresses = HashSet::new();
@@ -283,6 +287,10 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
// Note: we no longer know if the tx was local or external
metrics.inc_reinserted_transactions(pruned_old_transactions.len());
let _ = pool.add_external_transactions(pruned_old_transactions).await;
// keep track of mined blob transactions
// TODO(mattsse): handle reorged transactions
blob_store_tracker.add_new_chain_blocks(&new_blocks);
}
CanonStateNotification::Commit { new } => {
let (blocks, state) = new.inner();
@@ -314,6 +322,10 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
pending_basefee: pending_block_base_fee,
};
pool.set_block_info(info);
// keep track of mined blob transactions
blob_store_tracker.add_new_chain_blocks(&blocks);
continue
}
@@ -344,6 +356,9 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
timestamp: tip.timestamp,
};
pool.on_canonical_state_change(update);
// keep track of mined blob transactions
blob_store_tracker.add_new_chain_blocks(&blocks);
}
}
}

View File

@@ -46,6 +46,10 @@ pub struct BlobStoreMetrics {
pub(crate) blobstore_failed_inserts: Counter,
/// Number of failed deletes into the blobstore
pub(crate) blobstore_failed_deletes: Counter,
/// The number of bytes the blobs in the blobstore take up
pub(crate) blobstore_byte_size: Gauge,
/// How many blobs are currently in the blobstore
pub(crate) blobstore_entries: Gauge,
}
/// Transaction pool maintenance metrics

View File

@@ -659,14 +659,33 @@ where
warn!(target: "txpool", ?err, "[{:?}] failed to insert blob", hash);
self.blob_store_metrics.blobstore_failed_inserts.increment(1);
}
self.update_blob_store_metrics();
}
/// Delete a blob from the blob store
fn delete_blob(&self, blob: TxHash) {
pub(crate) fn delete_blob(&self, blob: TxHash) {
if let Err(err) = self.blob_store.delete(blob) {
warn!(target: "txpool", ?err, "[{:?}] failed to delete blobs", blob);
self.blob_store_metrics.blobstore_failed_deletes.increment(1);
}
self.update_blob_store_metrics();
}
/// Delete all blobs from the blob store
pub(crate) fn delete_blobs(&self, txs: Vec<TxHash>) {
let num = txs.len();
if let Err(err) = self.blob_store.delete_all(txs) {
warn!(target: "txpool", ?err,?num, "failed to delete blobs");
self.blob_store_metrics.blobstore_failed_deletes.increment(num as u64);
}
self.update_blob_store_metrics();
}
fn update_blob_store_metrics(&self) {
if let Some(data_size) = self.blob_store.data_size_hint() {
self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
}
self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
}
}

View File

@@ -299,6 +299,12 @@ pub trait TransactionPoolExt: TransactionPool {
/// Updates the accounts in the pool
fn update_accounts(&self, accounts: Vec<ChangedAccount>);
/// Deletes the blob sidecar for the given transaction from the blob store
fn delete_blob(&self, tx: H256);
/// Deletes multiple blob sidecars from the blob store
fn delete_blobs(&self, txs: Vec<H256>);
}
/// Determines what kind of new pending transactions should be emitted by a stream of pending