|
|
|
|
@@ -1,15 +1,12 @@
|
|
|
|
|
//! Connection tests
|
|
|
|
|
|
|
|
|
|
use alloy_node_bindings::Geth;
|
|
|
|
|
use alloy_primitives::map::HashSet;
|
|
|
|
|
use alloy_provider::{ext::AdminApi, ProviderBuilder};
|
|
|
|
|
use futures::StreamExt;
|
|
|
|
|
use reth_chainspec::{MAINNET, SEPOLIA};
|
|
|
|
|
use reth_discv4::Discv4Config;
|
|
|
|
|
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, HeadersDirection};
|
|
|
|
|
use reth_net_banlist::BanList;
|
|
|
|
|
use reth_network::{
|
|
|
|
|
test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT},
|
|
|
|
|
test_utils::{NetworkEventStream, PeerConfig, Testnet},
|
|
|
|
|
BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider,
|
|
|
|
|
NetworkManager, PeersConfig,
|
|
|
|
|
};
|
|
|
|
|
@@ -25,7 +22,7 @@ use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
|
|
|
|
|
use reth_storage_api::noop::NoopProvider;
|
|
|
|
|
use reth_transaction_pool::test_utils::testing_pool;
|
|
|
|
|
use secp256k1::SecretKey;
|
|
|
|
|
use std::{net::SocketAddr, time::Duration};
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
use tokio::task;
|
|
|
|
|
use url::Host;
|
|
|
|
|
|
|
|
|
|
@@ -313,223 +310,6 @@ async fn test_connect_to_trusted_peer() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
|
|
|
#[serial_test::serial]
|
|
|
|
|
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
|
|
|
|
async fn test_incoming_node_id_blacklist() {
|
|
|
|
|
reth_tracing::init_test_tracing();
|
|
|
|
|
tokio::time::timeout(GETH_TIMEOUT, async move {
|
|
|
|
|
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
|
|
|
|
|
|
|
|
|
|
// instantiate geth and add ourselves as a peer
|
|
|
|
|
let temp_dir = tempfile::tempdir().unwrap().into_path();
|
|
|
|
|
let geth = Geth::new().data_dir(temp_dir).disable_discovery().authrpc_port(0).spawn();
|
|
|
|
|
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
|
|
|
|
|
|
|
|
|
|
let provider =
|
|
|
|
|
ProviderBuilder::new().connect_http(format!("http://{geth_endpoint}").parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// get the peer id we should be expecting
|
|
|
|
|
let enr = provider.node_info().await.unwrap().enr;
|
|
|
|
|
let geth_peer_id = enr_to_peer_id(enr.parse().unwrap());
|
|
|
|
|
|
|
|
|
|
let ban_list = BanList::new(vec![geth_peer_id], vec![]);
|
|
|
|
|
let peer_config = PeersConfig::default().with_ban_list(ban_list);
|
|
|
|
|
|
|
|
|
|
let config = NetworkConfigBuilder::new(secret_key)
|
|
|
|
|
.listener_port(0)
|
|
|
|
|
.disable_discovery()
|
|
|
|
|
.peer_config(peer_config)
|
|
|
|
|
.build(NoopProvider::default());
|
|
|
|
|
|
|
|
|
|
let network = NetworkManager::new(config).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let handle = network.handle().clone();
|
|
|
|
|
let events = handle.event_listener();
|
|
|
|
|
|
|
|
|
|
tokio::task::spawn(network);
|
|
|
|
|
|
|
|
|
|
// make geth connect to us
|
|
|
|
|
let our_enode = NodeRecord::new(handle.local_addr(), *handle.peer_id());
|
|
|
|
|
|
|
|
|
|
provider.add_peer(&our_enode.to_string()).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let mut event_stream = NetworkEventStream::new(events);
|
|
|
|
|
|
|
|
|
|
// check for session to be opened
|
|
|
|
|
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
|
|
|
|
|
assert_eq!(incoming_peer_id, geth_peer_id);
|
|
|
|
|
|
|
|
|
|
// check to see that the session was closed
|
|
|
|
|
let incoming_peer_id = event_stream.next_session_closed().await.unwrap().0;
|
|
|
|
|
assert_eq!(incoming_peer_id, geth_peer_id);
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
|
|
|
#[serial_test::serial]
|
|
|
|
|
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
|
|
|
|
async fn test_incoming_connect_with_single_geth() {
|
|
|
|
|
reth_tracing::init_test_tracing();
|
|
|
|
|
tokio::time::timeout(GETH_TIMEOUT, async move {
|
|
|
|
|
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
|
|
|
|
|
|
|
|
|
|
// instantiate geth and add ourselves as a peer
|
|
|
|
|
let temp_dir = tempfile::tempdir().unwrap().into_path();
|
|
|
|
|
let geth = Geth::new().data_dir(temp_dir).disable_discovery().authrpc_port(0).spawn();
|
|
|
|
|
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port());
|
|
|
|
|
let provider =
|
|
|
|
|
ProviderBuilder::new().connect_http(format!("http://{geth_endpoint}").parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// get the peer id we should be expecting
|
|
|
|
|
let enr = provider.node_info().await.unwrap().enr;
|
|
|
|
|
let geth_peer_id = enr_to_peer_id(enr.parse().unwrap());
|
|
|
|
|
|
|
|
|
|
let config = NetworkConfigBuilder::new(secret_key)
|
|
|
|
|
.listener_port(0)
|
|
|
|
|
.disable_discovery()
|
|
|
|
|
.build(NoopProvider::default());
|
|
|
|
|
|
|
|
|
|
let network = NetworkManager::new(config).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let handle = network.handle().clone();
|
|
|
|
|
tokio::task::spawn(network);
|
|
|
|
|
|
|
|
|
|
let events = handle.event_listener();
|
|
|
|
|
let mut event_stream = NetworkEventStream::new(events);
|
|
|
|
|
|
|
|
|
|
// make geth connect to us
|
|
|
|
|
let our_enode = NodeRecord::new(handle.local_addr(), *handle.peer_id());
|
|
|
|
|
|
|
|
|
|
provider.add_peer(&our_enode.to_string()).await.unwrap();
|
|
|
|
|
|
|
|
|
|
// check for a sessionestablished event
|
|
|
|
|
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
|
|
|
|
|
assert_eq!(incoming_peer_id, geth_peer_id);
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
|
|
|
#[serial_test::serial]
|
|
|
|
|
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
|
|
|
|
async fn test_outgoing_connect_with_single_geth() {
|
|
|
|
|
reth_tracing::init_test_tracing();
|
|
|
|
|
tokio::time::timeout(GETH_TIMEOUT, async move {
|
|
|
|
|
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
|
|
|
|
|
|
|
|
|
|
let config = NetworkConfigBuilder::new(secret_key)
|
|
|
|
|
.listener_port(0)
|
|
|
|
|
.disable_discovery()
|
|
|
|
|
.build(NoopProvider::default());
|
|
|
|
|
let network = NetworkManager::new(config).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let handle = network.handle().clone();
|
|
|
|
|
tokio::task::spawn(network);
|
|
|
|
|
|
|
|
|
|
// create networkeventstream to get the next session established event easily
|
|
|
|
|
let events = handle.event_listener();
|
|
|
|
|
let mut event_stream = NetworkEventStream::new(events);
|
|
|
|
|
|
|
|
|
|
// instantiate geth and add ourselves as a peer
|
|
|
|
|
let temp_dir = tempfile::tempdir().unwrap().into_path();
|
|
|
|
|
let geth = Geth::new().disable_discovery().data_dir(temp_dir).authrpc_port(0).spawn();
|
|
|
|
|
|
|
|
|
|
let geth_p2p_port = geth.p2p_port().unwrap();
|
|
|
|
|
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
|
|
|
|
|
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
|
|
|
|
|
|
|
|
|
|
let provider =
|
|
|
|
|
ProviderBuilder::new().connect_http(format!("http://{geth_endpoint}").parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// get the peer id we should be expecting
|
|
|
|
|
let enr = provider.node_info().await.unwrap().enr;
|
|
|
|
|
let geth_peer_id = enr_to_peer_id(enr.parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// add geth as a peer then wait for a `SessionEstablished` event
|
|
|
|
|
handle.add_peer(geth_peer_id, geth_socket);
|
|
|
|
|
|
|
|
|
|
// check for a sessionestablished event
|
|
|
|
|
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
|
|
|
|
|
assert_eq!(incoming_peer_id, geth_peer_id);
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
|
|
|
#[serial_test::serial]
|
|
|
|
|
#[cfg_attr(not(feature = "geth-tests"), ignore)]
|
|
|
|
|
async fn test_geth_disconnect() {
|
|
|
|
|
reth_tracing::init_test_tracing();
|
|
|
|
|
tokio::time::timeout(GETH_TIMEOUT, async move {
|
|
|
|
|
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
|
|
|
|
|
|
|
|
|
|
let config = NetworkConfigBuilder::eth(secret_key)
|
|
|
|
|
.listener_port(0)
|
|
|
|
|
.disable_discovery()
|
|
|
|
|
.build(NoopProvider::default());
|
|
|
|
|
let network = NetworkManager::new(config).await.unwrap();
|
|
|
|
|
|
|
|
|
|
let handle = network.handle().clone();
|
|
|
|
|
tokio::task::spawn(network);
|
|
|
|
|
|
|
|
|
|
// create networkeventstream to get the next session established event easily
|
|
|
|
|
let mut events = handle.event_listener();
|
|
|
|
|
|
|
|
|
|
// instantiate geth and add ourselves as a peer
|
|
|
|
|
let temp_dir = tempfile::tempdir().unwrap().into_path();
|
|
|
|
|
let geth = Geth::new().disable_discovery().data_dir(temp_dir).authrpc_port(0).spawn();
|
|
|
|
|
|
|
|
|
|
let geth_p2p_port = geth.p2p_port().unwrap();
|
|
|
|
|
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
|
|
|
|
|
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
|
|
|
|
|
|
|
|
|
|
let provider =
|
|
|
|
|
ProviderBuilder::new().connect_http(format!("http://{geth_endpoint}").parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// get the peer id we should be expecting
|
|
|
|
|
let enr = provider.node_info().await.unwrap().enr;
|
|
|
|
|
let geth_peer_id = enr_to_peer_id(enr.parse().unwrap());
|
|
|
|
|
|
|
|
|
|
// add geth as a peer then wait for `PeerAdded` and `SessionEstablished` events.
|
|
|
|
|
handle.add_peer(geth_peer_id, geth_socket);
|
|
|
|
|
|
|
|
|
|
match events.next().await {
|
|
|
|
|
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => {
|
|
|
|
|
assert_eq!(peer_id, geth_peer_id)
|
|
|
|
|
}
|
|
|
|
|
_ => panic!("Expected a peer added event"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(NetworkEvent::Peer(PeerEvent::SessionEstablished(session_info))) =
|
|
|
|
|
events.next().await
|
|
|
|
|
{
|
|
|
|
|
let SessionInfo { peer_id, .. } = session_info;
|
|
|
|
|
assert_eq!(peer_id, geth_peer_id);
|
|
|
|
|
} else {
|
|
|
|
|
panic!("Expected a session established event");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// remove geth as a peer deliberately
|
|
|
|
|
handle.disconnect_peer(geth_peer_id);
|
|
|
|
|
|
|
|
|
|
// wait for a disconnect from geth
|
|
|
|
|
if let Some(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. })) =
|
|
|
|
|
events.next().await
|
|
|
|
|
{
|
|
|
|
|
assert_eq!(peer_id, geth_peer_id);
|
|
|
|
|
} else {
|
|
|
|
|
panic!("Expected a session closed event");
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
|
|
|
async fn test_shutdown() {
|
|
|
|
|
reth_tracing::init_test_tracing();
|
|
|
|
|
|