mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 15:28:01 -05:00
feat(net): add transaction gossip policy (#15341)
Co-authored-by: Sagar Rana <sagarrana.pvt@gmail.com> Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
7
book/cli/reth/debug/execution.md
vendored
7
book/cli/reth/debug/execution.md
vendored
@@ -234,6 +234,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
--to <TO>
|
||||
The maximum block height
|
||||
|
||||
|
||||
7
book/cli/reth/debug/in-memory-merkle.md
vendored
7
book/cli/reth/debug/in-memory-merkle.md
vendored
@@ -234,6 +234,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
--retries <RETRIES>
|
||||
The number of retries per request
|
||||
|
||||
|
||||
7
book/cli/reth/debug/merkle.md
vendored
7
book/cli/reth/debug/merkle.md
vendored
@@ -234,6 +234,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
--retries <RETRIES>
|
||||
The number of retries per request
|
||||
|
||||
|
||||
7
book/cli/reth/node.md
vendored
7
book/cli/reth/node.md
vendored
@@ -226,6 +226,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
RPC:
|
||||
--http
|
||||
Enable the HTTP-RPC server
|
||||
|
||||
7
book/cli/reth/p2p.md
vendored
7
book/cli/reth/p2p.md
vendored
@@ -202,6 +202,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
Datadir:
|
||||
--datadir <DATA_DIR>
|
||||
The path to the data dir for all reth files and subdirectories.
|
||||
|
||||
7
book/cli/reth/stage/run.md
vendored
7
book/cli/reth/stage/run.md
vendored
@@ -277,6 +277,13 @@ Networking:
|
||||
|
||||
If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
|
||||
--tx-propagation-policy <TX_PROPAGATION_POLICY>
|
||||
Transaction Propagation Policy
|
||||
|
||||
The policy determines which peers transactions are gossiped to.
|
||||
|
||||
[default: All]
|
||||
|
||||
Logging:
|
||||
--log.stdout.format <FORMAT>
|
||||
The format to use for logs written to stdout
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use crate::{
|
||||
eth_requests::EthRequestHandler,
|
||||
transactions::{TransactionsManager, TransactionsManagerConfig},
|
||||
transactions::{TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig},
|
||||
NetworkHandle, NetworkManager,
|
||||
};
|
||||
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
|
||||
@@ -72,11 +72,27 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
|
||||
pool: Pool,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
|
||||
self.transactions_with_policy(pool, transactions_manager_config, Default::default())
|
||||
}
|
||||
|
||||
/// Creates a new [`TransactionsManager`] and wires it to the network.
|
||||
pub fn transactions_with_policy<Pool: TransactionPool, P: TransactionPropagationPolicy>(
|
||||
self,
|
||||
pool: Pool,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
propagation_policy: P,
|
||||
) -> NetworkBuilder<TransactionsManager<Pool, N, P>, Eth, N> {
|
||||
let Self { mut network, request_handler, .. } = self;
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
network.set_transactions(tx);
|
||||
let handle = network.handle().clone();
|
||||
let transactions = TransactionsManager::new(handle, pool, rx, transactions_manager_config);
|
||||
let transactions = TransactionsManager::with_policy(
|
||||
handle,
|
||||
pool,
|
||||
rx,
|
||||
transactions_manager_config,
|
||||
propagation_policy,
|
||||
);
|
||||
NetworkBuilder { network, request_handler, transactions }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,10 @@ use crate::{
|
||||
error::NetworkError,
|
||||
eth_requests::EthRequestHandler,
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
|
||||
transactions::{
|
||||
config::TransactionPropagationKind, TransactionsHandle, TransactionsManager,
|
||||
TransactionsManagerConfig,
|
||||
},
|
||||
NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
|
||||
};
|
||||
use alloy_consensus::transaction::PooledTransaction;
|
||||
@@ -205,6 +208,15 @@ where
|
||||
pub fn with_eth_pool_config(
|
||||
self,
|
||||
tx_manager_config: TransactionsManagerConfig,
|
||||
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
|
||||
self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
|
||||
}
|
||||
|
||||
/// Installs an eth pool on each peer with custom transaction manager config and policy.
|
||||
pub fn with_eth_pool_config_and_policy(
|
||||
self,
|
||||
tx_manager_config: TransactionsManagerConfig,
|
||||
policy: TransactionPropagationKind,
|
||||
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
|
||||
self.map_pool(|peer| {
|
||||
let blob_store = InMemoryBlobStore::default();
|
||||
@@ -214,9 +226,10 @@ where
|
||||
TokioTaskExecutor::default(),
|
||||
);
|
||||
|
||||
peer.map_transactions_manager_with_config(
|
||||
peer.map_transactions_manager_with(
|
||||
EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
|
||||
tx_manager_config.clone(),
|
||||
policy,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -490,6 +503,19 @@ where
|
||||
pool: P,
|
||||
config: TransactionsManagerConfig,
|
||||
) -> Peer<C, P>
|
||||
where
|
||||
P: TransactionPool,
|
||||
{
|
||||
self.map_transactions_manager_with(pool, config, Default::default())
|
||||
}
|
||||
|
||||
/// Map transactions manager with custom config and the given policy.
|
||||
pub fn map_transactions_manager_with<P>(
|
||||
self,
|
||||
pool: P,
|
||||
config: TransactionsManagerConfig,
|
||||
policy: TransactionPropagationKind,
|
||||
) -> Peer<C, P>
|
||||
where
|
||||
P: TransactionPool,
|
||||
{
|
||||
@@ -497,11 +523,12 @@ where
|
||||
let (tx, rx) = unbounded_channel();
|
||||
network.set_transactions(tx);
|
||||
|
||||
let transactions_manager = TransactionsManager::new(
|
||||
let transactions_manager = TransactionsManager::with_policy(
|
||||
network.handle().clone(),
|
||||
pool.clone(),
|
||||
rx,
|
||||
config, // Use provided config
|
||||
config,
|
||||
policy,
|
||||
);
|
||||
|
||||
Peer {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use super::{
|
||||
DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
|
||||
PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
|
||||
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
|
||||
};
|
||||
@@ -7,7 +9,8 @@ use crate::transactions::constants::tx_fetcher::{
|
||||
DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
|
||||
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
|
||||
};
|
||||
use derive_more::Constructor;
|
||||
use derive_more::{Constructor, Display};
|
||||
use reth_eth_wire::NetworkPrimitives;
|
||||
|
||||
/// Configuration for managing transactions within the network.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -94,3 +97,55 @@ impl Default for TransactionFetcherConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A policy defining which peers pending transactions are gossiped to.
|
||||
pub trait TransactionPropagationPolicy: Send + Sync + Unpin + 'static {
|
||||
/// Filter a given peer based on the policy.
|
||||
///
|
||||
/// This determines whether transactions can be propagated to this peer.
|
||||
fn can_propagate<N: NetworkPrimitives>(&self, peer: &mut PeerMetadata<N>) -> bool;
|
||||
|
||||
/// A callback on the policy when a new peer session is established.
|
||||
fn on_session_established<N: NetworkPrimitives>(&mut self, peer: &mut PeerMetadata<N>);
|
||||
|
||||
/// A callback on the policy when a peer session is closed.
|
||||
fn on_session_closed<N: NetworkPrimitives>(&mut self, peer: &mut PeerMetadata<N>);
|
||||
}
|
||||
|
||||
/// Determines which peers pending transactions are propagated to.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub enum TransactionPropagationKind {
|
||||
/// Propagate transactions to all peers.
|
||||
///
|
||||
/// No restructions
|
||||
#[default]
|
||||
All,
|
||||
/// Propagate transactions to only trusted peers.
|
||||
Trusted,
|
||||
}
|
||||
|
||||
impl TransactionPropagationPolicy for TransactionPropagationKind {
|
||||
fn can_propagate<N: NetworkPrimitives>(&self, peer: &mut PeerMetadata<N>) -> bool {
|
||||
match self {
|
||||
Self::All => true,
|
||||
Self::Trusted => peer.peer_kind.is_trusted(),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_session_established<N: NetworkPrimitives>(&mut self, _peer: &mut PeerMetadata<N>) {}
|
||||
|
||||
fn on_session_closed<N: NetworkPrimitives>(&mut self, _peer: &mut PeerMetadata<N>) {}
|
||||
}
|
||||
|
||||
impl FromStr for TransactionPropagationKind {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"All" | "all" => Ok(Self::All),
|
||||
"Trusted" | "trusted" => Ok(Self::Trusted),
|
||||
_ => Err(format!("Invalid transaction propagation policy: {}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,11 @@ pub use self::constants::{
|
||||
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
|
||||
};
|
||||
pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
|
||||
use config::TransactionPropagationKind;
|
||||
pub use config::{
|
||||
TransactionFetcherConfig, TransactionPropagationMode, TransactionPropagationPolicy,
|
||||
TransactionsManagerConfig,
|
||||
};
|
||||
pub use validation::*;
|
||||
|
||||
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
|
||||
@@ -42,7 +46,7 @@ use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
|
||||
use reth_network_api::{
|
||||
events::{PeerEvent, SessionInfo},
|
||||
NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
|
||||
NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
|
||||
};
|
||||
use reth_network_p2p::{
|
||||
error::{RequestError, RequestResult},
|
||||
@@ -234,7 +238,11 @@ impl<N: NetworkPrimitives> TransactionsHandle<N> {
|
||||
/// propagate new transactions over the network.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "Manager does nothing unless polled."]
|
||||
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
pub struct TransactionsManager<
|
||||
Pool,
|
||||
N: NetworkPrimitives = EthNetworkPrimitives,
|
||||
P: TransactionPropagationPolicy = TransactionPropagationKind,
|
||||
> {
|
||||
/// Access to the transaction pool.
|
||||
pool: Pool,
|
||||
/// Network access.
|
||||
@@ -290,6 +298,8 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
|
||||
transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
|
||||
/// How the `TransactionsManager` is configured.
|
||||
config: TransactionsManagerConfig,
|
||||
/// The policy to use when propagating transactions.
|
||||
propagation_policy: P,
|
||||
/// `TransactionsManager` metrics
|
||||
metrics: TransactionsManagerMetrics,
|
||||
}
|
||||
@@ -303,6 +313,29 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
pool: Pool,
|
||||
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
) -> Self {
|
||||
Self::with_policy(
|
||||
network,
|
||||
pool,
|
||||
from_network,
|
||||
transactions_manager_config,
|
||||
TransactionPropagationKind::default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Pool: TransactionPool, N: NetworkPrimitives, P: TransactionPropagationPolicy>
|
||||
TransactionsManager<Pool, N, P>
|
||||
{
|
||||
/// Sets up a new instance with given the settings.
|
||||
///
|
||||
/// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
|
||||
pub fn with_policy(
|
||||
network: NetworkHandle<N>,
|
||||
pool: Pool,
|
||||
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
|
||||
transactions_manager_config: TransactionsManagerConfig,
|
||||
propagation_policy: P,
|
||||
) -> Self {
|
||||
let network_events = network.event_listener();
|
||||
|
||||
@@ -341,6 +374,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
NETWORK_POOL_TRANSACTIONS_SCOPE,
|
||||
),
|
||||
config: transactions_manager_config,
|
||||
propagation_policy,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
@@ -471,10 +505,11 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Pool, N> TransactionsManager<Pool, N>
|
||||
impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
|
||||
where
|
||||
Pool: TransactionPool,
|
||||
N: NetworkPrimitives,
|
||||
Policy: TransactionPropagationPolicy,
|
||||
{
|
||||
/// Processes a batch import results.
|
||||
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
|
||||
@@ -697,7 +732,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Pool, N> TransactionsManager<Pool, N>
|
||||
impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
|
||||
where
|
||||
Pool: TransactionPool + 'static,
|
||||
N: NetworkPrimitives<
|
||||
@@ -706,6 +741,7 @@ where
|
||||
>,
|
||||
Pool::Transaction:
|
||||
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
|
||||
Policy: TransactionPropagationPolicy,
|
||||
{
|
||||
/// Invoked when transactions in the local mempool are considered __pending__.
|
||||
///
|
||||
@@ -891,6 +927,10 @@ where
|
||||
|
||||
// Note: Assuming ~random~ order due to random state of the peers map hasher
|
||||
for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
|
||||
if !self.propagation_policy.can_propagate(peer) {
|
||||
// skip peers we should not propagate to
|
||||
continue
|
||||
}
|
||||
// determine whether to send full tx objects or hashes.
|
||||
let mut builder = if peer_idx > max_num_full {
|
||||
PropagateTransactionsBuilder::pooled(peer.version)
|
||||
@@ -1051,6 +1091,8 @@ where
|
||||
}
|
||||
|
||||
/// Handles session establishment and peer transactions initialization.
|
||||
///
|
||||
/// This is invoked when a new session is established.
|
||||
fn handle_peer_session(
|
||||
&mut self,
|
||||
info: SessionInfo,
|
||||
@@ -1064,6 +1106,7 @@ where
|
||||
version,
|
||||
client_version,
|
||||
self.config.max_transactions_seen_by_peer_history,
|
||||
info.peer_kind,
|
||||
);
|
||||
let peer = match self.peers.entry(peer_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
@@ -1073,6 +1116,8 @@ where
|
||||
Entry::Vacant(entry) => entry.insert(peer),
|
||||
};
|
||||
|
||||
self.propagation_policy.on_session_established(peer);
|
||||
|
||||
// Send a `NewPooledTransactionHashes` to the peer with up to
|
||||
// `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
|
||||
// transactions in the pool.
|
||||
@@ -1107,7 +1152,11 @@ where
|
||||
match event_result {
|
||||
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
|
||||
// remove the peer
|
||||
self.peers.remove(&peer_id);
|
||||
|
||||
let peer = self.peers.remove(&peer_id);
|
||||
if let Some(mut peer) = peer {
|
||||
self.propagation_policy.on_session_closed(&mut peer);
|
||||
}
|
||||
self.transaction_fetcher.remove_peer(&peer_id);
|
||||
}
|
||||
NetworkEvent::ActivePeerSession { info, messages } => {
|
||||
@@ -1332,7 +1381,7 @@ where
|
||||
//
|
||||
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
|
||||
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
|
||||
impl<Pool, N> Future for TransactionsManager<Pool, N>
|
||||
impl<Pool, N, Policy> Future for TransactionsManager<Pool, N, Policy>
|
||||
where
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
N: NetworkPrimitives<
|
||||
@@ -1341,6 +1390,7 @@ where
|
||||
>,
|
||||
Pool::Transaction:
|
||||
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
|
||||
Policy: TransactionPropagationPolicy,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
@@ -1790,6 +1840,8 @@ pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
version: EthVersion,
|
||||
/// The peer's client version.
|
||||
client_version: Arc<str>,
|
||||
/// The kind of peer.
|
||||
peer_kind: PeerKind,
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> PeerMetadata<N> {
|
||||
@@ -1799,12 +1851,14 @@ impl<N: NetworkPrimitives> PeerMetadata<N> {
|
||||
version: EthVersion,
|
||||
client_version: Arc<str>,
|
||||
max_transactions_seen_by_peer: u32,
|
||||
peer_kind: PeerKind,
|
||||
) -> Self {
|
||||
Self {
|
||||
seen_transactions: LruCache::new(max_transactions_seen_by_peer),
|
||||
request_tx,
|
||||
version,
|
||||
client_version,
|
||||
peer_kind,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1979,6 +2033,7 @@ mod tests {
|
||||
version,
|
||||
Arc::from(""),
|
||||
DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
|
||||
PeerKind::Trusted,
|
||||
),
|
||||
to_mock_session_rx,
|
||||
)
|
||||
@@ -2208,7 +2263,7 @@ mod tests {
|
||||
|
||||
// return the transactions corresponding to the transaction hashes.
|
||||
response
|
||||
.send(Ok(reth_eth_wire::PooledTransactions(message)))
|
||||
.send(Ok(PooledTransactions(message)))
|
||||
.expect("should send peer_1 response to tx manager");
|
||||
|
||||
// adance the transaction manager future
|
||||
@@ -2492,7 +2547,7 @@ mod tests {
|
||||
.collect();
|
||||
// response partial request
|
||||
response
|
||||
.send(Ok(reth_eth_wire::PooledTransactions(message)))
|
||||
.send(Ok(PooledTransactions(message)))
|
||||
.expect("should send peer_1 response to tx manager");
|
||||
let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
|
||||
unreachable!()
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
//! Testing gossiping of transactions.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use alloy_consensus::TxLegacy;
|
||||
use alloy_primitives::{Signature, U256};
|
||||
use futures::StreamExt;
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider};
|
||||
use reth_network_api::{events::PeerEvent, PeersInfo};
|
||||
use reth_network::{
|
||||
test_utils::{NetworkEventStream, Testnet},
|
||||
transactions::config::TransactionPropagationKind,
|
||||
NetworkEvent, NetworkEventListenerProvider, Peers,
|
||||
};
|
||||
use reth_network_api::{events::PeerEvent, PeerKind, PeersInfo};
|
||||
use reth_primitives_traits::SignedTransaction;
|
||||
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
|
||||
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool};
|
||||
use std::sync::Arc;
|
||||
use tokio::join;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_tx_gossip() {
|
||||
@@ -50,6 +53,73 @@ async fn test_tx_gossip() {
|
||||
assert_eq!(received, hash);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_tx_propagation_policy_trusted_only() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let provider = MockEthProvider::default();
|
||||
|
||||
let policy = TransactionPropagationKind::Trusted;
|
||||
let net = Testnet::create_with(2, provider.clone()).await;
|
||||
let net = net.with_eth_pool_config_and_policy(Default::default(), policy);
|
||||
|
||||
let handle = net.spawn();
|
||||
|
||||
// connect all the peers
|
||||
handle.connect_peers().await;
|
||||
|
||||
let peer_0_handle = &handle.peers()[0];
|
||||
let peer_1_handle = &handle.peers()[1];
|
||||
|
||||
let mut peer0_tx_listener = peer_0_handle.pool().unwrap().pending_transactions_listener();
|
||||
let mut peer1_tx_listener = peer_1_handle.pool().unwrap().pending_transactions_listener();
|
||||
|
||||
let mut gen = TransactionGenerator::new(rand::rng());
|
||||
let tx = gen.gen_eip1559_pooled();
|
||||
|
||||
// ensure the sender has balance
|
||||
let sender = tx.sender();
|
||||
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));
|
||||
|
||||
// insert the tx in peer0's pool
|
||||
let hash_0 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
|
||||
let inserted = peer0_tx_listener.recv().await.unwrap();
|
||||
|
||||
assert_eq!(inserted, hash_0);
|
||||
|
||||
// ensure tx is not gossiped to peer1
|
||||
peer1_tx_listener.try_recv().expect_err("Empty");
|
||||
|
||||
let mut event_stream_0 = NetworkEventStream::new(peer_0_handle.network().event_listener());
|
||||
let mut event_stream_1 = NetworkEventStream::new(peer_1_handle.network().event_listener());
|
||||
|
||||
// disconnect peer1 from peer0
|
||||
peer_0_handle.network().remove_peer(*peer_1_handle.peer_id(), PeerKind::Static);
|
||||
join!(event_stream_0.next_session_closed(), event_stream_1.next_session_closed());
|
||||
|
||||
// re register peer1 as trusted
|
||||
peer_0_handle.network().add_trusted_peer(*peer_1_handle.peer_id(), peer_1_handle.local_addr());
|
||||
join!(event_stream_0.next_session_established(), event_stream_1.next_session_established());
|
||||
|
||||
let mut gen = TransactionGenerator::new(rand::rng());
|
||||
let tx = gen.gen_eip1559_pooled();
|
||||
|
||||
// ensure the sender has balance
|
||||
let sender = tx.sender();
|
||||
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));
|
||||
|
||||
// insert pending tx in peer0's pool
|
||||
let hash_1 = peer_0_handle.pool().unwrap().add_external_transaction(tx).await.unwrap();
|
||||
let inserted = peer0_tx_listener.recv().await.unwrap();
|
||||
assert_eq!(inserted, hash_1);
|
||||
|
||||
// ensure peer1 now receives the pending txs from peer0
|
||||
let mut buff = Vec::with_capacity(2);
|
||||
peer1_tx_listener.recv_many(&mut buff, 2).await;
|
||||
|
||||
assert!(buff.contains(&hash_1));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_4844_tx_gossip_penalization() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
@@ -16,8 +16,9 @@ use reth_cli_util::get_secret_key;
|
||||
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
|
||||
use reth_exex::ExExContext;
|
||||
use reth_network::{
|
||||
transactions::TransactionsManagerConfig, NetworkBuilder, NetworkConfig, NetworkConfigBuilder,
|
||||
NetworkHandle, NetworkManager, NetworkPrimitives,
|
||||
transactions::{TransactionPropagationPolicy, TransactionsManagerConfig},
|
||||
NetworkBuilder, NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
|
||||
NetworkPrimitives,
|
||||
};
|
||||
use reth_node_api::{
|
||||
FullNodePrimitives, FullNodeTypes, FullNodeTypesAdapter, NodeAddOns, NodeTypes,
|
||||
@@ -677,20 +678,26 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
+ 'static,
|
||||
Node::Provider: BlockReaderFor<N>,
|
||||
{
|
||||
self.start_network_with(builder, pool, Default::default())
|
||||
self.start_network_with(
|
||||
builder,
|
||||
pool,
|
||||
self.config().network.transactions_manager_config(),
|
||||
self.config().network.tx_propagation_policy,
|
||||
)
|
||||
}
|
||||
|
||||
/// Convenience function to start the network tasks.
|
||||
///
|
||||
/// Accepts the config for the transaction task.
|
||||
/// Accepts the config for the transaction task and the policy for propagation.
|
||||
///
|
||||
/// Spawns the configured network and associated tasks and returns the [`NetworkHandle`]
|
||||
/// connected to that network.
|
||||
pub fn start_network_with<Pool, N>(
|
||||
pub fn start_network_with<Pool, N, Policy>(
|
||||
&self,
|
||||
builder: NetworkBuilder<(), (), N>,
|
||||
pool: Pool,
|
||||
tx_config: TransactionsManagerConfig,
|
||||
propagation_policy: Policy,
|
||||
) -> NetworkHandle<N>
|
||||
where
|
||||
N: NetworkPrimitives,
|
||||
@@ -702,9 +709,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
> + Unpin
|
||||
+ 'static,
|
||||
Node::Provider: BlockReaderFor<N>,
|
||||
Policy: TransactionPropagationPolicy,
|
||||
{
|
||||
let (handle, network, txpool, eth) = builder
|
||||
.transactions(pool, tx_config)
|
||||
.transactions_with_policy(pool, tx_config, propagation_policy)
|
||||
.request_handler(self.provider().clone())
|
||||
.split_with_handle();
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use reth_discv5::{
|
||||
use reth_net_nat::{NatResolver, DEFAULT_NET_IF_NAME};
|
||||
use reth_network::{
|
||||
transactions::{
|
||||
config::TransactionPropagationKind,
|
||||
constants::{
|
||||
tx_fetcher::{
|
||||
DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
|
||||
@@ -154,6 +155,12 @@ pub struct NetworkArgs {
|
||||
/// If flag is set, but no value is passed, the default interface for docker `eth0` is tried.
|
||||
#[arg(long = "net-if.experimental", conflicts_with = "addr", value_name = "IF_NAME")]
|
||||
pub net_if: Option<String>,
|
||||
|
||||
/// Transaction Propagation Policy
|
||||
///
|
||||
/// The policy determines which peers transactions are gossiped to.
|
||||
#[arg(long = "tx-propagation-policy", default_value_t = TransactionPropagationKind::All)]
|
||||
pub tx_propagation_policy: TransactionPropagationKind,
|
||||
}
|
||||
|
||||
impl NetworkArgs {
|
||||
@@ -335,6 +342,7 @@ impl Default for NetworkArgs {
|
||||
max_seen_tx_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
|
||||
max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
|
||||
net_if: None,
|
||||
tx_propagation_policy: TransactionPropagationKind::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user