mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
test: TransactionGenerator and p2p testing support (#5023)
This commit is contained in:
@@ -1,16 +1,24 @@
|
||||
//! A network implementation for testing purposes.
|
||||
|
||||
use crate::{
|
||||
builder::ETH_REQUEST_CHANNEL_CAPACITY, error::NetworkError, eth_requests::EthRequestHandler,
|
||||
transactions::TransactionsManager, NetworkConfig, NetworkConfigBuilder, NetworkEvent,
|
||||
NetworkHandle, NetworkManager,
|
||||
builder::ETH_REQUEST_CHANNEL_CAPACITY,
|
||||
error::NetworkError,
|
||||
eth_requests::EthRequestHandler,
|
||||
transactions::{TransactionsHandle, TransactionsManager},
|
||||
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager,
|
||||
};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder};
|
||||
use reth_primitives::PeerId;
|
||||
use reth_provider::{test_utils::NoopProvider, BlockReader, HeaderProvider};
|
||||
use reth_transaction_pool::test_utils::TestPool;
|
||||
use reth_network_api::{NetworkInfo, Peers};
|
||||
use reth_primitives::{PeerId, MAINNET};
|
||||
use reth_provider::{test_utils::NoopProvider, BlockReader, HeaderProvider, StateProviderFactory};
|
||||
use reth_tasks::TokioTaskExecutor;
|
||||
use reth_transaction_pool::{
|
||||
blobstore::InMemoryBlobStore,
|
||||
test_utils::{testing_pool, TestPool},
|
||||
EthTransactionPool, TransactionPool, TransactionValidationTaskExecutor,
|
||||
};
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
fmt,
|
||||
@@ -29,15 +37,14 @@ use tokio::{
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
/// A test network consisting of multiple peers.
|
||||
#[derive(Default)]
|
||||
pub struct Testnet<C> {
|
||||
pub struct Testnet<C, Pool> {
|
||||
/// All running peers in the network.
|
||||
peers: Vec<Peer<C>>,
|
||||
peers: Vec<Peer<C, Pool>>,
|
||||
}
|
||||
|
||||
// === impl Testnet ===
|
||||
|
||||
impl<C> Testnet<C>
|
||||
impl<C> Testnet<C, TestPool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone,
|
||||
{
|
||||
@@ -56,26 +63,6 @@ where
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
/// Return a mutable slice of all peers.
|
||||
pub fn peers_mut(&mut self) -> &mut [Peer<C>] {
|
||||
&mut self.peers
|
||||
}
|
||||
|
||||
/// Return a slice of all peers.
|
||||
pub fn peers(&self) -> &[Peer<C>] {
|
||||
&self.peers
|
||||
}
|
||||
|
||||
/// Return a mutable iterator over all peers.
|
||||
pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C>> + '_ {
|
||||
self.peers.iter_mut()
|
||||
}
|
||||
|
||||
/// Return an iterator over all peers.
|
||||
pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C>> + '_ {
|
||||
self.peers.iter()
|
||||
}
|
||||
|
||||
/// Extend the list of peers with new peers that are configured with each of the given
|
||||
/// [`PeerConfig`]s.
|
||||
pub async fn extend_peer_with_config(
|
||||
@@ -89,6 +76,32 @@ where
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Pool> Testnet<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone,
|
||||
Pool: TransactionPool,
|
||||
{
|
||||
/// Return a mutable slice of all peers.
|
||||
pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
|
||||
&mut self.peers
|
||||
}
|
||||
|
||||
/// Return a slice of all peers.
|
||||
pub fn peers(&self) -> &[Peer<C, Pool>] {
|
||||
&self.peers
|
||||
}
|
||||
|
||||
/// Return a mutable iterator over all peers.
|
||||
pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
|
||||
self.peers.iter_mut()
|
||||
}
|
||||
|
||||
/// Return an iterator over all peers.
|
||||
pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
|
||||
self.peers.iter()
|
||||
}
|
||||
|
||||
/// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
|
||||
pub async fn add_peer_with_config(
|
||||
@@ -98,8 +111,14 @@ where
|
||||
let PeerConfig { config, client, secret_key } = config;
|
||||
|
||||
let network = NetworkManager::new(config).await?;
|
||||
let peer =
|
||||
Peer { network, client, secret_key, request_handler: None, transactions_manager: None };
|
||||
let peer = Peer {
|
||||
network,
|
||||
client,
|
||||
secret_key,
|
||||
request_handler: None,
|
||||
transactions_manager: None,
|
||||
pool: None,
|
||||
};
|
||||
self.peers.push(peer);
|
||||
Ok(())
|
||||
}
|
||||
@@ -109,10 +128,19 @@ where
|
||||
self.peers.iter().map(|p| p.handle())
|
||||
}
|
||||
|
||||
/// Maps the pool of each peer with the given closure
|
||||
pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
|
||||
where
|
||||
F: Fn(Peer<C, Pool>) -> Peer<C, P>,
|
||||
P: TransactionPool,
|
||||
{
|
||||
Testnet { peers: self.peers.into_iter().map(f).collect() }
|
||||
}
|
||||
|
||||
/// Apply a closure on each peer
|
||||
pub fn for_each<F>(&self, f: F)
|
||||
where
|
||||
F: Fn(&Peer<C>),
|
||||
F: Fn(&Peer<C, Pool>),
|
||||
{
|
||||
self.peers.iter().for_each(f)
|
||||
}
|
||||
@@ -120,19 +148,45 @@ where
|
||||
/// Apply a closure on each peer
|
||||
pub fn for_each_mut<F>(&mut self, f: F)
|
||||
where
|
||||
F: FnMut(&mut Peer<C>),
|
||||
F: FnMut(&mut Peer<C, Pool>),
|
||||
{
|
||||
self.peers.iter_mut().for_each(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Testnet<C>
|
||||
impl<C, Pool> Testnet<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Unpin + 'static,
|
||||
C: StateProviderFactory + BlockReader + HeaderProvider + Clone + 'static,
|
||||
Pool: TransactionPool,
|
||||
{
|
||||
/// Installs an eth pool on each peer
|
||||
pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
|
||||
self.map_pool(|peer| {
|
||||
let blob_store = InMemoryBlobStore::default();
|
||||
let pool = TransactionValidationTaskExecutor::eth(
|
||||
peer.client.clone(),
|
||||
MAINNET.clone(),
|
||||
blob_store.clone(),
|
||||
TokioTaskExecutor::default(),
|
||||
);
|
||||
peer.map_transactions_manager(EthTransactionPool::eth_pool(
|
||||
pool,
|
||||
blob_store,
|
||||
Default::default(),
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Pool> Testnet<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone + Unpin + 'static,
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
{
|
||||
/// Spawns the testnet to a separate task
|
||||
pub fn spawn(self) -> TestnetHandle<C> {
|
||||
pub fn spawn(self) -> TestnetHandle<C, Pool> {
|
||||
let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
|
||||
let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
|
||||
let mut net = self;
|
||||
let handle = tokio::task::spawn(async move {
|
||||
let mut tx = None;
|
||||
@@ -147,11 +201,11 @@ where
|
||||
}
|
||||
});
|
||||
|
||||
TestnetHandle { _handle: handle, terminate: tx }
|
||||
TestnetHandle { _handle: handle, peers, terminate: tx }
|
||||
}
|
||||
}
|
||||
|
||||
impl Testnet<NoopProvider> {
|
||||
impl Testnet<NoopProvider, TestPool> {
|
||||
/// Same as [`Self::try_create`] but panics on error
|
||||
pub async fn create(num_peers: usize) -> Self {
|
||||
Self::try_create(num_peers).await.unwrap()
|
||||
@@ -171,15 +225,22 @@ impl Testnet<NoopProvider> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> fmt::Debug for Testnet<C> {
|
||||
impl<C, Pool> Default for Testnet<C, Pool> {
|
||||
fn default() -> Self {
|
||||
Self { peers: Vec::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Testnet {{}}").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Future for Testnet<C>
|
||||
impl<C, Pool> Future for Testnet<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Unpin,
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
@@ -194,47 +255,81 @@ where
|
||||
|
||||
/// A handle to a [`Testnet`] that can be shared.
|
||||
#[derive(Debug)]
|
||||
pub struct TestnetHandle<C> {
|
||||
pub struct TestnetHandle<C, Pool> {
|
||||
_handle: JoinHandle<()>,
|
||||
terminate: oneshot::Sender<oneshot::Sender<Testnet<C>>>,
|
||||
peers: Vec<PeerHandle<Pool>>,
|
||||
terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
|
||||
}
|
||||
|
||||
// === impl TestnetHandle ===
|
||||
|
||||
impl<C> TestnetHandle<C> {
|
||||
impl<C, Pool> TestnetHandle<C, Pool> {
|
||||
/// Terminates the task and returns the [`Testnet`] back.
|
||||
pub async fn terminate(self) -> Testnet<C> {
|
||||
pub async fn terminate(self) -> Testnet<C, Pool> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.terminate.send(tx).unwrap();
|
||||
rx.await.unwrap()
|
||||
}
|
||||
|
||||
/// Returns the [`PeerHandle`]s of this [`Testnet`].
|
||||
pub fn peers(&self) -> &[PeerHandle<Pool>] {
|
||||
&self.peers
|
||||
}
|
||||
|
||||
/// Connects all peers with each other
|
||||
pub async fn connect_peers(&self) {
|
||||
if self.peers.len() < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
|
||||
let mut events = NetworkEventStream::new(handle.event_listener());
|
||||
for idx in (idx + 1)..self.peers.len() {
|
||||
let neighbour = &self.peers[idx];
|
||||
handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
|
||||
let connected = events.next_session_established().await.unwrap();
|
||||
assert_eq!(connected, *neighbour.peer_id());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A peer in the [`Testnet`].
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct Peer<C> {
|
||||
pub struct Peer<C, Pool = TestPool> {
|
||||
#[pin]
|
||||
network: NetworkManager<C>,
|
||||
#[pin]
|
||||
request_handler: Option<EthRequestHandler<C>>,
|
||||
#[pin]
|
||||
transactions_manager: Option<TransactionsManager<TestPool>>,
|
||||
transactions_manager: Option<TransactionsManager<Pool>>,
|
||||
pool: Option<Pool>,
|
||||
client: C,
|
||||
secret_key: SecretKey,
|
||||
}
|
||||
|
||||
// === impl Peer ===
|
||||
|
||||
impl<C> Peer<C>
|
||||
impl<C, Pool> Peer<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone,
|
||||
Pool: TransactionPool,
|
||||
{
|
||||
/// Returns the number of connected peers.
|
||||
pub fn num_peers(&self) -> usize {
|
||||
self.network.num_connected_peers()
|
||||
}
|
||||
|
||||
/// Returns a handle to the peer's network.
|
||||
pub fn peer_handle(&self) -> PeerHandle<Pool> {
|
||||
PeerHandle {
|
||||
network: self.network.handle().clone(),
|
||||
pool: self.pool.clone(),
|
||||
transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
|
||||
}
|
||||
}
|
||||
|
||||
/// The address that listens for incoming connections.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.network.local_addr()
|
||||
@@ -245,6 +340,11 @@ where
|
||||
self.network.handle().clone()
|
||||
}
|
||||
|
||||
/// Returns the [`TestPool`] of this peer.
|
||||
pub fn pool(&self) -> Option<&Pool> {
|
||||
self.pool.as_ref()
|
||||
}
|
||||
|
||||
/// Set a new request handler that's connected to the peer's network
|
||||
pub fn install_request_handler(&mut self) {
|
||||
let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
|
||||
@@ -255,17 +355,49 @@ where
|
||||
}
|
||||
|
||||
/// Set a new transactions manager that's connected to the peer's network
|
||||
pub fn install_transactions_manager(&mut self, pool: TestPool) {
|
||||
pub fn install_transactions_manager(&mut self, pool: Pool) {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
self.network.set_transactions(tx);
|
||||
let transactions_manager = TransactionsManager::new(self.handle(), pool, rx);
|
||||
let transactions_manager = TransactionsManager::new(self.handle(), pool.clone(), rx);
|
||||
self.transactions_manager = Some(transactions_manager);
|
||||
self.pool = Some(pool);
|
||||
}
|
||||
|
||||
/// Set a new transactions manager that's connected to the peer's network
|
||||
pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
|
||||
where
|
||||
P: TransactionPool,
|
||||
{
|
||||
let Self { mut network, request_handler, client, secret_key, .. } = self;
|
||||
let (tx, rx) = unbounded_channel();
|
||||
network.set_transactions(tx);
|
||||
let transactions_manager =
|
||||
TransactionsManager::new(network.handle().clone(), pool.clone(), rx);
|
||||
Peer {
|
||||
network,
|
||||
request_handler,
|
||||
transactions_manager: Some(transactions_manager),
|
||||
pool: Some(pool),
|
||||
client,
|
||||
secret_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Future for Peer<C>
|
||||
impl<C> Peer<C>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone,
|
||||
{
|
||||
/// Installs a new [testing_pool]
|
||||
pub fn install_test_pool(&mut self) {
|
||||
self.install_transactions_manager(testing_pool())
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Pool> Future for Peer<C, Pool>
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Unpin,
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
@@ -292,6 +424,47 @@ pub struct PeerConfig<C = NoopProvider> {
|
||||
secret_key: SecretKey,
|
||||
}
|
||||
|
||||
/// A handle to a peer in the [`Testnet`].
|
||||
#[derive(Debug)]
|
||||
pub struct PeerHandle<Pool> {
|
||||
network: NetworkHandle,
|
||||
transactions: Option<TransactionsHandle>,
|
||||
pool: Option<Pool>,
|
||||
}
|
||||
|
||||
// === impl PeerHandle ===
|
||||
|
||||
impl<Pool> PeerHandle<Pool> {
|
||||
/// Returns the [`PeerId`] used in the network.
|
||||
pub fn peer_id(&self) -> &PeerId {
|
||||
self.network.peer_id()
|
||||
}
|
||||
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.network.local_addr()
|
||||
}
|
||||
|
||||
/// Creates a new [`NetworkEvent`] listener channel.
|
||||
pub fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent> {
|
||||
self.network.event_listener()
|
||||
}
|
||||
|
||||
/// Returns the [`TransactionsHandle`] of this peer.
|
||||
pub fn transactions(&self) -> Option<&TransactionsHandle> {
|
||||
self.transactions.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the [`TestPool`] of this peer.
|
||||
pub fn pool(&self) -> Option<&Pool> {
|
||||
self.pool.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the [`NetworkHandle`] of this peer.
|
||||
pub fn network(&self) -> &NetworkHandle {
|
||||
&self.network
|
||||
}
|
||||
}
|
||||
|
||||
// === impl PeerConfig ===
|
||||
|
||||
impl<C> PeerConfig<C>
|
||||
@@ -302,8 +475,14 @@ where
|
||||
pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
|
||||
let PeerConfig { config, client, secret_key } = self;
|
||||
let network = NetworkManager::new(config).await?;
|
||||
let peer =
|
||||
Peer { network, client, secret_key, request_handler: None, transactions_manager: None };
|
||||
let peer = Peer {
|
||||
network,
|
||||
client,
|
||||
secret_key,
|
||||
request_handler: None,
|
||||
transactions_manager: None,
|
||||
pool: None,
|
||||
};
|
||||
Ok(peer)
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
|
||||
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
|
||||
|
||||
/// Api to interact with [`TransactionsManager`] task.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TransactionsHandle {
|
||||
/// Command channel to the [`TransactionsManager`]
|
||||
manager_tx: mpsc::UnboundedSender<TransactionsCommand>,
|
||||
@@ -212,13 +212,18 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
|
||||
|
||||
impl<Pool> TransactionsManager<Pool>
|
||||
where
|
||||
Pool: TransactionPool + 'static,
|
||||
Pool: TransactionPool,
|
||||
{
|
||||
/// Returns a new handle that can send commands to this type.
|
||||
pub fn handle(&self) -> TransactionsHandle {
|
||||
TransactionsHandle { manager_tx: self.command_tx.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Pool> TransactionsManager<Pool>
|
||||
where
|
||||
Pool: TransactionPool + 'static,
|
||||
{
|
||||
#[inline]
|
||||
fn update_import_metrics(&self) {
|
||||
self.metrics.pending_pool_imports.set(self.pool_imports.len() as f64);
|
||||
|
||||
Reference in New Issue
Block a user