test: benchmark for fetch pending hashes (#15574)

This commit is contained in:
int88
2025-04-14 15:59:03 +08:00
committed by GitHub
parent 9f6082982f
commit df6d5dd1dd
5 changed files with 171 additions and 85 deletions

View File

@@ -2,9 +2,11 @@
mod init;
mod testnet;
pub mod transactions;
pub use init::{
enr_to_peer_id, unused_port, unused_tcp_addr, unused_tcp_and_udp_port, unused_tcp_udp,
unused_udp_addr, unused_udp_port, GETH_TIMEOUT,
};
pub use testnet::{NetworkEventStream, Peer, PeerConfig, PeerHandle, Testnet, TestnetHandle};
pub use transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager};

View File

@@ -0,0 +1,101 @@
//! Test helper impls for transactions
#![allow(dead_code)]
use crate::{
cache::LruCache,
transactions::{
constants::{
tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS,
tx_manager::DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
},
fetcher::{TransactionFetcher, TxFetchMetadata},
PeerMetadata, TransactionsManager,
},
NetworkConfigBuilder, NetworkManager,
};
use alloy_primitives::TxHash;
use reth_eth_wire::EthVersion;
use reth_eth_wire_types::EthNetworkPrimitives;
use reth_network_api::{PeerKind, PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
use reth_storage_api::noop::NoopProvider;
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use secp256k1::SecretKey;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::trace;
/// A new tx manager for testing.
pub async fn new_tx_manager(
) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>) {
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let client = NoopProvider::default();
let config = NetworkConfigBuilder::new(secret_key)
// let OS choose port
.listener_port(0)
.disable_discovery()
.build(client);
let pool = testing_pool();
let transactions_manager_config = config.transactions_manager_config.clone();
let (_network_handle, network, transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
(transactions, network)
}
/// Directly buffer hahs into tx fetcher for testing.
pub fn buffer_hash_to_tx_fetcher(
tx_fetcher: &mut TransactionFetcher,
hash: TxHash,
peer_id: PeerId,
retries: u8,
tx_encoded_length: Option<usize>,
) {
match tx_fetcher.hashes_fetch_inflight_and_pending_fetch.get_or_insert(hash, || {
TxFetchMetadata::new(
retries,
LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32),
tx_encoded_length,
)
}) {
Some(metadata) => {
metadata.fallback_peers_mut().insert(peer_id);
}
None => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
%hash,
"failed to insert hash from peer in schnellru::LruMap, dropping hash"
)
}
}
tx_fetcher.hashes_pending_fetch.insert(hash);
}
/// Mock a new session, returns (peer, channel-to-send-get-pooled-tx-response-on).
pub fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
(
PeerMetadata::new(
PeerRequestSender::new(peer_id, to_mock_session_tx),
version,
Arc::from(""),
DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
PeerKind::Trusted,
),
to_mock_session_rx,
)
}

View File

@@ -84,7 +84,7 @@ pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
/// which a [`GetPooledTransactions`] request is inflight.
pub hashes_pending_fetch: LruCache<TxHash>,
/// Tracks all hashes in the transaction fetcher.
pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
/// Filter for valid announcement and response data.
pub(super) filter_valid_message: MessageFilter,
/// Info on capacity of the transaction fetcher.
@@ -1301,7 +1301,7 @@ struct TxFetcherSearchDurations {
#[cfg(test)]
mod test {
use super::*;
use crate::transactions::tests::{buffer_hash_to_tx_fetcher, new_mock_session};
use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
use alloy_primitives::{hex, B256};
use alloy_rlp::Decodable;
use derive_more::IntoIterator;

View File

@@ -1846,7 +1846,7 @@ pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
impl<N: NetworkPrimitives> PeerMetadata<N> {
/// Returns a new instance of [`PeerMetadata`].
fn new(
pub fn new(
request_tx: PeerRequestSender<PeerRequest<N>>,
version: EthVersion,
client_version: Arc<str>,
@@ -1867,6 +1867,11 @@ impl<N: NetworkPrimitives> PeerMetadata<N> {
&self.request_tx
}
/// Return a
pub fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
&mut self.seen_transactions
}
/// Returns the negotiated `EthVersion` of the session.
pub const fn version(&self) -> EthVersion {
self.version
@@ -1983,11 +1988,16 @@ struct TxManagerPollDurations {
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
use crate::{
test_utils::{
transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
Testnet,
},
NetworkConfigBuilder, NetworkManager,
};
use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy};
use alloy_primitives::{hex, Signature, TxKind, U256};
use alloy_rlp::Decodable;
use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
use futures::FutureExt;
use reth_chainspec::MIN_TRANSACTION_GAS;
use reth_ethereum_primitives::{Transaction, TransactionSigned};
@@ -1998,7 +2008,7 @@ mod tests {
};
use reth_storage_api::noop::NoopProvider;
use reth_transaction_pool::test_utils::{
testing_pool, MockTransaction, MockTransactionFactory, TestPool,
testing_pool, MockTransaction, MockTransactionFactory,
};
use secp256k1::SecretKey;
use std::{
@@ -2006,82 +2016,8 @@ mod tests {
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
};
use tests::fetcher::TxFetchMetadata;
use tracing::error;
async fn new_tx_manager(
) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
{
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let client = NoopProvider::default();
let config = NetworkConfigBuilder::new(secret_key)
// let OS choose port
.listener_port(0)
.disable_discovery()
.build(client);
let pool = testing_pool();
let transactions_manager_config = config.transactions_manager_config.clone();
let (_network_handle, network, transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
(transactions, network)
}
pub(super) fn buffer_hash_to_tx_fetcher(
tx_fetcher: &mut TransactionFetcher,
hash: TxHash,
peer_id: PeerId,
retries: u8,
tx_encoded_length: Option<usize>,
) {
match tx_fetcher.hashes_fetch_inflight_and_pending_fetch.get_or_insert(hash, || {
TxFetchMetadata::new(
retries,
LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32),
tx_encoded_length,
)
}) {
Some(metadata) => {
metadata.fallback_peers_mut().insert(peer_id);
}
None => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
%hash,
"failed to insert hash from peer in schnellru::LruMap, dropping hash"
)
}
}
tx_fetcher.hashes_pending_fetch.insert(hash);
}
// Returns (peer, channel-to-send-get-pooled-tx-response-on).
pub(super) fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
(
PeerMetadata::new(
PeerRequestSender::new(peer_id, to_mock_session_tx),
version,
Arc::from(""),
DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
PeerKind::Trusted,
),
to_mock_session_rx,
)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
reth_tracing::init_test_tracing();