diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index f0a749a4a5..7b5e12b28e 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -16,6 +16,9 @@ use secp256k1::SecretKey; use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration}; use tokio::task; +// The timeout for tests that create a GethInstance +const GETH_TIMEOUT: Duration = Duration::from_secs(60); + #[tokio::test(flavor = "multi_thread")] async fn test_establish_connections() { reth_tracing::init_tracing(); @@ -128,188 +131,204 @@ async fn test_connect_with_builder() { #[tokio::test(flavor = "multi_thread")] async fn test_incoming_node_id_blacklist() { reth_tracing::init_tracing(); - let secret_key = SecretKey::new(&mut rand::thread_rng()); + tokio::time::timeout(GETH_TIMEOUT, async move { + let secret_key = SecretKey::new(&mut rand::thread_rng()); - // instantiate geth and add ourselves as a peer - let temp_dir = tempfile::tempdir().unwrap().into_path(); - let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn(); - let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()); - let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); + // instantiate geth and add ourselves as a peer + let temp_dir = tempfile::tempdir().unwrap().into_path(); + let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn(); + let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()); + let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); - // get the peer id we should be expecting - let geth_peer_id: PeerId = - provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); + // get the peer id we should be expecting + let geth_peer_id: PeerId = + provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); - let ban_list = BanList::new(vec![geth_peer_id], HashSet::new()); - let peer_config = PeersConfig::default().with_ban_list(ban_list); + let ban_list = BanList::new(vec![geth_peer_id], HashSet::new()); + let peer_config = PeersConfig::default().with_ban_list(ban_list); - let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30303); - let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30304); - let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) - .listener_addr(reth_p2p_socket) - .discovery_addr(reth_disc_socket) - .peer_config(peer_config) - .build(); + let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30303); + let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30304); + let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) + .listener_addr(reth_p2p_socket) + .discovery_addr(reth_disc_socket) + .peer_config(peer_config) + .build(); - let network = NetworkManager::new(config).await.unwrap(); + let network = NetworkManager::new(config).await.unwrap(); - let handle = network.handle().clone(); - let events = handle.event_listener(); + let handle = network.handle().clone(); + let events = handle.event_listener(); - tokio::task::spawn(network); + tokio::task::spawn(network); - // make geth connect to us - let our_peer_id = handle.peer_id(); - let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket); + // make geth connect to us + let our_peer_id = handle.peer_id(); + let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket); - provider.add_peer(our_enode).await.unwrap(); + provider.add_peer(our_enode).await.unwrap(); - let mut event_stream = NetworkEventStream::new(events); + let mut event_stream = NetworkEventStream::new(events); - // check for session to be opened - let incoming_peer_id = event_stream.next_session_established().await.unwrap(); - assert_eq!(incoming_peer_id, geth_peer_id); + // check for session to be opened + let incoming_peer_id = event_stream.next_session_established().await.unwrap(); + assert_eq!(incoming_peer_id, geth_peer_id); - // check to see that the session was closed - let incoming_peer_id = event_stream.next_session_closed().await.unwrap(); - assert_eq!(incoming_peer_id, geth_peer_id); + // check to see that the session was closed + let incoming_peer_id = event_stream.next_session_closed().await.unwrap(); + assert_eq!(incoming_peer_id, geth_peer_id); + }) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] #[serial_test::serial] async fn test_incoming_connect_with_single_geth() { reth_tracing::init_tracing(); - let secret_key = SecretKey::new(&mut rand::thread_rng()); + tokio::time::timeout(GETH_TIMEOUT, async move { + let secret_key = SecretKey::new(&mut rand::thread_rng()); - // instantiate geth and add ourselves as a peer - let temp_dir = tempfile::tempdir().unwrap().into_path(); - let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn(); - let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()); - let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); + // instantiate geth and add ourselves as a peer + let temp_dir = tempfile::tempdir().unwrap().into_path(); + let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn(); + let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()); + let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); - // get the peer id we should be expecting - let geth_peer_id: PeerId = - provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); + // get the peer id we should be expecting + let geth_peer_id: PeerId = + provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); - let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30305); - let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30306); - let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) - .listener_addr(reth_p2p_socket) - .discovery_addr(reth_disc_socket) - .build(); + let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30305); + let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30306); + let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) + .listener_addr(reth_p2p_socket) + .discovery_addr(reth_disc_socket) + .build(); - let network = NetworkManager::new(config).await.unwrap(); + let network = NetworkManager::new(config).await.unwrap(); - let handle = network.handle().clone(); - tokio::task::spawn(network); + let handle = network.handle().clone(); + tokio::task::spawn(network); - // make geth connect to us - let our_peer_id = handle.peer_id(); - let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket); + // make geth connect to us + let our_peer_id = handle.peer_id(); + let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket); - provider.add_peer(our_enode).await.unwrap(); + provider.add_peer(our_enode).await.unwrap(); - let events = handle.event_listener(); - let mut event_stream = NetworkEventStream::new(events); + let events = handle.event_listener(); + let mut event_stream = NetworkEventStream::new(events); - // check for a sessionestablished event - let incoming_peer_id = event_stream.next_session_established().await.unwrap(); - assert_eq!(incoming_peer_id, geth_peer_id); + // check for a sessionestablished event + let incoming_peer_id = event_stream.next_session_established().await.unwrap(); + assert_eq!(incoming_peer_id, geth_peer_id); + }) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] #[serial_test::serial] async fn test_outgoing_connect_with_single_geth() { reth_tracing::init_tracing(); - let secret_key = SecretKey::new(&mut rand::thread_rng()); + tokio::time::timeout(GETH_TIMEOUT, async move { + let secret_key = SecretKey::new(&mut rand::thread_rng()); - let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30307); - let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30308); - let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) - .listener_addr(reth_p2p_socket) - .discovery_addr(reth_disc_socket) - .build(); - let network = NetworkManager::new(config).await.unwrap(); + let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30307); + let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30308); + let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) + .listener_addr(reth_p2p_socket) + .discovery_addr(reth_disc_socket) + .build(); + let network = NetworkManager::new(config).await.unwrap(); - let handle = network.handle().clone(); - tokio::task::spawn(network); + let handle = network.handle().clone(); + tokio::task::spawn(network); - // instantiate geth and add ourselves as a peer - let temp_dir = tempfile::tempdir().unwrap().into_path(); - let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn(); + // instantiate geth and add ourselves as a peer + let temp_dir = tempfile::tempdir().unwrap().into_path(); + let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn(); - let geth_p2p_port = geth.p2p_port().unwrap(); - let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port); - let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string(); + let geth_p2p_port = geth.p2p_port().unwrap(); + let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port); + let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string(); - let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); + let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); - // get the peer id we should be expecting - let geth_peer_id: PeerId = - provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); + // get the peer id we should be expecting + let geth_peer_id: PeerId = + provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); - // add geth as a peer then wait for a `SessionEstablished` event - handle.add_peer(geth_peer_id, geth_socket); + // add geth as a peer then wait for a `SessionEstablished` event + handle.add_peer(geth_peer_id, geth_socket); - // create networkeventstream to get the next session established event easily - let events = handle.event_listener(); - let mut event_stream = NetworkEventStream::new(events); + // create networkeventstream to get the next session established event easily + let events = handle.event_listener(); + let mut event_stream = NetworkEventStream::new(events); - // check for a sessionestablished event - let incoming_peer_id = event_stream.next_session_established().await.unwrap(); - assert_eq!(incoming_peer_id, geth_peer_id); + // check for a sessionestablished event + let incoming_peer_id = event_stream.next_session_established().await.unwrap(); + assert_eq!(incoming_peer_id, geth_peer_id); + }) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] #[serial_test::serial] async fn test_geth_disconnect() { reth_tracing::init_tracing(); - let secret_key = SecretKey::new(&mut rand::thread_rng()); + tokio::time::timeout(GETH_TIMEOUT, async move { + let secret_key = SecretKey::new(&mut rand::thread_rng()); - let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309); - let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310); - let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) - .listener_addr(reth_p2p_socket) - .discovery_addr(reth_disc_socket) - .build(); - let network = NetworkManager::new(config).await.unwrap(); + let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309); + let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310); + let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key) + .listener_addr(reth_p2p_socket) + .discovery_addr(reth_disc_socket) + .build(); + let network = NetworkManager::new(config).await.unwrap(); - let handle = network.handle().clone(); - tokio::task::spawn(network); + let handle = network.handle().clone(); + tokio::task::spawn(network); - // instantiate geth and add ourselves as a peer - let temp_dir = tempfile::tempdir().unwrap().into_path(); - let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn(); + // instantiate geth and add ourselves as a peer + let temp_dir = tempfile::tempdir().unwrap().into_path(); + let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn(); - let geth_p2p_port = geth.p2p_port().unwrap(); - let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port); - let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string(); + let geth_p2p_port = geth.p2p_port().unwrap(); + let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port); + let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string(); - let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); + let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); - // get the peer id we should be expecting - let geth_peer_id: PeerId = - provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); + // get the peer id we should be expecting + let geth_peer_id: PeerId = + provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into(); - // add geth as a peer then wait for a `SessionEstablished` event - handle.add_peer(geth_peer_id, geth_socket); + // add geth as a peer then wait for a `SessionEstablished` event + handle.add_peer(geth_peer_id, geth_socket); - // create networkeventstream to get the next session established event easily - let mut events = handle.event_listener(); + // create networkeventstream to get the next session established event easily + let mut events = handle.event_listener(); - if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { - assert_eq!(peer_id, geth_peer_id); - } else { - panic!("Expected a session established event"); - } + if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { + assert_eq!(peer_id, geth_peer_id); + } else { + panic!("Expected a session established event"); + } - // remove geth as a peer deliberately - handle.disconnect_peer(geth_peer_id); + // remove geth as a peer deliberately + handle.disconnect_peer(geth_peer_id); - // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await { - assert_eq!(peer_id, geth_peer_id); - } else { - panic!("Expected a session closed event"); - } + // wait for a disconnect from geth + if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await { + assert_eq!(peer_id, geth_peer_id); + } else { + panic!("Expected a session closed event"); + } + }) + .await + .unwrap(); }