mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat: add TransactionPool::get_pooled_transaction_elements (#4317)
This commit is contained in:
@@ -288,6 +288,24 @@ pub struct BlobTransaction {
|
||||
}
|
||||
|
||||
impl BlobTransaction {
|
||||
/// Constructs a new [BlobTransaction] from a [TransactionSigned] and a
|
||||
/// [BlobTransactionSidecar].
|
||||
///
|
||||
/// Returns an error if the signed transaction is not [TxEip4844]
|
||||
pub fn try_from_signed(
|
||||
tx: TransactionSigned,
|
||||
sidecar: BlobTransactionSidecar,
|
||||
) -> Result<Self, (TransactionSigned, BlobTransactionSidecar)> {
|
||||
let TransactionSigned { transaction, signature, hash } = tx;
|
||||
match transaction {
|
||||
Transaction::Eip4844(transaction) => Ok(Self { hash, transaction, signature, sidecar }),
|
||||
transaction => {
|
||||
let tx = TransactionSigned { transaction, signature, hash };
|
||||
Err((tx, sidecar))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Verifies that the transaction's blob data, commitments, and proofs are all valid.
|
||||
///
|
||||
/// Takes as input the [KzgSettings], which should contain the the parameters derived from the
|
||||
|
||||
@@ -156,7 +156,7 @@
|
||||
//! - `test-utils`: Export utilities for testing
|
||||
use crate::pool::PoolInner;
|
||||
use aquamarine as _;
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, U256};
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, U256};
|
||||
use reth_provider::StateProviderFactory;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -165,7 +165,10 @@ use std::{
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
use crate::blobstore::{BlobStore, BlobStoreError};
|
||||
use crate::{
|
||||
blobstore::{BlobStore, BlobStoreError},
|
||||
traits::GetPooledTransactionLimit,
|
||||
};
|
||||
pub use crate::{
|
||||
config::{
|
||||
PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP, REPLACE_BLOB_PRICE_BUMP,
|
||||
@@ -403,6 +406,14 @@ where
|
||||
self.pooled_transactions().into_iter().take(max).collect()
|
||||
}
|
||||
|
||||
fn get_pooled_transaction_elements(
|
||||
&self,
|
||||
tx_hashes: Vec<TxHash>,
|
||||
limit: GetPooledTransactionLimit,
|
||||
) -> Vec<PooledTransactionsElement> {
|
||||
self.pool.get_pooled_transaction_elements(tx_hashes, limit)
|
||||
}
|
||||
|
||||
fn best_transactions(
|
||||
&self,
|
||||
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
|
||||
|
||||
@@ -4,13 +4,16 @@
|
||||
//! to be generic over it.
|
||||
|
||||
use crate::{
|
||||
blobstore::BlobStoreError, error::PoolError, traits::PendingTransactionListenerKind,
|
||||
validate::ValidTransaction, AllPoolTransactions, AllTransactionsEvents, BestTransactions,
|
||||
BlockInfo, EthPooledTransaction, NewTransactionEvent, PoolResult, PoolSize, PoolTransaction,
|
||||
PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
|
||||
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
|
||||
blobstore::BlobStoreError,
|
||||
error::PoolError,
|
||||
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
|
||||
validate::ValidTransaction,
|
||||
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, EthPooledTransaction,
|
||||
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
|
||||
TransactionEvents, TransactionOrigin, TransactionPool, TransactionValidationOutcome,
|
||||
TransactionValidator, ValidPoolTransaction,
|
||||
};
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, TxHash};
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash};
|
||||
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
|
||||
use tokio::sync::{mpsc, mpsc::Receiver};
|
||||
|
||||
@@ -108,6 +111,14 @@ impl TransactionPool for NoopTransactionPool {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn get_pooled_transaction_elements(
|
||||
&self,
|
||||
_tx_hashes: Vec<TxHash>,
|
||||
_limit: GetPooledTransactionLimit,
|
||||
) -> Vec<PooledTransactionsElement> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn best_transactions(
|
||||
&self,
|
||||
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
|
||||
|
||||
@@ -82,7 +82,10 @@ use crate::{
|
||||
};
|
||||
use best::BestTransactions;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, H256};
|
||||
use reth_primitives::{
|
||||
Address, BlobTransaction, BlobTransactionSidecar, IntoRecoveredTransaction,
|
||||
PooledTransactionsElement, TransactionSigned, TxHash, H256,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt,
|
||||
@@ -97,10 +100,14 @@ pub use events::{FullTransactionEvent, TransactionEvent};
|
||||
|
||||
mod listener;
|
||||
use crate::{
|
||||
blobstore::BlobStore, metrics::BlobStoreMetrics, pool::txpool::UpdateOutcome,
|
||||
traits::PendingTransactionListenerKind, validate::ValidTransaction,
|
||||
blobstore::BlobStore,
|
||||
metrics::BlobStoreMetrics,
|
||||
pool::txpool::UpdateOutcome,
|
||||
traits::{GetPooledTransactionLimit, PendingTransactionListenerKind},
|
||||
validate::ValidTransaction,
|
||||
};
|
||||
pub use listener::{AllTransactionsEvents, TransactionEvents};
|
||||
use reth_rlp::Encodable;
|
||||
|
||||
mod best;
|
||||
mod parked;
|
||||
@@ -269,6 +276,50 @@ where
|
||||
pool.all().transactions_iter().filter(|tx| tx.propagate).collect()
|
||||
}
|
||||
|
||||
/// Returns the [BlobTransaction] for the given transaction if the sidecar exists.
|
||||
///
|
||||
/// Caution: this assumes the given transaction is eip-4844
|
||||
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
|
||||
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
|
||||
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
|
||||
return Some(blob)
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns converted [PooledTransactionsElement] for the given transaction hashes.
|
||||
pub(crate) fn get_pooled_transaction_elements(
|
||||
&self,
|
||||
tx_hashes: Vec<TxHash>,
|
||||
limit: GetPooledTransactionLimit,
|
||||
) -> Vec<PooledTransactionsElement> {
|
||||
let transactions = self.get_all(tx_hashes);
|
||||
let mut elements = Vec::with_capacity(transactions.len());
|
||||
let mut size = 0;
|
||||
for transaction in transactions {
|
||||
let tx = transaction.to_recovered_transaction().into_signed();
|
||||
let pooled = if tx.is_eip4844() {
|
||||
if let Some(blob) = self.get_blob_transaction(tx) {
|
||||
PooledTransactionsElement::BlobTransaction(blob)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
PooledTransactionsElement::from(tx)
|
||||
};
|
||||
|
||||
size += pooled.length();
|
||||
elements.push(pooled);
|
||||
|
||||
if limit.exceeds(size) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
elements
|
||||
}
|
||||
|
||||
/// Updates the entire pool after a new block was executed.
|
||||
pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_>) {
|
||||
trace!(target: "txpool", %update, "updating pool on canonical state change");
|
||||
|
||||
@@ -171,8 +171,13 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
|
||||
/// Returns the _full_ transaction objects all transactions in the pool.
|
||||
///
|
||||
/// This is intended to be used by the network for the initial exchange of pooled transaction
|
||||
/// _hashes_
|
||||
///
|
||||
/// Note: This returns a `Vec` but should guarantee that all transactions are unique.
|
||||
///
|
||||
/// Caution: In case of blob transactions, this does not include the sidecar.
|
||||
///
|
||||
/// Consumer: P2P
|
||||
fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
|
||||
|
||||
@@ -184,6 +189,21 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
max: usize,
|
||||
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
|
||||
|
||||
/// Returns converted [PooledTransactionsElement] for the given transaction hashes.
|
||||
///
|
||||
/// This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
|
||||
/// The transactions must be in same order as in the request, but it is OK to skip transactions
|
||||
/// which are not available.
|
||||
///
|
||||
/// If the transaction is a blob transaction, the sidecar will be included.
|
||||
///
|
||||
/// Consumer: P2P
|
||||
fn get_pooled_transaction_elements(
|
||||
&self,
|
||||
tx_hashes: Vec<TxHash>,
|
||||
limit: GetPooledTransactionLimit,
|
||||
) -> Vec<PooledTransactionsElement>;
|
||||
|
||||
/// Returns an iterator that yields transactions that are ready for block production.
|
||||
///
|
||||
/// Consumer: Block production
|
||||
@@ -249,10 +269,7 @@ pub trait TransactionPool: Send + Sync + Clone {
|
||||
|
||||
/// Returns all transactions objects for the given hashes.
|
||||
///
|
||||
/// TODO(mattsse): this will no longer be accurate and we need a new function specifically for
|
||||
/// pooled txs This adheres to the expected behavior of [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
|
||||
/// The transactions must be in same order as in the request, but it is OK to skip transactions
|
||||
/// which are not available.
|
||||
/// Caution: This in case of blob transactions, this does not include the sidecar.
|
||||
fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
|
||||
|
||||
/// Notify the pool about transactions that are propagated to peers.
|
||||
@@ -848,6 +865,26 @@ pub struct BlockInfo {
|
||||
pub pending_basefee: u64,
|
||||
}
|
||||
|
||||
/// The limit to enforce for [TransactionPool::get_pooled_transaction_elements].
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum GetPooledTransactionLimit {
|
||||
/// No limit, return all transactions.
|
||||
None,
|
||||
/// Enforce a size limit on the returned transactions, for example 2MB
|
||||
SizeSoftLimit(usize),
|
||||
}
|
||||
|
||||
impl GetPooledTransactionLimit {
|
||||
/// Returns true if the given size exceeds the limit.
|
||||
#[inline]
|
||||
pub fn exceeds(&self, size: usize) -> bool {
|
||||
match self {
|
||||
GetPooledTransactionLimit::None => false,
|
||||
GetPooledTransactionLimit::SizeSoftLimit(limit) => size > *limit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A Stream that yields full transactions the subpool
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
|
||||
Reference in New Issue
Block a user