feat(eth-wire): add timeouts to tests using Geth (#529)

This commit is contained in:
Dan Cline
2022-12-19 17:35:06 -05:00
committed by GitHub
parent 3db7455eca
commit ac1c0419d2

View File

@@ -16,6 +16,9 @@ use secp256k1::SecretKey;
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::task;
// The timeout for tests that create a GethInstance
const GETH_TIMEOUT: Duration = Duration::from_secs(60);
#[tokio::test(flavor = "multi_thread")]
async fn test_establish_connections() {
reth_tracing::init_tracing();
@@ -128,188 +131,204 @@ async fn test_connect_with_builder() {
#[tokio::test(flavor = "multi_thread")]
async fn test_incoming_node_id_blacklist() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn();
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn();
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
let ban_list = BanList::new(vec![geth_peer_id], HashSet::new());
let peer_config = PeersConfig::default().with_ban_list(ban_list);
let ban_list = BanList::new(vec![geth_peer_id], HashSet::new());
let peer_config = PeersConfig::default().with_ban_list(ban_list);
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30303);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30304);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.peer_config(peer_config)
.build();
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30303);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30304);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.peer_config(peer_config)
.build();
let network = NetworkManager::new(config).await.unwrap();
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
let events = handle.event_listener();
let handle = network.handle().clone();
let events = handle.event_listener();
tokio::task::spawn(network);
tokio::task::spawn(network);
// make geth connect to us
let our_peer_id = handle.peer_id();
let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket);
// make geth connect to us
let our_peer_id = handle.peer_id();
let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket);
provider.add_peer(our_enode).await.unwrap();
provider.add_peer(our_enode).await.unwrap();
let mut event_stream = NetworkEventStream::new(events);
let mut event_stream = NetworkEventStream::new(events);
// check for session to be opened
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check for session to be opened
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check to see that the session was closed
let incoming_peer_id = event_stream.next_session_closed().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check to see that the session was closed
let incoming_peer_id = event_stream.next_session_closed().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
})
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_incoming_connect_with_single_geth() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn();
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().data_dir(temp_dir).disable_discovery().spawn();
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30305);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30306);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30305);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30306);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
let handle = network.handle().clone();
tokio::task::spawn(network);
// make geth connect to us
let our_peer_id = handle.peer_id();
let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket);
// make geth connect to us
let our_peer_id = handle.peer_id();
let our_enode = format!("enode://{}@{}", hex::encode(our_peer_id.0), reth_p2p_socket);
provider.add_peer(our_enode).await.unwrap();
provider.add_peer(our_enode).await.unwrap();
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
// check for a sessionestablished event
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check for a sessionestablished event
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
})
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_outgoing_connect_with_single_geth() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30307);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30308);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30307);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30308);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
let handle = network.handle().clone();
tokio::task::spawn(network);
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn();
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn();
let geth_p2p_port = geth.p2p_port().unwrap();
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let geth_p2p_port = geth.p2p_port().unwrap();
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// add geth as a peer then wait for a `SessionEstablished` event
handle.add_peer(geth_peer_id, geth_socket);
// add geth as a peer then wait for a `SessionEstablished` event
handle.add_peer(geth_peer_id, geth_socket);
// create networkeventstream to get the next session established event easily
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
// create networkeventstream to get the next session established event easily
let events = handle.event_listener();
let mut event_stream = NetworkEventStream::new(events);
// check for a sessionestablished event
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
// check for a sessionestablished event
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
})
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
async fn test_geth_disconnect() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
tokio::time::timeout(GETH_TIMEOUT, async move {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
let handle = network.handle().clone();
tokio::task::spawn(network);
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn();
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn();
let geth_p2p_port = geth.p2p_port().unwrap();
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let geth_p2p_port = geth.p2p_port().unwrap();
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// add geth as a peer then wait for a `SessionEstablished` event
handle.add_peer(geth_peer_id, geth_socket);
// add geth as a peer then wait for a `SessionEstablished` event
handle.add_peer(geth_peer_id, geth_socket);
// create networkeventstream to get the next session established event easily
let mut events = handle.event_listener();
// create networkeventstream to get the next session established event easily
let mut events = handle.event_listener();
if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session established event");
}
if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session established event");
}
// remove geth as a peer deliberately
handle.disconnect_peer(geth_peer_id);
// remove geth as a peer deliberately
handle.disconnect_peer(geth_peer_id);
// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");
}
// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");
}
})
.await
.unwrap();
}