mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
test: fix connect many race condition (#6653)
This commit is contained in:
@@ -289,28 +289,34 @@ impl<C, Pool> TestnetHandle<C, Pool> {
|
||||
&self.peers
|
||||
}
|
||||
|
||||
/// Connects all peers with each other
|
||||
/// Connects all peers with each other.
|
||||
///
|
||||
/// This establishes sessions concurrently between all peers.
|
||||
///
|
||||
/// Returns once all sessions are established.
|
||||
pub async fn connect_peers(&self) {
|
||||
if self.peers.len() < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
let mut streams = Vec::with_capacity(self.peers.len());
|
||||
let mut num_sessions = Vec::with_capacity(self.peers.len());
|
||||
// add an event stream for _each_ peer
|
||||
let streams =
|
||||
self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
|
||||
|
||||
// add all peers to each other
|
||||
for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
|
||||
streams.push(NetworkEventStream::new(handle.event_listener()));
|
||||
let mut num = 0;
|
||||
for idx in (idx + 1)..self.peers.len() {
|
||||
let neighbour = &self.peers[idx];
|
||||
handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
|
||||
num += 1;
|
||||
}
|
||||
num_sessions.push(num);
|
||||
}
|
||||
let fut = streams
|
||||
.into_iter()
|
||||
.zip(num_sessions)
|
||||
.map(|(mut stream, num)| async move { stream.take_session_established(num).await });
|
||||
|
||||
// await all sessions to be established
|
||||
let num_sessions_per_peer = self.peers.len() - 1;
|
||||
let fut = streams.into_iter().map(|mut stream| async move {
|
||||
stream.take_session_established(num_sessions_per_peer).await
|
||||
});
|
||||
|
||||
futures::future::join_all(fut).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use reth_transaction_pool::test_utils::testing_pool;
|
||||
use secp256k1::SecretKey;
|
||||
use std::{collections::HashSet, net::SocketAddr, time::Duration};
|
||||
use tokio::task;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_establish_connections() {
|
||||
reth_tracing::init_test_tracing();
|
||||
@@ -583,3 +584,21 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
|
||||
|
||||
net_handle.terminate().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_connect_many() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let net = Testnet::create_with(5, NoopProvider::default()).await;
|
||||
|
||||
// install request handlers
|
||||
let net = net.with_eth_pool();
|
||||
let handle = net.spawn();
|
||||
// connect all the peers
|
||||
handle.connect_peers().await;
|
||||
|
||||
// check that all the peers are connected
|
||||
for peer in handle.peers() {
|
||||
assert_eq!(peer.network().num_connected_peers(), 4);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user