test(net): add Testnet implementation for testing (#232)

* test(net): add Testnet implementation for testing

* test(net): add testnet type and test
This commit is contained in:
Matthias Seitz
2022-11-23 01:33:01 +01:00
committed by GitHub
parent 7e693046c6
commit 0f45f16455
13 changed files with 388 additions and 29 deletions

View File

@@ -44,4 +44,8 @@ secp256k1 = { version = "0.24", features = [
] }
[dev-dependencies]
# reth
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
rand = "0.8"

View File

@@ -129,6 +129,18 @@ impl<C> NetworkConfigBuilder<C> {
self
}
/// Sets the socket address the network will listen on
pub fn listener_addr(mut self, listener_addr: SocketAddr) -> Self {
self.listener_addr = Some(listener_addr);
self
}
/// Sets the socket address the discovery network will listen on
pub fn discovery_addr(mut self, discovery_addr: SocketAddr) -> Self {
self.discovery_addr = Some(discovery_addr);
self
}
/// Consumes the type and creates the actual [`NetworkConfig`]
pub fn build(self) -> NetworkConfig<C> {
let Self {

View File

@@ -32,6 +32,6 @@ mod swarm;
mod transactions;
pub use config::NetworkConfig;
pub use manager::NetworkManager;
pub use manager::{NetworkEvent, NetworkManager};
pub use network::NetworkHandle;
pub use peers::PeersConfig;

View File

@@ -93,8 +93,6 @@ pub struct NetworkManager<C> {
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_active_peers: Arc<AtomicUsize>,
/// Local copy of the `PeerId` of the local node.
local_peer_id: PeerId,
}
// === impl NetworkManager ===
@@ -158,7 +156,6 @@ where
event_listeners: Default::default(),
to_transactions: None,
num_active_peers,
local_peer_id,
})
}
@@ -167,6 +164,21 @@ where
self.to_transactions = Some(tx);
}
/// Returns the [`SocketAddr`] that listens for incoming connections.
pub fn local_addr(&self) -> SocketAddr {
self.swarm.listener().local_address()
}
/// How many peers we're currently connected to.
pub fn num_connected_peers(&self) -> usize {
self.swarm.state().num_connected_peers()
}
/// Returns the [`PeerId`] used in the network.
pub fn peer_id(&self) -> &PeerId {
self.handle.peer_id()
}
/// Returns the [`NetworkHandle`] that can be cloned and shared.
///
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
@@ -181,7 +193,7 @@ where
_capabilities: Arc<Capabilities>,
_message: CapabilityMessage,
) {
trace!(?peer_id, target = "net", "received unexpected message");
trace!(target : "net", ?peer_id, "received unexpected message");
// TODO: disconnect?
}
@@ -292,7 +304,7 @@ where
}
NetworkHandleMessage::AnnounceBlock(block, hash) => {
if self.handle.mode().is_stake() {
error!(target = "net", "Block propagation is not supported in POS - [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)");
error!(target : "net", "Block propagation is not supported in POS - [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)");
return
}
let msg = NewBlockMessage { hash, block: Arc::new(block) };
@@ -308,6 +320,12 @@ where
.swarm
.sessions_mut()
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
NetworkHandleMessage::AddPeerAddress(peer, addr) => {
self.swarm.state_mut().add_peer_address(peer, addr);
}
NetworkHandleMessage::DisconnectPeer(peer_id) => {
self.swarm.sessions_mut().disconnect(peer_id, None);
}
}
}
}
@@ -351,24 +369,24 @@ where
this.on_invalid_message(peer_id, capabilities, message)
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(?remote_addr, target = "net", "TCP listener closed.");
trace!(target : "net", ?remote_addr, "TCP listener closed.");
}
SwarmEvent::TcpListenerError(err) => {
trace!(?err, target = "net", "TCP connection error.");
trace!(target : "net", ?err, "TCP connection error.");
}
SwarmEvent::IncomingTcpConnection { remote_addr, .. } => {
trace!(?remote_addr, target = "net", "Incoming connection");
trace!(target : "net",?remote_addr, "Incoming connection");
}
SwarmEvent::OutgoingTcpConnection { remote_addr } => {
trace!(?remote_addr, target = "net", "Starting outbound connection.");
trace!(target : "net", ?remote_addr,"Starting outbound connection.");
}
SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
trace!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
target = "net",
"Session established"
);
@@ -381,11 +399,11 @@ where
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
trace!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
?error,
target = "net",
"Session disconnected"
);
@@ -408,11 +426,17 @@ where
#[derive(Debug, Clone)]
pub enum NetworkEvent {
/// Closed the peer session.
SessionClosed { peer_id: PeerId },
SessionClosed {
/// The identifier of the peer to which a session was closed.
peer_id: PeerId,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender,
},
}
@@ -434,7 +458,7 @@ impl NetworkEventListeners {
self.listeners.retain(|listener| {
let open = listener.send(event.clone()).is_ok();
if !open {
trace!(target = "net", "event listener channel closed",);
trace!(target : "net", "event listener channel closed",);
}
open
});

View File

@@ -4,9 +4,13 @@ use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions};
use reth_primitives::{PeerId, H256};
use std::{
net::SocketAddr,
sync::{atomic::AtomicUsize, Arc},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A _shareable_ network frontend. Used to interact with the network.
///
@@ -40,15 +44,30 @@ impl NetworkHandle {
Self { inner: Arc::new(inner) }
}
/// How many peers we're currently connected to.
pub fn num_connected_peers(&self) -> usize {
self.inner.num_active_peers.load(Ordering::Relaxed)
}
/// Returns the [`SocketAddr`] that listens for incoming connections.
pub fn local_addr(&self) -> SocketAddr {
*self.inner.listener_address.lock()
}
/// Returns the [`PeerId`] used in the network.
pub fn peer_id(&self) -> &PeerId {
&self.inner.local_peer_id
}
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage> {
&self.inner.to_manager_tx
}
/// Creates a new [`NetworkEvent`] listener channel.
pub fn event_listener(&self) -> mpsc::UnboundedReceiver<NetworkEvent> {
pub fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::EventListener(tx));
rx
UnboundedReceiverStream::new(rx)
}
/// Returns the mode of the network, either pow, or pos
@@ -61,8 +80,19 @@ impl NetworkHandle {
let _ = self.inner.to_manager_tx.send(msg);
}
/// Sends a message to the [`NetworkManager`] to add a peer to the known set
pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) {
let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::AddPeerAddress(peer, addr));
}
/// Sends a message to the [`NetworkManager`] to disconnect an existing connection to the given
/// peer.
pub fn disconnect_peer(&self, peer: PeerId) {
let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::DisconnectPeer(peer));
}
/// Sends a [`PeerRequest`] to the given peer's session.
pub fn send_request(&mut self, peer_id: PeerId, request: PeerRequest) {
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}
}
@@ -85,6 +115,10 @@ struct NetworkInner {
/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
#[allow(missing_docs)]
pub(crate) enum NetworkHandleMessage {
/// Adds an address for a peer.
AddPeerAddress(PeerId, SocketAddr),
/// Disconnect a connection to a peer if it exists.
DisconnectPeer(PeerId),
/// Add a new listener for [`NetworkEvent`].
EventListener(UnboundedSender<NetworkEvent>),
/// Broadcast event to announce a new block to all nodes.

View File

@@ -216,7 +216,7 @@ impl ActiveSession {
self.queued_outgoing.push_back(msg.into());
}
Err(err) => {
error!(?err, target = "net", "Failed to respond to received request");
error!(target : "net", ?err, "Failed to respond to received request");
}
}
}
@@ -225,8 +225,8 @@ impl ActiveSession {
fn emit_message(&self, message: PeerMessage) {
let _ = self.try_emit_message(message).map_err(|err| {
warn!(
target : "net",
%err,
target = "net",
"dropping incoming message",
);
});

View File

@@ -220,8 +220,8 @@ impl SessionManager {
return match event {
ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
trace!(
target : "net::session",
?peer_id,
target = "net::session",
"gracefully disconnected active session."
);
let _ = self.active_sessions.remove(&peer_id);
@@ -232,7 +232,7 @@ impl SessionManager {
remote_addr,
error,
} => {
trace!(?peer_id, ?error, target = "net::session", "closed session.");
trace!(target : "net::session", ?peer_id, ?error,"closed session.");
let _ = self.active_sessions.remove(&peer_id);
Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
remote_addr,
@@ -260,9 +260,9 @@ impl SessionManager {
match event {
PendingSessionEvent::SuccessfulHandshake { remote_addr, session_id } => {
trace!(
target : "net::session",
?session_id,
?remote_addr,
target = "net::session",
"successful handshake"
);
}
@@ -321,9 +321,9 @@ impl SessionManager {
}
PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
trace!(
target : "net::session",
?session_id,
?remote_addr,
target = "net::session",
"disconnected pending session"
);
let _ = self.pending_sessions.remove(&session_id);
@@ -350,11 +350,11 @@ impl SessionManager {
error,
} => {
trace!(
target : "net::session",
?error,
?session_id,
?remote_addr,
?peer_id,
target = "net::session",
"connection refused"
);
let _ = self.pending_sessions.remove(&session_id);
@@ -366,10 +366,10 @@ impl SessionManager {
PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => {
let _ = self.pending_sessions.remove(&session_id);
warn!(
target : "net::session",
?error,
?session_id,
?remote_addr,
target = "net::session",
"ecies auth failed"
);
let _ = self.pending_sessions.remove(&session_id);

View File

@@ -80,6 +80,11 @@ where
}
}
/// How many peers we're currently connected to.
pub fn num_connected_peers(&self) -> usize {
self.connected_peers.len()
}
/// Event hook for an activated session for the peer.
///
/// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and
@@ -92,7 +97,7 @@ where
request_tx: PeerRequestSender,
) -> Result<(), AddSessionError> {
// TODO add capacity check
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");
debug_assert!(!self.connected_peers.contains_key(&peer), "Already connected; not possible");
// find the corresponding block number
let block_number =
@@ -210,6 +215,11 @@ where
}
}
/// Adds a peer and its address to the peerset.
pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.peers_manager.add_discovered_node(peer_id, addr)
}
/// Event hook for events received from the discovery service.
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
match event {
@@ -318,8 +328,8 @@ where
match response.poll(cx) {
Poll::Ready(Err(_)) => {
trace!(
target : "net",
?id,
target = "net",
"Request canceled, response channel closed."
);
disconnect_sessions.push(*id);

View File

@@ -53,11 +53,20 @@ where
Self { incoming, sessions, state }
}
/// Access to the state.
pub(crate) fn state(&self) -> &NetworkState<C> {
&self.state
}
/// Mutable access to the state.
pub(crate) fn state_mut(&mut self) -> &mut NetworkState<C> {
&mut self.state
}
/// Access to the [`ConnectionListener`].
pub(crate) fn listener(&self) -> &ConnectionListener {
&self.incoming
}
/// Mutable access to the [`SessionManager`].
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager {
&mut self.sessions

View File

@@ -110,7 +110,7 @@ where
Self {
pool,
network,
network_events: UnboundedReceiverStream::new(network_events),
network_events,
inflight_requests: Default::default(),
transactions_by_peers: Default::default(),
pool_imports: Default::default(),

View File

@@ -0,0 +1,48 @@
//! Connection tests
use super::testnet::Testnet;
use futures::StreamExt;
use reth_network::NetworkEvent;
use std::collections::HashSet;
#[tokio::test(flavor = "multi_thread")]
async fn test_establish_connections() {
let net = Testnet::create(3).await;
net.for_each(|peer| assert_eq!(0, peer.num_peers()));
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
let handle2 = handles.next().unwrap();
drop(handles);
let handle = net.spawn();
let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
handle0.add_peer(*handle2.peer_id(), handle2.local_addr());
let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]);
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionClosed { .. } => {
panic!("unexpected event")
}
NetworkEvent::SessionEstablished { peer_id, .. } => {
assert!(expected_connections.remove(&peer_id))
}
}
}
assert!(expected_connections.is_empty());
let net = handle.terminate().await;
net.for_each(|peer| {
assert!(peer.num_peers() >= 1);
});
}

View File

@@ -0,0 +1,5 @@
mod connect;
mod testnet;
pub use testnet::*;
fn main() {}

View File

@@ -0,0 +1,213 @@
//! A network implementation for testing purposes.
use futures::FutureExt;
use pin_project::pin_project;
use reth_interfaces::{provider::BlockProvider, test_utils::TestApi};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use secp256k1::SecretKey;
use std::{
fmt,
future::Future,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{sync::oneshot, task::JoinHandle};
/// A test network consisting of multiple peers.
#[derive(Default)]
pub struct Testnet<C> {
/// All running peers in the network.
peers: Vec<Peer<C>>,
}
// === impl Testnet ===
impl<C> Testnet<C>
where
C: BlockProvider,
{
pub fn peers_mut(&mut self) -> &mut [Peer<C>] {
&mut self.peers
}
pub fn peers(&self) -> &[Peer<C>] {
&self.peers
}
pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C>> + '_ {
self.peers.iter_mut()
}
pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C>> + '_ {
self.peers.iter()
}
pub async fn add_peer_with_config(
&mut self,
config: PeerConfig<C>,
) -> Result<(), NetworkError> {
let PeerConfig { config, client, secret_key } = config;
let network = NetworkManager::new(config).await?;
let peer = Peer { network, client, secret_key };
self.peers.push(peer);
Ok(())
}
/// Returns all handles to the networks
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle> + '_ {
self.peers.iter().map(|p| p.handle())
}
/// Apply a closure on each peer
pub fn for_each<F>(&self, f: F)
where
F: Fn(&Peer<C>),
{
self.peers.iter().for_each(f)
}
}
impl<C> Testnet<C>
where
C: BlockProvider + 'static,
{
/// Spawns the testnet to a separate task
pub fn spawn(self) -> TestnetHandle<C> {
let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
let mut net = self;
let handle = tokio::task::spawn(async move {
let mut tx = None;
loop {
tokio::select! {
_ = &mut net => { break}
inc = rx => {
tx = inc.ok();
break
}
}
}
if let Some(tx) = tx {
let _ = tx.send(net);
}
});
TestnetHandle { _handle: handle, terminate: tx }
}
}
impl Testnet<TestApi> {
/// Same as [`Self::try_create`] but panics on error
pub async fn create(num_peers: usize) -> Self {
Self::try_create(num_peers).await.unwrap()
}
/// Creates a new [`Testnet`] with the given number of peers
pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
let mut this = Testnet::default();
for _ in 0..num_peers {
this.add_peer().await?;
}
Ok(this)
}
pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
self.add_peer_with_config(Default::default()).await
}
}
impl<C> fmt::Debug for Testnet<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Testnet {{}}").finish_non_exhaustive()
}
}
impl<C> Future for Testnet<C>
where
C: BlockProvider,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
for peer in this.peers.iter_mut() {
let _ = peer.poll_unpin(cx);
}
Poll::Pending
}
}
pub struct TestnetHandle<C> {
_handle: JoinHandle<()>,
terminate: oneshot::Sender<oneshot::Sender<Testnet<C>>>,
}
// === impl TestnetHandle ===
impl<C> TestnetHandle<C> {
/// Terminates the task and returns the [`Testnet`] back.
pub async fn terminate(self) -> Testnet<C> {
let (tx, rx) = oneshot::channel();
self.terminate.send(tx).unwrap();
rx.await.unwrap()
}
}
#[pin_project]
pub struct Peer<C> {
#[pin]
network: NetworkManager<C>,
client: Arc<C>,
secret_key: SecretKey,
}
// === impl Peer ===
impl<C> Peer<C>
where
C: BlockProvider,
{
pub fn num_peers(&self) -> usize {
self.network.num_connected_peers()
}
/// The address that listens for incoming connections.
pub fn local_addr(&self) -> SocketAddr {
self.network.local_addr()
}
pub fn handle(&self) -> NetworkHandle {
self.network.handle().clone()
}
}
impl<C> Future for Peer<C>
where
C: BlockProvider,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().network.poll(cx)
}
}
pub struct PeerConfig<C = TestApi> {
config: NetworkConfig<C>,
client: Arc<C>,
secret_key: SecretKey,
}
impl Default for PeerConfig {
fn default() -> Self {
let client = Arc::new(TestApi::default());
let secret_key = SecretKey::new(&mut rand::thread_rng());
let config = NetworkConfig::builder(Arc::clone(&client), secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.build();
Self { config, client, secret_key }
}
}