diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index c254a47698..7754c9f465 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1239,7 +1239,7 @@ mod test { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let backoff_durations = - PeerBackoffDurations { low: Duration::from_secs(3), ..Default::default() }; + PeerBackoffDurations { low: Duration::from_millis(200), ..Default::default() }; let config = PeersConfig { backoff_durations, ..Default::default() }; let mut peers = PeersManager::new(config); peers.add_peer(peer, socket_addr, None); @@ -1302,8 +1302,11 @@ mod test { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); - let backoff_durations = - PeerBackoffDurations { high: Duration::from_secs(3), ..Default::default() }; + let backoff_durations = PeerBackoffDurations { + high: Duration::from_millis(200), + low: Duration::from_millis(200), + ..Default::default() + }; let config = PeersConfig { backoff_durations, ..Default::default() }; let mut peers = PeersManager::new(config); peers.add_peer(peer, socket_addr, None); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index bc06f1cc1b..c910ad9f61 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -918,12 +918,13 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_send_many_messages() { + reth_tracing::init_test_tracing(); let mut builder = SessionBuilder::default(); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); - let num_messages = 10_000; + let num_messages = 100; let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move { for _ in 0..num_messages { @@ -960,12 +961,12 @@ mod tests { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let local_addr = listener.local_addr().unwrap(); - let request_timeout = Duration::from_millis(500); + let request_timeout = Duration::from_millis(100); let drop_timeout = Duration::from_millis(1500); let fut = builder.with_client_stream(local_addr, move |client_stream| async move { let _client_stream = client_stream; - tokio::time::sleep(drop_timeout * 100).await; + tokio::time::sleep(drop_timeout * 10).await; }); tokio::task::spawn(fut); @@ -975,6 +976,7 @@ mod tests { .internal_request_timeout .store(request_timeout.as_millis() as u64, Ordering::Relaxed); session.protocol_breach_request_timeout = drop_timeout; + session.internal_request_timeout_interval = tokio::time::interval(request_timeout); let (tx, rx) = oneshot::channel(); let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx }; session.on_internal_peer_request(req, Instant::now()); @@ -999,7 +1001,7 @@ mod tests { let local_addr = listener.local_addr().unwrap(); let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move { - let _ = tokio::time::timeout(Duration::from_secs(25), client_stream.next()).await; + let _ = tokio::time::timeout(Duration::from_secs(5), client_stream.next()).await; client_stream.into_inner().disconnect(DisconnectReason::UselessPeer).await.unwrap(); }); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index dad8bf19e7..85d3ad672f 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -78,8 +78,10 @@ where &mut self, configs: impl IntoIterator>, ) -> Result<(), NetworkError> { - for config in configs { - self.add_peer_with_config(config).await?; + let peers = configs.into_iter().map(|c| async { c.launch().await }).collect::>(); + let peers = futures::future::join_all(peers).await; + for peer in peers { + self.peers.push(peer?); } Ok(()) } @@ -277,6 +279,14 @@ impl PeerConfig where C: BlockProvider + HeaderProvider, { + /// Launches the network and returns the [Peer] that manages it + pub async fn launch(self) -> Result, NetworkError> { + let PeerConfig { config, client, secret_key } = self; + let network = NetworkManager::new(config).await?; + let peer = Peer { network, client, secret_key, request_handler: None }; + Ok(peer) + } + /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind /// to any available IP and port. pub fn new(client: Arc) -> Self { diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index c81670db0f..3c38014192 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -28,7 +28,7 @@ use tokio::task; async fn test_establish_connections() { reth_tracing::init_test_tracing(); - for _ in 0..10 { + for _ in 0..3 { let net = Testnet::create(3).await; net.for_each(|peer| assert_eq!(0, peer.num_peers())); @@ -131,15 +131,15 @@ async fn test_already_connected() { #[tokio::test(flavor = "multi_thread")] async fn test_get_peer() { reth_tracing::init_test_tracing(); - let mut net = Testnet::default(); + let mut net = Testnet::default(); let secret_key = SecretKey::new(&mut rand::thread_rng()); let secret_key_1 = SecretKey::new(&mut rand::thread_rng()); let client = Arc::new(NoopProvider::default()); + let p1 = PeerConfig::default(); let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key); let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1); - net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap(); let mut handles = net.handles(); @@ -160,7 +160,6 @@ async fn test_get_peer() { let peers = handle0.get_peers().await.unwrap(); assert_eq!(handle0.num_connected_peers(), peers.len()); - dbg!(peers); } #[tokio::test(flavor = "multi_thread")]