From 9b5a84acc8180c878f9e36b5ce3628b8a69382b4 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 27 Apr 2023 15:53:55 +0200 Subject: [PATCH] feat: initialize txpool maintenance (#2429) Co-authored-by: Roman Krasiuk --- Cargo.lock | 1 + bin/reth/src/node/mod.rs | 23 ++++++++++++++++++++- crates/storage/provider/Cargo.toml | 1 + crates/storage/provider/src/traits/chain.rs | 12 ++++++++--- crates/transaction-pool/src/maintain.rs | 4 ++-- 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 802fb710b7..129ec7fe34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5156,6 +5156,7 @@ dependencies = [ "reth-trie", "thiserror", "tokio", + "tokio-stream", "tracing", ] diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 6ae21b2869..23731dfcb8 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -40,7 +40,7 @@ use reth_interfaces::{ use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; use reth_primitives::{BlockHashOrNumber, Chain, ChainSpec, Head, Header, SealedHeader, H256}; -use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; +use reth_provider::{BlockProvider, CanonStateSubscriptions, HeaderProvider, ShareableDatabase}; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; @@ -197,6 +197,27 @@ impl Command { ); info!(target: "reth::cli", "Transaction pool initialized"); + // spawn txpool maintenance task + { + let pool = transaction_pool.clone(); + let chain_events = blockchain_db.canonical_state_stream(); + let client = blockchain_db.clone(); + ctx.task_executor.spawn_critical( + "txpool maintenance task", + Box::pin(async move { + let chain_events = chain_events.filter_map(|event| async move { event.ok() }); + pin_mut!(chain_events); + reth_transaction_pool::maintain::maintain_transaction_pool( + client, + pool, + chain_events, + ) + .await + }), + ); + debug!(target: "reth::cli", "Spawned txpool maintenance task"); + } + info!(target: "reth::cli", "Connecting to P2P network"); let secret_key = get_secret_key(self.p2p_secret_key.unwrap_or_chain_default(self.chain.chain))?; diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 41f14e098b..3283e7abf1 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -17,6 +17,7 @@ reth-trie = { path = "../../trie" } # async tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } +tokio-stream = "0.1" # tracing tracing = "0.1" diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/storage/provider/src/traits/chain.rs index 03c9f27d49..8deb380d0f 100644 --- a/crates/storage/provider/src/traits/chain.rs +++ b/crates/storage/provider/src/traits/chain.rs @@ -2,13 +2,14 @@ use crate::{chain::BlockReceipts, Chain}; use auto_impl::auto_impl; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast; +use tokio_stream::wrappers::BroadcastStream; /// Type alias for a receiver that receives [CanonStateNotification] -pub type CanonStateNotifications = Receiver; +pub type CanonStateNotifications = broadcast::Receiver; /// Type alias for a sender that sends [CanonStateNotification] -pub type CanonStateNotificationSender = Sender; +pub type CanonStateNotificationSender = broadcast::Sender; /// A type that allows to register chain related event subscriptions. #[auto_impl(&, Arc)] @@ -17,6 +18,11 @@ pub trait CanonStateSubscriptions: Send + Sync { /// /// A canonical chain be one or more blocks, a reorg or a revert. fn subscribe_to_canonical_state(&self) -> CanonStateNotifications; + + /// Convenience method to get a stream of [`CanonStateNotification`]. + fn canonical_state_stream(&self) -> BroadcastStream { + BroadcastStream::new(self.subscribe_to_canonical_state()) + } } /// Chain action that is triggered when a new block is imported or old block is reverted. diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 6251595693..8d99570ce3 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -22,10 +22,10 @@ pub async fn maintain_transaction_pool( pool: Pool, mut events: St, ) where - Client: StateProviderFactory + BlockProvider + 'static, + Client: StateProviderFactory + BlockProvider, V: TransactionValidator, T: TransactionOrdering::Transaction>, - St: Stream + Unpin + 'static, + St: Stream + Unpin, { // TODO set current head for the pool