diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index cdcf95b5f7..68725f2ba1 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -837,6 +837,9 @@ async fn authenticate_stream( }; // if the hello handshake was successful we can try status handshake + // + // Before trying status handshake, set up the version to shared_capability + let status = Status { version: p2p_stream.shared_capability().version(), ..status }; let eth_unauthed = UnauthedEthStream::new(p2p_stream); let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await { Ok(stream_res) => stream_res, diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 3834fb8e93..06697dfb6f 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -6,7 +6,7 @@ use crate::{ }; use futures::{FutureExt, StreamExt}; use pin_project::pin_project; -use reth_eth_wire::DisconnectReason; +use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder}; use reth_primitives::PeerId; use reth_provider::{test_utils::NoopProvider, BlockProvider, HeaderProvider}; use secp256k1::SecretKey; @@ -281,17 +281,33 @@ where /// to any available IP and port. pub fn new(client: Arc) -> Self { let secret_key = SecretKey::new(&mut rand::thread_rng()); - Self::with_secret_key(client, secret_key) + let config = Self::network_config_builder(secret_key).build(Arc::clone(&client)); + Self { config, client, secret_key } } /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any /// available IP and port. pub fn with_secret_key(client: Arc, secret_key: SecretKey) -> Self { - let config = NetworkConfigBuilder::new(secret_key) + let config = Self::network_config_builder(secret_key).build(Arc::clone(&client)); + Self { config, client, secret_key } + } + + /// Initialize the network with a given capabilities. + pub fn with_capabilities(client: Arc, capabilities: Vec) -> Self { + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let builder = Self::network_config_builder(secret_key); + let hello_message = + HelloBuilder::new(builder.get_peer_id()).capabilities(capabilities).build(); + let config = builder.hello_message(hello_message).build(Arc::clone(&client)); + + Self { config, client, secret_key } + } + + fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder { + NetworkConfigBuilder::new(secret_key) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) - .build(Arc::clone(&client)); - Self { config, client, secret_key } } } diff --git a/crates/net/network/tests/it/main.rs b/crates/net/network/tests/it/main.rs index e52017ac74..87fbbfb96b 100644 --- a/crates/net/network/tests/it/main.rs +++ b/crates/net/network/tests/it/main.rs @@ -1,4 +1,5 @@ mod connect; mod requests; +mod session; fn main() {} diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs new file mode 100644 index 0000000000..56ff3616d2 --- /dev/null +++ b/crates/net/network/tests/it/session.rs @@ -0,0 +1,87 @@ +//! Session tests + +use futures::StreamExt; +use reth_eth_wire::{capability::Capability, EthVersion}; +use reth_network::{ + test_utils::{PeerConfig, Testnet}, + NetworkEvent, +}; +use reth_network_api::{NetworkInfo, Peers}; +use reth_provider::test_utils::NoopProvider; +use std::sync::Arc; + +#[tokio::test(flavor = "multi_thread")] +async fn test_session_established_with_highest_version() { + reth_tracing::init_test_tracing(); + + let net = Testnet::create(2).await; + + net.for_each(|peer| assert_eq!(0, peer.num_peers())); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + drop(handles); + + let handle = net.spawn(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + + let mut events = handle0.event_listener().take(2); + while let Some(event) = events.next().await { + match event { + NetworkEvent::PeerAdded(peer_id) => { + assert_eq!(handle1.peer_id(), &peer_id); + } + NetworkEvent::SessionEstablished { peer_id, status, .. } => { + assert_eq!(handle1.peer_id(), &peer_id); + assert_eq!(status.version, EthVersion::Eth67 as u8); + } + _ => { + panic!("unexpected event") + } + } + } + + handle.terminate().await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_session_established_with_different_capability() { + reth_tracing::init_test_tracing(); + + let mut net = Testnet::create(1).await; + + let capabilities = vec![Capability::new("eth".into(), EthVersion::Eth66 as usize)]; + let p1 = PeerConfig::with_capabilities(Arc::new(NoopProvider::default()), capabilities); + net.add_peer_with_config(p1).await.unwrap(); + + net.for_each(|peer| assert_eq!(0, peer.num_peers())); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + drop(handles); + + let handle = net.spawn(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + + let mut events = handle0.event_listener().take(2); + while let Some(event) = events.next().await { + match event { + NetworkEvent::PeerAdded(peer_id) => { + assert_eq!(handle1.peer_id(), &peer_id); + } + NetworkEvent::SessionEstablished { peer_id, status, .. } => { + assert_eq!(handle1.peer_id(), &peer_id); + assert_eq!(status.version, EthVersion::Eth66 as u8); + } + _ => { + panic!("unexpected event") + } + } + } + + handle.terminate().await; +}