From 56ded417e9c384eecdf0bc8ef165f0c1c25aef88 Mon Sep 17 00:00:00 2001 From: Karl Yu <43113774+0xKarl98@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:45:26 +0800 Subject: [PATCH] feat: limit handling of incoming txs to trusted peers (#19666) Co-authored-by: Matthias Seitz --- crates/net/network/src/transactions/config.rs | 47 ++++++++++++ crates/net/network/src/transactions/mod.rs | 23 +++++- crates/net/network/tests/it/txgossip.rs | 71 ++++++++++++++++++- crates/node/core/src/args/network.rs | 10 ++- docs/vocs/docs/pages/cli/reth/node.mdx | 7 ++ docs/vocs/docs/pages/cli/reth/p2p/body.mdx | 7 ++ docs/vocs/docs/pages/cli/reth/p2p/header.mdx | 7 ++ docs/vocs/docs/pages/cli/reth/stage/run.mdx | 7 ++ 8 files changed, 175 insertions(+), 4 deletions(-) diff --git a/crates/net/network/src/transactions/config.rs b/crates/net/network/src/transactions/config.rs index c34bbecd77..ba821d5df2 100644 --- a/crates/net/network/src/transactions/config.rs +++ b/crates/net/network/src/transactions/config.rs @@ -14,6 +14,7 @@ use derive_more::{Constructor, Display}; use reth_eth_wire::NetworkPrimitives; use reth_ethereum_primitives::TxType; +use reth_network_types::peers::kind::PeerKind; /// Configuration for managing transactions within the network. #[derive(Debug, Clone)] @@ -26,6 +27,9 @@ pub struct TransactionsManagerConfig { /// How new pending transactions are propagated. #[cfg_attr(feature = "serde", serde(default))] pub propagation_mode: TransactionPropagationMode, + /// Which peers we accept incoming transactions or announcements from. + #[cfg_attr(feature = "serde", serde(default))] + pub ingress_policy: TransactionIngressPolicy, } impl Default for TransactionsManagerConfig { @@ -34,6 +38,7 @@ impl Default for TransactionsManagerConfig { transaction_fetcher_config: TransactionFetcherConfig::default(), max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, propagation_mode: TransactionPropagationMode::default(), + ingress_policy: TransactionIngressPolicy::default(), } } } @@ -177,6 +182,48 @@ impl FromStr for TransactionPropagationKind { } } +/// Determines which peers we will accept incoming transactions or announcements from. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum TransactionIngressPolicy { + /// Accept transactions from any peer. + #[default] + All, + /// Accept transactions only from trusted peers. + Trusted, + /// Drop all incoming transactions. + None, +} + +impl TransactionIngressPolicy { + /// Returns true if the ingress policy allows the provided peer kind. + pub const fn allows(&self, peer_kind: PeerKind) -> bool { + match self { + Self::All => true, + Self::Trusted => peer_kind.is_trusted(), + Self::None => false, + } + } + + /// Returns true if the ingress policy accepts transactions from any peer. + pub const fn allows_all(&self) -> bool { + matches!(self, Self::All) + } +} + +impl FromStr for TransactionIngressPolicy { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "All" | "all" => Ok(Self::All), + "Trusted" | "trusted" => Ok(Self::Trusted), + "None" | "none" => Ok(Self::None), + _ => Err(format!("Invalid transaction ingress policy: {s}")), + } + } +} + /// Defines the outcome of evaluating a transaction against an `AnnouncementFilteringPolicy`. /// /// Dictates how the `TransactionManager` should proceed on an announced transaction. diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index f4ef42523d..c11512e6c2 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -17,8 +17,8 @@ pub use self::constants::{ }; use config::{AnnouncementAcceptance, StrictEthAnnouncementFilter, TransactionPropagationKind}; pub use config::{ - AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionPropagationMode, - TransactionPropagationPolicy, TransactionsManagerConfig, + AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy, + TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig, }; use policy::{NetworkPolicies, TransactionPolicies}; @@ -1282,10 +1282,25 @@ where } } + /// Returns true if the ingress policy allows processing messages from the given peer. + fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool { + if self.config.ingress_policy.allows_all() { + return true; + } + let Some(peer) = self.peers.get(peer_id) else { + return false; + }; + self.config.ingress_policy.allows(peer.peer_kind()) + } + /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { + if !self.accepts_incoming_from(&peer_id) { + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy"); + return; + } // ensure we didn't receive any blob transactions as these are disallowed to be // broadcasted in full @@ -1306,6 +1321,10 @@ where } } NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => { + if !self.accepts_incoming_from(&peer_id) { + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy"); + return; + } self.on_new_pooled_transaction_hashes(peer_id, msg) } NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => { diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index ed1c2f925d..d0f192cff5 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -5,7 +5,9 @@ use futures::StreamExt; use reth_ethereum_primitives::TransactionSigned; use reth_network::{ test_utils::{NetworkEventStream, Testnet}, - transactions::config::TransactionPropagationKind, + transactions::config::{ + TransactionIngressPolicy, TransactionPropagationKind, TransactionsManagerConfig, + }, NetworkEvent, NetworkEventListenerProvider, Peers, }; use reth_network_api::{events::PeerEvent, PeerKind, PeersInfo}; @@ -123,6 +125,73 @@ async fn test_tx_propagation_policy_trusted_only() { assert!(buff.contains(&outcome_1.hash)); } +#[tokio::test(flavor = "multi_thread")] +async fn test_tx_ingress_policy_trusted_only() { + reth_tracing::init_test_tracing(); + + let provider = MockEthProvider::default(); + + let tx_manager_config = TransactionsManagerConfig { + ingress_policy: TransactionIngressPolicy::Trusted, + ..Default::default() + }; + + let net = Testnet::create_with(2, provider.clone()).await; + let net = net.with_eth_pool_config(tx_manager_config); + + let handle = net.spawn(); + + // connect all the peers + handle.connect_peers().await; + + let peer_0_handle = &handle.peers()[0]; + let peer_1_handle = &handle.peers()[1]; + + let mut peer0_tx_listener = peer_0_handle.pool().unwrap().pending_transactions_listener(); + + let mut tx_gen = TransactionGenerator::new(rand::rng()); + let tx = tx_gen.gen_eip1559_pooled(); + + // ensure the sender has balance + let sender = tx.sender(); + provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000))); + + // insert the tx in peer1's pool + let outcome_0 = peer_1_handle.pool().unwrap().add_external_transaction(tx).await.unwrap(); + + // ensure tx is not accepted by peer0 + peer0_tx_listener.try_recv().expect_err("Empty"); + + let mut event_stream_0 = NetworkEventStream::new(peer_0_handle.network().event_listener()); + let mut event_stream_1 = NetworkEventStream::new(peer_1_handle.network().event_listener()); + + // disconnect peer1 from peer0 + peer_0_handle.network().remove_peer(*peer_1_handle.peer_id(), PeerKind::Static); + join!(event_stream_0.next_session_closed(), event_stream_1.next_session_closed()); + + // re register peer1 as trusted + peer_0_handle.network().add_trusted_peer(*peer_1_handle.peer_id(), peer_1_handle.local_addr()); + join!(event_stream_0.next_session_established(), event_stream_1.next_session_established()); + + let mut tx_gen = TransactionGenerator::new(rand::rng()); + let tx = tx_gen.gen_eip1559_pooled(); + + // ensure the sender has balance + let sender = tx.sender(); + provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000))); + + // insert pending tx in peer1's pool + let outcome_1 = peer_1_handle.pool().unwrap().add_external_transaction(tx).await.unwrap(); + + // ensure peer0 now receives both pending txs from peer1 (the blocked one and the new one) + let mut buff = Vec::with_capacity(2); + buff.push(peer0_tx_listener.recv().await.unwrap()); + buff.push(peer0_tx_listener.recv().await.unwrap()); + + assert!(buff.contains(&outcome_0.hash)); + assert!(buff.contains(&outcome_1.hash)); +} + #[tokio::test(flavor = "multi_thread")] async fn test_4844_tx_gossip_penalization() { reth_tracing::init_test_tracing(); diff --git a/crates/node/core/src/args/network.rs b/crates/node/core/src/args/network.rs index 4e57839e3e..9d6598cca1 100644 --- a/crates/node/core/src/args/network.rs +++ b/crates/node/core/src/args/network.rs @@ -19,7 +19,7 @@ use reth_discv5::{ use reth_net_nat::{NatResolver, DEFAULT_NET_IF_NAME}; use reth_network::{ transactions::{ - config::TransactionPropagationKind, + config::{TransactionIngressPolicy, TransactionPropagationKind}, constants::{ tx_fetcher::{ DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS, @@ -162,6 +162,12 @@ pub struct NetworkArgs { #[arg(long = "tx-propagation-policy", default_value_t = TransactionPropagationKind::All)] pub tx_propagation_policy: TransactionPropagationKind, + /// Transaction ingress policy + /// + /// Determines which peers' transactions are accepted over P2P. + #[arg(long = "tx-ingress-policy", default_value_t = TransactionIngressPolicy::All)] + pub tx_ingress_policy: TransactionIngressPolicy, + /// Disable transaction pool gossip /// /// Disables gossiping of transactions in the mempool to peers. This can be omitted for @@ -230,6 +236,7 @@ impl NetworkArgs { ), max_transactions_seen_by_peer_history: self.max_seen_tx_history, propagation_mode: self.propagation_mode, + ingress_policy: self.tx_ingress_policy, } } @@ -373,6 +380,7 @@ impl Default for NetworkArgs { max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, net_if: None, tx_propagation_policy: TransactionPropagationKind::default(), + tx_ingress_policy: TransactionIngressPolicy::default(), disable_tx_gossip: false, propagation_mode: TransactionPropagationMode::Sqrt, required_block_hashes: vec![], diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 9adfd18d2b..bdf39a671f 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -245,6 +245,13 @@ Networking: [default: All] + --tx-ingress-policy + Transaction ingress policy + + Determines which peers' transactions are accepted over P2P. + + [default: All] + --disable-tx-gossip Disable transaction pool gossip diff --git a/docs/vocs/docs/pages/cli/reth/p2p/body.mdx b/docs/vocs/docs/pages/cli/reth/p2p/body.mdx index a7670bacce..e9c8ff08cd 100644 --- a/docs/vocs/docs/pages/cli/reth/p2p/body.mdx +++ b/docs/vocs/docs/pages/cli/reth/p2p/body.mdx @@ -191,6 +191,13 @@ Networking: [default: All] + --tx-ingress-policy + Transaction ingress policy + + Determines which peers' transactions are accepted over P2P. + + [default: All] + --disable-tx-gossip Disable transaction pool gossip diff --git a/docs/vocs/docs/pages/cli/reth/p2p/header.mdx b/docs/vocs/docs/pages/cli/reth/p2p/header.mdx index 76afd9a4cf..0dc48503a8 100644 --- a/docs/vocs/docs/pages/cli/reth/p2p/header.mdx +++ b/docs/vocs/docs/pages/cli/reth/p2p/header.mdx @@ -191,6 +191,13 @@ Networking: [default: All] + --tx-ingress-policy + Transaction ingress policy + + Determines which peers' transactions are accepted over P2P. + + [default: All] + --disable-tx-gossip Disable transaction pool gossip diff --git a/docs/vocs/docs/pages/cli/reth/stage/run.mdx b/docs/vocs/docs/pages/cli/reth/stage/run.mdx index b7a5a41aaf..c56bede6d0 100644 --- a/docs/vocs/docs/pages/cli/reth/stage/run.mdx +++ b/docs/vocs/docs/pages/cli/reth/stage/run.mdx @@ -295,6 +295,13 @@ Networking: [default: All] + --tx-ingress-policy + Transaction ingress policy + + Determines which peers' transactions are accepted over P2P. + + [default: All] + --disable-tx-gossip Disable transaction pool gossip