From 4763aad11e17e8f4ac8d4fa78cd6a55cb6746925 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 26 Apr 2023 11:10:02 +0200 Subject: [PATCH] feat(txpool): add pool manage task (#2298) Co-authored-by: Georgios Konstantopoulos --- Cargo.lock | 2 + crates/storage/provider/src/chain.rs | 41 ++- crates/storage/provider/src/traits/account.rs | 2 + crates/storage/provider/src/traits/chain.rs | 2 + crates/transaction-pool/Cargo.toml | 2 + crates/transaction-pool/src/lib.rs | 16 +- crates/transaction-pool/src/maintain.rs | 284 ++++++++++++++++++ crates/transaction-pool/src/pool/mod.rs | 24 +- crates/transaction-pool/src/pool/txpool.rs | 47 ++- crates/transaction-pool/src/traits.rs | 83 ++++- 10 files changed, 450 insertions(+), 53 deletions(-) create mode 100644 crates/transaction-pool/src/maintain.rs diff --git a/Cargo.lock b/Cargo.lock index a2ee6c051d..3cd31ff30d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5447,6 +5447,8 @@ dependencies = [ "parking_lot 0.12.1", "paste", "rand 0.8.5", + "reth-consensus-common", + "reth-interfaces", "reth-metrics-derive", "reth-primitives", "reth-provider", diff --git a/crates/storage/provider/src/chain.rs b/crates/storage/provider/src/chain.rs index 765032672f..20a407bf20 100644 --- a/crates/storage/provider/src/chain.rs +++ b/crates/storage/provider/src/chain.rs @@ -4,9 +4,9 @@ use crate::PostState; use reth_interfaces::{executor::Error as ExecError, Error}; use reth_primitives::{ BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders, - TxHash, + TransactionSigned, TxHash, }; -use std::collections::BTreeMap; +use std::{borrow::Cow, collections::BTreeMap}; /// A chain of blocks and their final state. /// @@ -64,8 +64,14 @@ impl Chain { /// Destructure the chain into its inner components, the blocks and the state at the tip of the /// chain. - pub fn into_inner(self) -> (ChainBlocks, PostState) { - (ChainBlocks { blocks: self.blocks }, self.state) + pub fn into_inner(self) -> (ChainBlocks<'static>, PostState) { + (ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.state) + } + + /// Destructure the chain into its inner components, the blocks and the state at the tip of the + /// chain. + pub fn inner(&self) -> (ChainBlocks<'_>, &PostState) { + (ChainBlocks { blocks: Cow::Borrowed(&self.blocks) }, &self.state) } /// Get the block at which this chain forked. @@ -202,16 +208,16 @@ impl Chain { /// All blocks in the chain #[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct ChainBlocks { - blocks: BTreeMap, +pub struct ChainBlocks<'a> { + blocks: Cow<'a, BTreeMap>, } -impl ChainBlocks { +impl<'a> ChainBlocks<'a> { /// Creates a consuming iterator over all blocks in the chain with increasing block number. /// /// Note: this always yields at least one block. pub fn into_blocks(self) -> impl Iterator { - self.blocks.into_values() + self.blocks.into_owned().into_values() } /// Creates an iterator over all blocks in the chain with increasing block number. @@ -227,14 +233,29 @@ impl ChainBlocks { pub fn tip(&self) -> &SealedBlockWithSenders { self.blocks.last_key_value().expect("Chain should have at least one block").1 } + + /// Get the _first_ block of the chain. + /// + /// # Note + /// + /// Chains always have at least one block. + pub fn first(&self) -> &SealedBlockWithSenders { + self.blocks.first_key_value().expect("Chain should have at least one block").1 + } + + /// Returns an iterator over all transactions in the chain. + pub fn transactions(&self) -> impl Iterator + '_ { + self.blocks.values().flat_map(|block| block.body.iter()) + } } -impl IntoIterator for ChainBlocks { +impl<'a> IntoIterator for ChainBlocks<'a> { type Item = (BlockNumber, SealedBlockWithSenders); type IntoIter = std::collections::btree_map::IntoIter; fn into_iter(self) -> Self::IntoIter { - self.blocks.into_iter() + #[allow(clippy::unnecessary_to_owned)] + self.blocks.into_owned().into_iter() } } diff --git a/crates/storage/provider/src/traits/account.rs b/crates/storage/provider/src/traits/account.rs index a62b3e8f1a..5660ebb079 100644 --- a/crates/storage/provider/src/traits/account.rs +++ b/crates/storage/provider/src/traits/account.rs @@ -6,5 +6,7 @@ use reth_primitives::{Account, Address}; #[auto_impl(&, Arc, Box)] pub trait AccountProvider: Send + Sync { /// Get basic account information. + /// + /// Returns `None` if the account doesn't exist. fn basic_account(&self, address: Address) -> Result>; } diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/storage/provider/src/traits/chain.rs index 508d1f473b..03c9f27d49 100644 --- a/crates/storage/provider/src/traits/chain.rs +++ b/crates/storage/provider/src/traits/chain.rs @@ -28,6 +28,8 @@ pub enum CanonStateNotification { /// Chain reorgs and both old and new chain are returned. Reorg { old: Arc, new: Arc }, /// Chain got reverted without reorg and only old chain is returned. + /// + /// This reverts the chain's tip to the first block of the chain. Revert { old: Arc }, /// Chain got extended without reorg and only new chain is returned. Commit { new: Arc }, diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 2c0f2bf6bd..30b2faa8ab 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -18,8 +18,10 @@ normal = [ [dependencies] # reth +reth-consensus-common = { path = "../consensus/common" } reth-primitives = { path = "../primitives" } reth-provider = { path = "../storage/provider" } +reth-interfaces = { path = "../interfaces" } reth-rlp = { path = "../rlp" } # async/futures diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 1e9bb37602..af85ea2048 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -83,8 +83,9 @@ pub use crate::{ config::PoolConfig, ordering::{CostOrdering, TransactionOrdering}, traits::{ - BestTransactions, OnNewBlockEvent, PoolTransaction, PooledTransaction, PropagateKind, - PropagatedTransactions, TransactionOrigin, TransactionPool, + BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount, PoolTransaction, + PooledTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin, + TransactionPool, }, validate::{ EthTransactionValidator, TransactionValidationOutcome, TransactionValidator, @@ -104,6 +105,7 @@ use tokio::sync::mpsc::Receiver; mod config; pub mod error; mod identifier; +pub mod maintain; pub mod metrics; mod ordering; pub mod pool; @@ -225,12 +227,16 @@ where { type Transaction = T::Transaction; - fn status(&self) -> PoolSize { + fn pool_size(&self) -> PoolSize { self.pool.size() } - fn on_new_block(&self, event: OnNewBlockEvent) { - self.pool.on_new_block(event); + fn block_info(&self) -> BlockInfo { + self.pool.block_info() + } + + fn on_canonical_state_change(&self, update: CanonicalStateUpdate) { + self.pool.on_canonical_state_change(update); } async fn add_transaction( diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs new file mode 100644 index 0000000000..5bf78a1c69 --- /dev/null +++ b/crates/transaction-pool/src/maintain.rs @@ -0,0 +1,284 @@ +//! Support for maintaining the state of the transaction pool + +use crate::{ + traits::{CanonicalStateUpdate, ChangedAccount}, + Pool, TransactionOrdering, TransactionPool, TransactionValidator, +}; +use futures_util::{Stream, StreamExt}; +use reth_consensus_common::validation::calculate_next_block_base_fee; +use reth_primitives::{Address, BlockHash, FromRecoveredTransaction}; +use reth_provider::{BlockProvider, CanonStateNotification, PostState, StateProviderFactory}; +use std::{ + borrow::Borrow, + collections::HashSet, + hash::{Hash, Hasher}, +}; +use tracing::warn; + +/// Maintains the state of the transaction pool by handling new blocks and reorgs. +/// +/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly +pub async fn maintain_transaction_pool( + client: Client, + pool: Pool, + mut events: St, +) where + Client: StateProviderFactory + BlockProvider + 'static, + V: TransactionValidator, + T: TransactionOrdering::Transaction>, + St: Stream + Unpin + 'static, +{ + // TODO set current head for the pool + + // keeps track of any dirty accounts that we failed to fetch the state for and need to retry + let mut dirty = HashSet::new(); + + // Listen for new chain events and derive the update action for the pool + while let Some(event) = events.next().await { + let pool_info = pool.block_info(); + + // TODO check dirty accounts from time to time + + match event { + CanonStateNotification::Reorg { old, new } => { + let (old_blocks, old_state) = old.inner(); + let (new_blocks, new_state) = new.inner(); + let new_tip = new_blocks.tip(); + + // base fee for the next block: `new_tip+1` + let pending_block_base_fee = calculate_next_block_base_fee( + new_tip.gas_used, + new_tip.gas_limit, + new_tip.base_fee_per_gas.unwrap_or_default(), + ) as u128; + + // we know all changed account in the new chain + let new_changed_accounts: HashSet<_> = + changed_accounts_iter(new_state).map(ChangedAccountEntry).collect(); + + // find all accounts that were changed in the old chain but _not_ in the new chain + let missing_changed_acc = old_state + .accounts() + .keys() + .filter(|addr| !new_changed_accounts.contains(*addr)) + .copied(); + + // for these we need to fetch the nonce+balance from the db at the new tip + let mut changed_accounts = + match load_accounts(&client, new_tip.hash, missing_changed_acc) { + Ok(LoadedAccounts { accounts, failed_to_load }) => { + // extend accounts we failed to load from database + dirty.extend(failed_to_load); + + accounts + } + Err(err) => { + let (addresses, err) = *err; + warn!( + ?err, + "failed to load missing changed accounts at new tip: {:?}", + new_tip.hash + ); + dirty.extend(addresses); + vec![] + } + }; + + // also include all accounts from new chain + // we can use extend here because they are unique + changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0)); + + // all transactions mined in the new chain + let new_mined_transactions: HashSet<_> = + new_blocks.transactions().map(|tx| tx.hash).collect(); + + // update the pool then reinject the pruned transactions + // find all transactions that were mined in the old chain but not in the new chain + let pruned_old_transactions = old_blocks + .transactions() + .filter(|tx| !new_mined_transactions.contains(&tx.hash)) + .filter_map(|tx| tx.clone().into_ecrecovered()) + .map(|tx| { + ::Transaction::from_recovered_transaction(tx) + }) + .collect(); + + // update the pool first + let update = CanonicalStateUpdate { + hash: new_tip.hash, + number: new_tip.number, + pending_block_base_fee, + changed_accounts, + // all transactions mined in the new chain need to be removed from the pool + mined_transactions: new_mined_transactions.into_iter().collect(), + }; + pool.on_canonical_state_change(update); + + // all transactions that were mined in the old chain but not in the new chain need + // to be re-injected + // + // Note: we no longer know if the tx was local or external + let _ = pool.add_external_transactions(pruned_old_transactions).await; + // TODO: metrics + } + CanonStateNotification::Revert { old } => { + // this similar to the inverse of a commit where we need to insert the transactions + // back into the pool and update the pool's state accordingly + + let (blocks, state) = old.inner(); + let first_block = blocks.first(); + if first_block.hash == pool_info.last_seen_block_hash { + // nothing to update + continue + } + + // base fee for the next block: `first_block+1` + let pending_block_base_fee = calculate_next_block_base_fee( + first_block.gas_used, + first_block.gas_limit, + first_block.base_fee_per_gas.unwrap_or_default(), + ) as u128; + let changed_accounts = changed_accounts_iter(state).collect(); + let update = CanonicalStateUpdate { + hash: first_block.hash, + number: first_block.number, + pending_block_base_fee, + changed_accounts, + // no tx to prune in the reverted chain + mined_transactions: vec![], + }; + pool.on_canonical_state_change(update); + + let pruned_old_transactions = blocks + .transactions() + .filter_map(|tx| tx.clone().into_ecrecovered()) + .map(|tx| { + ::Transaction::from_recovered_transaction(tx) + }) + .collect(); + + // all transactions that were mined in the old chain need to be re-injected + // + // Note: we no longer know if the tx was local or external + let _ = pool.add_external_transactions(pruned_old_transactions).await; + // TODO: metrics + } + CanonStateNotification::Commit { new } => { + // TODO skip large commits? + + let (blocks, state) = new.inner(); + let tip = blocks.tip(); + // base fee for the next block: `tip+1` + let pending_block_base_fee = calculate_next_block_base_fee( + tip.gas_used, + tip.gas_limit, + tip.base_fee_per_gas.unwrap_or_default(), + ) as u128; + + let first_block = blocks.first(); + // check if the range of the commit is canonical + if first_block.parent_hash == pool_info.last_seen_block_hash { + let changed_accounts = changed_accounts_iter(state).collect(); + let mined_transactions = blocks.transactions().map(|tx| tx.hash).collect(); + // Canonical update + let update = CanonicalStateUpdate { + hash: tip.hash, + number: tip.number, + pending_block_base_fee, + changed_accounts, + mined_transactions, + }; + pool.on_canonical_state_change(update); + } else { + // TODO is this even reachable, because all commits are canonical? + // this a canonical + } + } + } + } +} + +/// A unique ChangedAccount identified by its address that can be used for deduplication +#[derive(Eq)] +struct ChangedAccountEntry(ChangedAccount); + +impl PartialEq for ChangedAccountEntry { + fn eq(&self, other: &Self) -> bool { + self.0.address == other.0.address + } +} + +impl Hash for ChangedAccountEntry { + fn hash(&self, state: &mut H) { + self.0.address.hash(state); + } +} + +impl Borrow
for ChangedAccountEntry { + fn borrow(&self) -> &Address { + &self.0.address + } +} + +#[derive(Default)] +struct LoadedAccounts { + /// All accounts that were loaded + accounts: Vec, + /// All accounts that failed to load + failed_to_load: Vec
, +} + +/// Loads all accounts at the given state +/// +/// Returns an error with all given addresses if the state is not available. +fn load_accounts( + client: &Client, + at: BlockHash, + addresses: I, +) -> Result, reth_interfaces::Error)>> +where + I: Iterator, + + Client: StateProviderFactory, +{ + let mut res = LoadedAccounts::default(); + let state = match client.history_by_block_hash(at) { + Ok(state) => state, + Err(err) => return Err(Box::new((addresses.collect(), err))), + }; + for addr in addresses { + if let Ok(maybe_acc) = state.basic_account(addr) { + let acc = maybe_acc + .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance }) + .unwrap_or_else(|| ChangedAccount::empty(addr)); + res.accounts.push(acc) + } else { + // failed to load account. + res.failed_to_load.push(addr); + } + } + Ok(res) +} + +fn changed_accounts_iter(state: &PostState) -> impl Iterator + '_ { + state.accounts().iter().filter_map(|(addr, acc)| acc.map(|acc| (addr, acc))).map( + |(address, acc)| ChangedAccount { + address: *address, + nonce: acc.nonce, + balance: acc.balance, + }, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn changed_acc_entry() { + let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random())); + let mut copy = changed_acc.0; + copy.nonce = 10; + assert!(changed_acc.eq(&ChangedAccountEntry(copy))); + } +} diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 7aca2106e1..ace1465746 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -72,10 +72,11 @@ use crate::{ identifier::{SenderId, SenderIdentifiers, TransactionId}, pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool}, traits::{ - NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin, + BlockInfo, NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, + TransactionOrigin, }, validate::{TransactionValidationOutcome, ValidPoolTransaction}, - OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, + CanonicalStateUpdate, PoolConfig, TransactionOrdering, TransactionValidator, }; use best::BestTransactions; pub use events::TransactionEvent; @@ -138,6 +139,11 @@ where self.pool.read().size() } + /// Returns the currently tracked block + pub(crate) fn block_info(&self) -> BlockInfo { + self.pool.read().block_info() + } + /// Returns the internal `SenderId` for this address pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId { self.identifiers.write().sender_id_or_create(addr) @@ -183,9 +189,9 @@ where } /// Updates the entire pool after a new block was executed. - pub(crate) fn on_new_block(&self, block: OnNewBlockEvent) { - let outcome = self.pool.write().on_new_block(block); - self.notify_on_new_block(outcome); + pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate) { + let outcome = self.pool.write().on_canonical_state_change(update); + self.notify_on_new_state(outcome); } /// Add a single validated transaction into the pool. @@ -313,8 +319,8 @@ where } /// Notifies transaction listeners about changes after a block was processed. - fn notify_on_new_block(&self, outcome: OnNewBlockOutcome) { - let OnNewBlockOutcome { mined, promoted, discarded, block_hash } = outcome; + fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) { + let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; let mut listener = self.event_listener.write(); @@ -486,9 +492,9 @@ impl AddedTransaction { } } -/// Contains all state changes after a [`OnNewBlockEvent`] was processed +/// Contains all state changes after a [`CanonicalStateUpdate`] was processed #[derive(Debug)] -pub(crate) struct OnNewBlockOutcome { +pub(crate) struct OnNewCanonicalStateOutcome { /// Hash of the block. pub(crate) block_hash: H256, /// All mined transactions. diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 5453227dd7..7728252c93 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -10,11 +10,11 @@ use crate::{ pending::PendingPool, state::{SubPool, TxState}, update::{Destination, PoolUpdate}, - AddedPendingTransaction, AddedTransaction, OnNewBlockOutcome, + AddedPendingTransaction, AddedTransaction, OnNewCanonicalStateOutcome, }, - traits::{PoolSize, StateDiff}, - OnNewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, - ValidPoolTransaction, U256, + traits::{BlockInfo, PoolSize}, + CanonicalStateUpdate, ChangedAccount, PoolConfig, PoolResult, PoolTransaction, + TransactionOrdering, ValidPoolTransaction, U256, }; use fnv::FnvHashMap; use reth_primitives::{constants::MIN_PROTOCOL_BASE_FEE, TxHash, H256}; @@ -120,6 +120,15 @@ impl TxPool { } } + /// Returns the currently tracked block values + pub(crate) fn block_info(&self) -> BlockInfo { + BlockInfo { + last_seen_block_hash: self.all_transactions.last_seen_block_hash, + last_seen_block_number: self.all_transactions.last_seen_block_number, + pending_basefee: self.all_transactions.pending_basefee, + } + } + /// Updates the pool based on the changed base fee. /// /// This enforces the dynamic fee requirement. @@ -166,22 +175,26 @@ impl TxPool { /// /// This removes all mined transactions, updates according to the new base fee and rechecks /// sender allowance. - pub(crate) fn on_new_block(&mut self, event: OnNewBlockEvent) -> OnNewBlockOutcome { + pub(crate) fn on_canonical_state_change( + &mut self, + event: CanonicalStateUpdate, + ) -> OnNewCanonicalStateOutcome { // Remove all transaction that were included in the block for tx_hash in &event.mined_transactions { - self.remove_transaction_by_hash(tx_hash); - // Update removed transactions metric - self.metrics.removed_transactions.increment(1); + if self.remove_transaction_by_hash(tx_hash).is_some() { + // Update removed transactions metric + self.metrics.removed_transactions.increment(1); + } } // Apply the state changes to the total set of transactions which triggers sub-pool updates. let updates = - self.all_transactions.update(event.pending_block_base_fee, &event.state_changes); + self.all_transactions.update(event.pending_block_base_fee, &event.changed_accounts); // Process the sub-pool updates let UpdateOutcome { promoted, discarded } = self.process_updates(updates); - OnNewBlockOutcome { + OnNewCanonicalStateOutcome { block_hash: event.hash, mined: event.mined_transactions, promoted, @@ -489,8 +502,6 @@ impl fmt::Debug for TxPool { /// This is the sole entrypoint that's guarding all sub-pools, all sub-pool actions are always /// derived from this set. Updates returned from this type must be applied to the sub-pools. pub(crate) struct AllTransactions { - /// Expected base fee for the pending block. - pending_basefee: u128, /// Minimum base fee required by the protocol. /// /// Transactions with a lower base fee will never be included by the chain @@ -505,6 +516,12 @@ pub(crate) struct AllTransactions { txs: BTreeMap>, /// Tracks the number of transactions by sender that are currently in the pool. tx_counter: FnvHashMap, + /// The current block number the pool keeps track of. + last_seen_block_number: u64, + /// The current block hash the pool keeps track of. + last_seen_block_hash: H256, + /// Expected base fee for the pending block. + pending_basefee: u128, } impl AllTransactions { @@ -572,7 +589,7 @@ impl AllTransactions { pub(crate) fn update( &mut self, pending_block_base_fee: u128, - _state_diffs: &StateDiff, + _changed_accounts: &[ChangedAccount], ) -> Vec { // update new basefee self.pending_basefee = pending_block_base_fee; @@ -1010,12 +1027,14 @@ impl Default for AllTransactions { fn default() -> Self { Self { max_account_slots: MAX_ACCOUNT_SLOTS_PER_SENDER, - pending_basefee: Default::default(), minimal_protocol_basefee: MIN_PROTOCOL_BASE_FEE, block_gas_limit: 30_000_000, by_hash: Default::default(), txs: Default::default(), tx_counter: Default::default(), + last_seen_block_number: 0, + last_seen_block_hash: Default::default(), + pending_basefee: Default::default(), } } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index c81294515b..7d01d6d37b 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -24,15 +24,20 @@ pub trait TransactionPool: Send + Sync + Clone { /// The transaction type of the pool type Transaction: PoolTransaction; - /// Returns stats about the pool. - fn status(&self) -> PoolSize; + /// Returns stats about the pool and all sub-pools. + fn pool_size(&self) -> PoolSize; - /// Event listener for when a new block was mined. + /// Returns the block the pool is currently tracking. + /// + /// This tracks the block that the pool has last seen. + fn block_info(&self) -> BlockInfo; + + /// Event listener for when the pool needs to be updated /// /// Implementers need to update the pool accordingly. /// For example the base fee of the pending block is determined after a block is mined which /// affects the dynamic fee requirement of pending transactions in the pool. - fn on_new_block(&self, event: OnNewBlockEvent); + fn on_canonical_state_change(&self, update: CanonicalStateUpdate); /// Imports an _external_ transaction. /// @@ -44,6 +49,17 @@ pub trait TransactionPool: Send + Sync + Clone { self.add_transaction(TransactionOrigin::External, transaction).await } + /// Imports all _external_ transactions + /// + /// + /// Consumer: Utility + async fn add_external_transactions( + &self, + transactions: Vec, + ) -> PoolResult>> { + self.add_transactions(TransactionOrigin::External, transactions).await + } + /// Adds an _unvalidated_ transaction into the pool. /// /// Consumer: RPC @@ -108,7 +124,7 @@ pub trait TransactionPool: Send + Sync + Clone { /// Removes all transactions corresponding to the given hashes. /// - /// Also removes all dependent transactions. + /// Also removes all _dependent_ transactions. /// /// Consumer: Block production fn remove_transactions( @@ -229,25 +245,48 @@ impl TransactionOrigin { } } -/// Event fired when a new block was mined +/// Represents changes after a new canonical block or range of canonical blocks was added to the +/// chain. +/// +/// It is expected that this is only used if the added blocks are canonical to the pool's last known +/// block hash. In other words, the first added block of the range must be the child of the last +/// known block hash. +/// +/// This is used to update the pool state accordingly. #[derive(Debug, Clone)] -pub struct OnNewBlockEvent { - /// Hash of the added block. +pub struct CanonicalStateUpdate { + /// Hash of the tip block. pub hash: H256, + /// Number of the tip block. + pub number: u64, /// EIP-1559 Base fee of the _next_ (pending) block /// /// The base fee of a block depends on the utilization of the last block and its base fee. pub pending_block_base_fee: u128, - /// Provides a set of state changes that affected the accounts. - pub state_changes: StateDiff, - /// All mined transactions in the block + /// A set of changed accounts across a range of blocks. + pub changed_accounts: Vec, + /// All mined transactions in the block range. pub mined_transactions: Vec, } -/// Contains a list of changed state -#[derive(Debug, Clone)] -pub struct StateDiff { - // TODO(mattsse) this could be an `Arc>` +/// Represents a changed account +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +pub struct ChangedAccount { + /// The address of the account. + pub address: Address, + /// Account nonce. + pub nonce: u64, + /// Account balance. + pub balance: U256, +} + +// === impl ChangedAccount === + +impl ChangedAccount { + /// Creates a new `ChangedAccount` with the given address and 0 balance and nonce. + pub(crate) fn empty(address: Address) -> Self { + Self { address, nonce: 0, balance: U256::ZERO } + } } /// An `Iterator` that only returns transactions that are ready to be executed. @@ -480,3 +519,17 @@ pub struct PoolSize { /// Reported size of transactions in the _queued_ sub-pool. pub queued_size: usize, } + +/// Represents the current status of the pool. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct BlockInfo { + /// Hash for the currently tracked block. + pub last_seen_block_hash: H256, + /// Current the currently tracked block. + pub last_seen_block_number: u64, + /// Currently enforced base fee: the threshold for the basefee sub-pool. + /// + /// Note: this is the derived base fee of the _next_ block that builds on the clock the pool is + /// currently tracking. + pub pending_basefee: u128, +}