feat: reload dirty accounts if pool drifts (#3732)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Matthias Seitz
2023-07-15 17:26:44 +02:00
committed by GitHub
parent 64d58456da
commit 9a00f04d77
8 changed files with 240 additions and 51 deletions

4
Cargo.lock generated
View File

@@ -4296,9 +4296,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
[[package]]
name = "pin-utils"

View File

@@ -241,6 +241,8 @@ impl Command {
client,
pool,
chain_events,
ctx.task_executor.clone(),
Default::default(),
),
);
debug!(target: "reth::cli", "Spawned txpool maintenance task");

View File

@@ -132,7 +132,7 @@
//! );
//!
//! // spawn a task that listens for new blocks and updates the pool's transactions, mined transactions etc..
//! tokio::task::spawn( maintain_transaction_pool_future(client, pool, stream));
//! tokio::task::spawn( maintain_transaction_pool_future(client, pool, stream, TokioTaskExecutor::default(), Default::default()));
//!
//! # }
//! ```
@@ -145,7 +145,10 @@ use crate::pool::PoolInner;
use aquamarine as _;
use reth_primitives::{Address, TxHash, U256};
use reth_provider::StateProviderFactory;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::mpsc::Receiver;
use tracing::{instrument, trace};
@@ -438,6 +441,10 @@ where
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.pool.get_transactions_by_sender(sender)
}
fn unique_senders(&self) -> HashSet<Address> {
self.pool.unique_senders()
}
}
impl<V: TransactionValidator, T: TransactionOrdering> TransactionPoolExt for Pool<V, T>
@@ -454,6 +461,10 @@ where
fn on_canonical_state_change(&self, update: CanonicalStateUpdate) {
self.pool.on_canonical_state_change(update);
}
fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
self.pool.update_accounts(accounts);
}
}
impl<V: TransactionValidator, T: TransactionOrdering> Clone for Pool<V, T> {

View File

@@ -5,33 +5,57 @@ use crate::{
traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt},
BlockInfo, TransactionPool,
};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use futures_util::{
future::{BoxFuture, Fuse, FusedFuture},
FutureExt, Stream, StreamExt,
};
use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, PostState, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::{
borrow::Borrow,
collections::HashSet,
hash::{Hash, Hasher},
};
use tracing::debug;
use tokio::sync::oneshot;
use tracing::{debug, trace};
/// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
/// last_seen.number`
const MAX_UPDATE_DEPTH: u64 = 64;
/// Tuning knobs for the transaction-pool maintenance task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 250
    pub max_reload_accounts: usize,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        // 64 blocks ~= 2 epochs of reorg depth; 250 accounts per reload chunk.
        MaintainPoolConfig {
            max_update_depth: 64,
            max_reload_accounts: 250,
        }
    }
}
/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<Client, P, St>(
pub fn maintain_transaction_pool_future<Client, P, St, Tasks>(
client: Client,
pool: P,
events: St,
task_spawner: Tasks,
config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
Client: StateProviderFactory + BlockReaderIdExt + Send + 'static,
Client: StateProviderFactory + BlockReaderIdExt + Clone + Send + 'static,
P: TransactionPoolExt + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Tasks: TaskSpawner + 'static,
{
async move {
maintain_transaction_pool(client, pool, events).await;
maintain_transaction_pool(client, pool, events, task_spawner, config).await;
}
.boxed()
}
@@ -39,14 +63,20 @@ where
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
#[allow(unused)]
pub async fn maintain_transaction_pool<Client, P, St>(client: Client, pool: P, mut events: St)
where
Client: StateProviderFactory + BlockReaderIdExt + Send + 'static,
pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
client: Client,
pool: P,
mut events: St,
task_spawner: Tasks,
config: MaintainPoolConfig,
) where
Client: StateProviderFactory + BlockReaderIdExt + Clone + Send + 'static,
P: TransactionPoolExt + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Tasks: TaskSpawner + 'static,
{
let mut metrics = MaintainPoolMetrics::default();
let metrics = MaintainPoolMetrics::default();
let MaintainPoolConfig { max_update_depth, max_reload_accounts } = config;
// ensure the pool points to latest state
if let Ok(Some(latest)) = client.block_by_number_or_tag(BlockNumberOrTag::Latest) {
let latest = latest.seal_slow();
@@ -64,17 +94,100 @@ where
// keeps track of the state of the pool wrt to blocks
let mut maintained_state = MaintainedPoolState::InSync;
// the future that reloads accounts from state
let mut reload_accounts_fut = Fuse::terminated();
// The update loop that waits for new blocks and reorgs and performs pool updated
// Listen for new chain events and derive the update action for the pool
loop {
trace!(target = "txpool", state=?maintained_state, "awaiting new block or reorg");
metrics.set_dirty_accounts_len(dirty_addresses.len());
let Some(event) = events.next().await else { break };
let pool_info = pool.block_info();
// TODO from time to time re-check the unique accounts in the pool and remove and resync
// based on the tracked state
// after performing a pool update after a new block we have some time to properly update
// dirty accounts and correct if the pool drifted from current state, for example after
// restart or a pipeline run
if maintained_state.is_drifted() {
// assuming all senders are dirty
dirty_addresses = pool.unique_senders();
maintained_state = MaintainedPoolState::InSync;
}
// if we have accounts that are out of sync with the pool, we reload them in chunks
if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
let (tx, rx) = oneshot::channel();
let c = client.clone();
let at = pool_info.last_seen_block_hash;
let fut = if dirty_addresses.len() > max_reload_accounts {
// need to chunk accounts to reload
let accs_to_reload =
dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
for acc in &accs_to_reload {
// make sure we remove them from the dirty set
dirty_addresses.remove(acc);
}
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let _ = tx.send(res);
}
.boxed()
} else {
// can fetch all dirty accounts at once
let accs_to_reload = std::mem::take(&mut dirty_addresses);
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let _ = tx.send(res);
}
.boxed()
};
reload_accounts_fut = rx.fuse();
task_spawner.spawn_blocking(fut);
}
// outcomes of the futures we are waiting on
let mut event = None;
let mut reloaded = None;
// select of account reloads and new canonical state updates which should arrive at the rate
// of the block time (12s)
tokio::select! {
res = &mut reload_accounts_fut => {
reloaded = Some(res);
}
ev = events.next() => {
if ev.is_none() {
// the stream ended, we are done
break;
}
event = ev;
}
}
// handle the result of the account reload
match reloaded {
Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
// reloaded accounts successfully
// extend accounts we failed to load from database
dirty_addresses.extend(failed_to_load);
// update the pool with the loaded accounts
pool.update_accounts(accounts);
}
Some(Ok(Err(res))) => {
// Failed to load accounts from state
let (accs, err) = *res;
debug!(target = "txpool", ?err, "failed to load accounts");
dirty_addresses.extend(accs);
}
Some(Err(_)) => {
// failed to receive the accounts, sender dropped, only possible if task panicked
maintained_state = MaintainedPoolState::Drifted;
}
None => {}
}
// handle the new block or reorg
let Some(event) = event else { continue };
match event {
CanonStateNotification::Reorg { old, new } => {
let (old_blocks, old_state) = old.inner();
@@ -88,7 +201,7 @@ where
new_first.parent_hash == pool_info.last_seen_block_hash)
{
// the new block points to a higher block than the oldest block in the old chain
maintained_state = MaintainedPoolState::Drift;
maintained_state = MaintainedPoolState::Drifted;
}
// base fee for the next block: `new_tip+1`
@@ -108,7 +221,7 @@ where
// for these we need to fetch the nonce+balance from the db at the new tip
let mut changed_accounts =
match load_accounts(&client, new_tip.hash, missing_changed_acc) {
match load_accounts(client.clone(), new_tip.hash, missing_changed_acc) {
Ok(LoadedAccounts { accounts, failed_to_load }) => {
// extend accounts we failed to load from database
dirty_addresses.extend(failed_to_load);
@@ -118,6 +231,7 @@ where
Err(err) => {
let (addresses, err) = *err;
debug!(
target = "txpool",
?err,
"failed to load missing changed accounts at new tip: {:?}",
new_tip.hash
@@ -170,13 +284,20 @@ where
let pending_block_base_fee = tip.next_block_base_fee().unwrap_or_default() as u128;
let first_block = blocks.first();
trace!(
target = "txpool",
first = first_block.number,
tip = tip.number,
pool_block = pool_info.last_seen_block_number,
"update pool on new commit"
);
// check if the depth is too large and should be skipped, this could happen after
// initial sync or long re-sync
let depth = tip.number.abs_diff(pool_info.last_seen_block_number);
if depth > MAX_UPDATE_DEPTH {
maintained_state = MaintainedPoolState::Drift;
debug!(?depth, "skipping deep canonical update");
if depth > max_update_depth {
maintained_state = MaintainedPoolState::Drifted;
debug!(target = "txpool", ?depth, "skipping deep canonical update");
let info = BlockInfo {
last_seen_block_hash: tip.hash,
last_seen_block_number: tip.number,
@@ -200,7 +321,7 @@ where
// we received a new canonical chain commit but the commit is not canonical with
// the pool's block, this could happen after initial sync or
// long re-sync
maintained_state = MaintainedPoolState::Drift;
maintained_state = MaintainedPoolState::Drifted;
}
// Canonical update
@@ -219,12 +340,19 @@ where
/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Eq, PartialEq)]
#[derive(Debug, Eq, PartialEq)]
enum MaintainedPoolState {
/// Pool is assumed to be in sync with the state
/// Pool is assumed to be in sync with the current state
InSync,
/// Pool could be out of sync with the state
Drift,
Drifted,
}
impl MaintainedPoolState {
    /// Whether the pool is assumed to be out of sync with the current state.
    fn is_drifted(&self) -> bool {
        *self == MaintainedPoolState::Drifted
    }
}
/// A unique ChangedAccount identified by its address that can be used for deduplication
@@ -263,7 +391,7 @@ struct LoadedAccounts {
///
/// Note: this expects _unique_ addresses
fn load_accounts<Client, I>(
client: &Client,
client: Client,
at: BlockHash,
addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, reth_interfaces::Error)>>

View File

@@ -10,7 +10,7 @@ use crate::{
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
};
use reth_primitives::{Address, TxHash};
use std::{marker::PhantomData, sync::Arc};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::sync::{mpsc, mpsc::Receiver};
/// A [`TransactionPool`] implementation that does nothing.
@@ -150,6 +150,10 @@ impl TransactionPool for NoopTransactionPool {
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
vec![]
}
fn unique_senders(&self) -> HashSet<Address> {
    // The noop pool never holds transactions, so there are no senders.
    HashSet::new()
}
}
/// A [`TransactionValidator`] that does nothing.

View File

@@ -90,12 +90,13 @@ use std::{
time::Instant,
};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::{debug, trace};
mod events;
pub use events::{FullTransactionEvent, TransactionEvent};
mod listener;
use crate::pool::txpool::UpdateOutcome;
pub use listener::{AllTransactionsEvents, TransactionEvents};
mod best;
@@ -163,6 +164,11 @@ where
self.identifiers.write().sender_id_or_create(addr)
}
/// Returns the set of all unique sender addresses that currently have transactions in the pool.
///
/// Acquires a read lock on the inner pool and delegates to its `unique_senders`.
pub(crate) fn unique_senders(&self) -> HashSet<Address> {
    self.pool.read().unique_senders()
}
/// Converts the changed accounts to a map of sender ids to sender info (internal identifier
/// used for accounts)
fn changed_senders(
@@ -243,6 +249,8 @@ where
/// Updates the entire pool after a new block was executed.
pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate) {
trace!(target: "txpool", %update, "updating pool on canonical state change");
let CanonicalStateUpdate {
hash,
number,
@@ -264,6 +272,18 @@ where
self.notify_on_new_state(outcome);
}
/// Performs account updates on the pool.
///
/// This will either promote or discard transactions based on the new account state.
pub(crate) fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
    // Resolve the changed accounts to internal sender ids, then apply them to the pool.
    let senders = self.changed_senders(accounts.into_iter());
    let UpdateOutcome { promoted, discarded } = self.pool.write().update_accounts(senders);

    // Notify event listeners about every transaction that moved sub-pools.
    let mut listener = self.event_listener.write();
    for tx in &promoted {
        listener.pending(tx, None);
    }
    for tx in &discarded {
        listener.discarded(tx);
    }
}
/// Add a single validated transaction into the pool.
///
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)

View File

@@ -19,11 +19,11 @@ use crate::{
use fnv::FnvHashMap;
use reth_primitives::{
constants::{ETHEREUM_BLOCK_GAS_LIMIT, MIN_PROTOCOL_BASE_FEE},
TxHash, H256,
Address, TxHash, H256,
};
use std::{
cmp::Ordering,
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap},
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
fmt,
ops::Bound::{Excluded, Unbounded},
sync::Arc,
@@ -111,6 +111,11 @@ impl<T: TransactionOrdering> TxPool<T> {
&self.all_transactions
}
/// Returns the set of all unique senders that have transactions in the pool.
pub(crate) fn unique_senders(&self) -> HashSet<Address> {
    let mut senders = HashSet::new();
    for tx in self.all_transactions.txs.values() {
        senders.insert(tx.transaction.sender());
    }
    senders
}
/// Returns stats about the size of pool.
pub(crate) fn size(&self) -> PoolSize {
PoolSize {
@@ -227,6 +232,22 @@ impl<T: TransactionOrdering> TxPool<T> {
self.all_transactions.txs_iter(sender).map(|(_, tx)| Arc::clone(&tx.transaction)).collect()
}
/// Updates the transactions for the changed senders.
///
/// Records the new sender state, re-evaluates all affected transactions against it, and
/// returns which transactions were promoted or discarded as a result.
pub(crate) fn update_accounts(
    &mut self,
    changed_senders: HashMap<SenderId, SenderInfo>,
) -> UpdateOutcome {
    // track changed accounts: extend entry-by-entry instead of `changed_senders.clone()`,
    // which avoids allocating and rehashing a second full `HashMap`.
    self.sender_info
        .extend(changed_senders.iter().map(|(id, info)| (id.clone(), info.clone())));
    // Apply the state changes to the total set of transactions which triggers sub-pool updates.
    let updates = self.all_transactions.update(changed_senders);
    // Process the sub-pool updates (promotions/discards).
    let update = self.process_updates(updates);
    // update the metrics after the update
    self.update_size_metrics();
    update
}
/// Updates the entire pool after a new block was mined.
///
/// This removes all mined transactions, updates according to the new base fee and rechecks
@@ -237,9 +258,6 @@ impl<T: TransactionOrdering> TxPool<T> {
mined_transactions: Vec<TxHash>,
changed_senders: HashMap<SenderId, SenderInfo>,
) -> OnNewCanonicalStateOutcome {
// track changed accounts
self.sender_info.extend(changed_senders.clone());
// update block info
let block_hash = block_info.last_seen_block_hash;
self.all_transactions.set_block_info(block_info);
@@ -252,14 +270,7 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
// Apply the state changes to the total set of transactions which triggers sub-pool updates.
let updates = self.all_transactions.update(changed_senders);
// Process the sub-pool updates
let UpdateOutcome { promoted, discarded } = self.process_updates(updates);
// update the metrics after the update
self.update_size_metrics();
let UpdateOutcome { promoted, discarded } = self.update_accounts(changed_senders);
self.metrics.performed_state_updates.increment(1);
@@ -1281,11 +1292,11 @@ impl<T: PoolTransaction> PoolInternalTransaction<T> {
/// Tracks the result after updating the pool
#[derive(Default, Debug)]
pub struct UpdateOutcome {
pub(crate) struct UpdateOutcome {
/// transactions promoted to the ready queue
promoted: Vec<TxHash>,
pub(crate) promoted: Vec<TxHash>,
/// transaction that failed and became discarded
discarded: Vec<TxHash>,
pub(crate) discarded: Vec<TxHash>,
}
/// Represents the outcome of a prune

View File

@@ -11,7 +11,7 @@ use reth_primitives::{
};
use reth_rlp::Encodable;
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt,
pin::Pin,
sync::Arc,
@@ -242,6 +242,9 @@ pub trait TransactionPool: Send + Sync + Clone {
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
/// Returns a set of all senders of transactions in the pool
fn unique_senders(&self) -> HashSet<Address>;
}
/// Extension for [TransactionPool] trait that allows to set the current block info.
@@ -256,6 +259,9 @@ pub trait TransactionPoolExt: TransactionPool {
/// For example the base fee of the pending block is determined after a block is mined which
/// affects the dynamic fee requirement of pending transactions in the pool.
fn on_canonical_state_change(&self, update: CanonicalStateUpdate);
/// Updates the accounts in the pool
fn update_accounts(&self, accounts: Vec<ChangedAccount>);
}
/// A Helper type that bundles all transactions in the pool.
@@ -389,6 +395,13 @@ pub struct CanonicalStateUpdate {
pub mined_transactions: Vec<H256>,
}
impl fmt::Display for CanonicalStateUpdate {
    /// Renders a compact single-line summary of the update (counts only, not contents).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let changed_accounts = self.changed_accounts.len();
        let mined_transactions = self.mined_transactions.len();
        write!(
            f,
            "{{ hash: {}, number: {}, pending_block_base_fee: {}, changed_accounts: {}, mined_transactions: {} }}",
            self.hash,
            self.number,
            self.pending_block_base_fee,
            changed_accounts,
            mined_transactions
        )
    }
}
/// Represents a changed account
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct ChangedAccount {