mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 08:37:59 -05:00
test(net): speedup network tests (#1269)
This commit is contained in:
@@ -1239,7 +1239,7 @@ mod test {
|
||||
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
|
||||
|
||||
let backoff_durations =
|
||||
PeerBackoffDurations { low: Duration::from_secs(3), ..Default::default() };
|
||||
PeerBackoffDurations { low: Duration::from_millis(200), ..Default::default() };
|
||||
let config = PeersConfig { backoff_durations, ..Default::default() };
|
||||
let mut peers = PeersManager::new(config);
|
||||
peers.add_peer(peer, socket_addr, None);
|
||||
@@ -1302,8 +1302,11 @@ mod test {
|
||||
let peer = PeerId::random();
|
||||
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
|
||||
|
||||
let backoff_durations =
|
||||
PeerBackoffDurations { high: Duration::from_secs(3), ..Default::default() };
|
||||
let backoff_durations = PeerBackoffDurations {
|
||||
high: Duration::from_millis(200),
|
||||
low: Duration::from_millis(200),
|
||||
..Default::default()
|
||||
};
|
||||
let config = PeersConfig { backoff_durations, ..Default::default() };
|
||||
let mut peers = PeersManager::new(config);
|
||||
peers.add_peer(peer, socket_addr, None);
|
||||
|
||||
@@ -918,12 +918,13 @@ mod tests {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_send_many_messages() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let mut builder = SessionBuilder::default();
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let num_messages = 10_000;
|
||||
let num_messages = 100;
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
for _ in 0..num_messages {
|
||||
@@ -960,12 +961,12 @@ mod tests {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let request_timeout = Duration::from_millis(500);
|
||||
let request_timeout = Duration::from_millis(100);
|
||||
let drop_timeout = Duration::from_millis(1500);
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |client_stream| async move {
|
||||
let _client_stream = client_stream;
|
||||
tokio::time::sleep(drop_timeout * 100).await;
|
||||
tokio::time::sleep(drop_timeout * 10).await;
|
||||
});
|
||||
tokio::task::spawn(fut);
|
||||
|
||||
@@ -975,6 +976,7 @@ mod tests {
|
||||
.internal_request_timeout
|
||||
.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
|
||||
session.protocol_breach_request_timeout = drop_timeout;
|
||||
session.internal_request_timeout_interval = tokio::time::interval(request_timeout);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx };
|
||||
session.on_internal_peer_request(req, Instant::now());
|
||||
@@ -999,7 +1001,7 @@ mod tests {
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, move |mut client_stream| async move {
|
||||
let _ = tokio::time::timeout(Duration::from_secs(25), client_stream.next()).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(5), client_stream.next()).await;
|
||||
client_stream.into_inner().disconnect(DisconnectReason::UselessPeer).await.unwrap();
|
||||
});
|
||||
|
||||
|
||||
@@ -78,8 +78,10 @@ where
|
||||
&mut self,
|
||||
configs: impl IntoIterator<Item = PeerConfig<C>>,
|
||||
) -> Result<(), NetworkError> {
|
||||
for config in configs {
|
||||
self.add_peer_with_config(config).await?;
|
||||
let peers = configs.into_iter().map(|c| async { c.launch().await }).collect::<Vec<_>>();
|
||||
let peers = futures::future::join_all(peers).await;
|
||||
for peer in peers {
|
||||
self.peers.push(peer?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -277,6 +279,14 @@ impl<C> PeerConfig<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
/// Launches the network and returns the [Peer] that manages it
|
||||
pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
|
||||
let PeerConfig { config, client, secret_key } = self;
|
||||
let network = NetworkManager::new(config).await?;
|
||||
let peer = Peer { network, client, secret_key, request_handler: None };
|
||||
Ok(peer)
|
||||
}
|
||||
|
||||
/// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
|
||||
/// to any available IP and port.
|
||||
pub fn new(client: Arc<C>) -> Self {
|
||||
|
||||
@@ -28,7 +28,7 @@ use tokio::task;
|
||||
async fn test_establish_connections() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
for _ in 0..10 {
|
||||
for _ in 0..3 {
|
||||
let net = Testnet::create(3).await;
|
||||
|
||||
net.for_each(|peer| assert_eq!(0, peer.num_peers()));
|
||||
@@ -131,15 +131,15 @@ async fn test_already_connected() {
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_get_peer() {
|
||||
reth_tracing::init_test_tracing();
|
||||
let mut net = Testnet::default();
|
||||
|
||||
let mut net = Testnet::default();
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
let secret_key_1 = SecretKey::new(&mut rand::thread_rng());
|
||||
let client = Arc::new(NoopProvider::default());
|
||||
|
||||
let p1 = PeerConfig::default();
|
||||
let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key);
|
||||
let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key_1);
|
||||
|
||||
net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap();
|
||||
|
||||
let mut handles = net.handles();
|
||||
@@ -160,7 +160,6 @@ async fn test_get_peer() {
|
||||
|
||||
let peers = handle0.get_peers().await.unwrap();
|
||||
assert_eq!(handle0.num_connected_peers(), peers.len());
|
||||
dbg!(peers);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
|
||||
Reference in New Issue
Block a user