From 182efa4b46f185a1c083efea783075c1fb4a949d Mon Sep 17 00:00:00 2001 From: draoi Date: Sun, 7 Apr 2024 14:33:32 +0200 Subject: [PATCH] p2p_test: slightly more expansive testing --- src/net/tests.rs | 399 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 303 insertions(+), 96 deletions(-) diff --git a/src/net/tests.rs b/src/net/tests.rs index 4b91e3dd5..3da3c654f 100644 --- a/src/net/tests.rs +++ b/src/net/tests.rs @@ -1,4 +1,4 @@ -/* This file is part of DarkFi (https://dark.fi) +/* This file is part 10f DarkFi (https://dark.fi) * * Copyright (C) 2020-2024 Dyne.org foundation * @@ -20,8 +20,8 @@ use std::{collections::HashSet, sync::Arc}; -use log::{debug, info, warn}; -use rand::Rng; +use log::{info, warn}; +use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng}; use smol::{channel, future, Executor}; use url::Url; @@ -31,32 +31,27 @@ use crate::{ }; // Number of nodes to spawn and number of peers each node connects to -const N_NODES: usize = 10; -const N_CONNS: usize = 5; +const N_NODES: usize = 5; +//const N_CONNS: usize = 5; +const SEED: &str = "tcp://127.0.0.1:51505"; -#[test] -fn p2p_test() { +fn init_logger() { let mut cfg = simplelog::ConfigBuilder::new(); - cfg.add_filter_ignore("sled".to_string()); cfg.add_filter_ignore("net::protocol_ping".to_string()); cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string()); cfg.add_filter_ignore("net::hosts".to_string()); - cfg.add_filter_ignore("net::inbound_session".to_string()); - cfg.add_filter_ignore("net::outbound_session".to_string()); cfg.add_filter_ignore("net::session".to_string()); - cfg.add_filter_ignore("net::refinery".to_string()); + cfg.add_filter_ignore("net::outbound_session".to_string()); + cfg.add_filter_ignore("net::inbound_session".to_string()); cfg.add_filter_ignore("net::message_subscriber".to_string()); cfg.add_filter_ignore("net::protocol_address".to_string()); - cfg.add_filter_ignore("net::protocol_jobs_manager".to_string()); cfg.add_filter_ignore("net::protocol_version".to_string()); cfg.add_filter_ignore("net::protocol_registry".to_string()); - cfg.add_filter_ignore("net::protocol_seed".to_string()); - cfg.add_filter_ignore("net::channel".to_string()); - cfg.add_filter_ignore("net::p2p::seed".to_string()); - cfg.add_filter_ignore("net::p2p::start".to_string()); + cfg.add_filter_ignore("net::protocol_jobs_manager".to_string()); cfg.add_filter_ignore("net::channel::send()".to_string()); cfg.add_filter_ignore("net::channel::start()".to_string()); + cfg.add_filter_ignore("net::channel::handle_stop()".to_string()); cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string()); cfg.add_filter_ignore("net::tcp".to_string()); @@ -64,8 +59,8 @@ fn p2p_test() { // We check this error so we can execute same file tests in parallel, // otherwise second one fails to init logger here. if simplelog::TermLogger::init( - //simplelog::LevelFilter::Info, - simplelog::LevelFilter::Debug, + simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, @@ -73,107 +68,319 @@ fn p2p_test() { ) .is_err() { - warn!(target: "net::test", "Logger already initialized"); + warn!(target: "test_harness", "Logger already initialized"); } - - let ex = Arc::new(Executor::new()); - let ex_ = ex.clone(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Run a thread for each node. - easy_parallel::Parallel::new() - .each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| { - future::block_on(async { - hostlist_propagation(ex_).await; - drop(signal); - }) - }); } -async fn hostlist_propagation(ex: Arc>) { - let seed_addr = Url::parse(&format!("tcp://127.0.0.1:{}", 51505)).unwrap(); - - let mut p2p_instances = vec![]; - let mut rng = rand::thread_rng(); - +async fn spawn_node( + inbound_addrs: Vec, + external_addrs: Vec, + peers: Vec, + seeds: Vec, + node_id: String, + ex: Arc>, +) -> Arc { let settings = Settings { localnet: true, - inbound_addrs: vec![seed_addr.clone()], - external_addrs: vec![seed_addr.clone()], - outbound_connections: 0, - greylist_refinery_interval: 2, + inbound_addrs, + external_addrs, + outbound_connections: 2, + outbound_peer_discovery_cooloff_time: 2, + //outbound_connect_timeout: 2, inbound_connections: usize::MAX, - seeds: vec![], - peers: vec![], + greylist_refinery_interval: 15, + peers, + seeds, + node_id, allowed_transports: vec!["tcp".to_string()], - node_id: "seed".to_string(), ..Default::default() }; let p2p = P2p::new(settings, ex.clone()).await; - p2p_instances.push(p2p); - info!("Initializing outbound nodes"); + p2p +} + +async fn spawn_seed_session(starting_port: usize, ex: Arc>) -> Vec> { + let mut p2p_instances = vec![]; + let seed_addr = Url::parse(SEED).unwrap(); + info!("========================================================"); + info!("Initializing outbound nodes..."); + info!("========================================================"); for i in 0..N_NODES { - // Everyone will connect to N_CONNS random peers. - let mut peers = vec![]; - for _ in 0..N_CONNS { - let mut port = 53200 + i; - while port == 53200 + i { - port = 53200 + rng.gen_range(0..N_NODES); - } - peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); - } - let settings = Settings { - localnet: true, - inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 53200 + i)).unwrap()], - external_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 53200 + i)).unwrap()], - outbound_connections: 8, - inbound_connections: usize::MAX, - seeds: vec![seed_addr.clone()], - peers, - allowed_transports: vec!["tcp".to_string()], - node_id: i.to_string(), - ..Default::default() - }; - - let p2p = P2p::new(settings, ex.clone()).await; + let p2p = spawn_node( + vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()], + vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()], + vec![], + vec![seed_addr.clone()], + (starting_port + i).to_string(), + ex.clone(), + ) + .await; p2p_instances.push(p2p); } + + // Start the P2P network + for p2p in p2p_instances.iter() { + info!("========================================================"); + info!("Starting node={}", p2p.settings().external_addrs[0]); + info!("========================================================"); + p2p.clone().start().await.unwrap(); + } + + p2p_instances +} + +/*async fn spawn_manual_session( + peer_indexes: &[usize], + starting_port: usize, + rng: &mut ThreadRng, + ex: Arc>, +) -> Vec> { + let mut p2p_instances = vec![]; + + // Initialize the nodes + for i in 0..N_NODES { + // Everyone will connect to N_CONNS random peers. + let mut peer_indexes_copy = peer_indexes.to_owned(); + peer_indexes_copy.remove(i); + let peer_indexes_to_connect: Vec<_> = + peer_indexes_copy.choose_multiple(rng, N_CONNS).collect(); + + let mut peers = vec![]; + for peer_index in peer_indexes_to_connect { + let port = starting_port + peer_index; + peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); + } + + let p2p = spawn_node( + vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()], + vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()], + vec![], + peers, + (starting_port + i).to_string(), + ex.clone(), + ) + .await; + + p2p_instances.push(p2p); + } + // Start the P2P network for p2p in p2p_instances.iter() { p2p.clone().start().await.unwrap(); } - info!("Waiting until all peers connect"); - sleep(15).await; + info!("Waiting 5s until all peers connect"); + sleep(5).await; - for p2p in p2p_instances.iter() { - let hosts = p2p.hosts(); + p2p_instances +}*/ - // Ensure there are no duplicate entries in and between any of the hostlists. - let mut urls = HashSet::new(); - let greylist = hosts.container.fetch_all(HostColor::Grey).await; - for (url, _) in greylist { - assert!(urls.insert(url)); - } - let whitelist = hosts.container.fetch_all(HostColor::White).await; - for (url, _) in whitelist { - assert!(urls.insert(url)); - } - let anchorlist = hosts.container.fetch_all(HostColor::Gold).await; - for (url, _) in anchorlist { - assert!(urls.insert(url)); - } - // We should have some peers at this point. - assert!(!urls.is_empty()); +/*async fn assert_hostlist_not_empty( + p2p_instances: &Vec>, + rng: &mut ThreadRng, + color: HostColor, +) { + let random_node = p2p_instances.choose(rng).unwrap(); + assert!(!random_node.hosts().container.is_empty(color).await); +}*/ + +/*async fn assert_entry_exists( + p2p_instances: &Vec>, + rng: &mut ThreadRng, + color: HostColor, + entry: &Url, +) { + let mut urls = HashSet::new(); + let random_node = p2p_instances.choose(rng).unwrap(); + let external_addr = &random_node.settings().external_addrs[0]; + + info!("Checking {} entry exists on {:?} list node={}", entry, color, external_addr); + assert!(random_node.hosts().container.contains(color as usize, entry).await); +}*/ + +async fn get_random_gold_host(p2p_instances: &Vec>, index: usize) -> ((Url, u64), usize) { + let random_node = &p2p_instances[index]; + let external_addr = &random_node.settings().external_addrs[0]; + + info!("========================================================"); + info!("Getting gold addr from node={}", external_addr); + info!("========================================================"); + + random_node.hosts().container.fetch_random(HostColor::Gold).await +} + +async fn check_random_hostlist(p2p_instances: &Vec>, rng: &mut ThreadRng) { + let mut urls = HashSet::new(); + let random_node = p2p_instances.choose(rng).unwrap(); + let external_addr = &random_node.settings().external_addrs[0]; + + info!("========================================================"); + info!("Checking node={}", external_addr); + info!("========================================================"); + + let greylist = random_node.hosts().container.fetch_all(HostColor::Grey).await; + let whitelist = random_node.hosts().container.fetch_all(HostColor::White).await; + let goldlist = random_node.hosts().container.fetch_all(HostColor::Gold).await; + + for (url, _) in greylist { + assert!(urls.insert(url)); } + for (url, _) in whitelist { + assert!(urls.insert(url)); + } + for (url, _) in goldlist { + assert!(urls.insert(url)); + } + assert!(!urls.is_empty()); +} - // Stop the P2P network - for p2p in p2p_instances.iter() { - debug!("Stopping P2P instances..."); - p2p.clone().stop().await; - debug!("Node {} stopped!", p2p.settings().node_id); +async fn kill_node(p2p_instances: &Vec>, node: Url) { + for p2p in p2p_instances { + if p2p.settings().external_addrs[0] == node { + info!("========================================================"); + info!("Shutting down node: {}", p2p.settings().external_addrs[0]); + info!("========================================================"); + p2p.stop().await; + } + } +} + +macro_rules! test_body { + ($real_call:ident) => { + init_logger(); + + let ex = Arc::new(Executor::new()); + let ex_ = ex.clone(); + let (signal, shutdown) = channel::unbounded::<()>(); + + // Run a thread for each node. + easy_parallel::Parallel::new() + .each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv()))) + .finish(|| { + future::block_on(async { + $real_call(ex_).await; + drop(signal); + }) + }); + }; +} + +#[test] +fn p2p_test() { + test_body!(p2p_test_real); +} + +async fn p2p_test_real(ex: Arc>) { + let mut rng = rand::thread_rng(); + // ============================================================ + // 1. Create a new seed node. + // ============================================================ + //let peer_indexes: Vec = (0..N_NODES).collect(); + let seed_addr = Url::parse(SEED).unwrap(); + + let settings = Settings { + localnet: true, + inbound_addrs: vec![seed_addr.clone()], + outbound_connections: 0, + inbound_connections: usize::MAX, + seeds: vec![], + peers: vec![], + allowed_transports: vec!["tcp".to_string()], + greylist_refinery_interval: 12, + node_id: "seed".to_string(), + ..Default::default() + }; + + let seed = P2p::new(settings, ex.clone()).await; + info!("========================================================"); + info!("Starting seed node on {}", SEED); + info!("========================================================"); + seed.clone().start().await.unwrap(); + + // ============================================================ + // 2. Spawn outbound nodes that will connect to the seed node. + // ============================================================ + let p2p_instances = spawn_seed_session(43200, ex.clone()).await; + + info!("========================================================"); + info!("Waiting 10s for all peers to reach the seed node"); + info!("========================================================"); + sleep(10).await; + + // =========================================================== + // 3. Assert that all nodes have shared their external addr + // with the seed node. + // =========================================================== + let greylist = seed.hosts().container.fetch_all(HostColor::Grey).await; + assert!(greylist.len() == N_NODES); + info!("========================================================"); + info!("Seedsync session successful!"); + info!("========================================================"); + + info!("========================================================"); + info!("Waiting 5s for seed node refinery to kick in..."); + info!("========================================================"); + sleep(5).await; + + // =========================================================== + // 4. Assert that seed node has at least one whitelist entry, + // indicating that the refinery process is happening correctly. + // =========================================================== + assert!(!seed.hosts().container.is_empty(HostColor::White).await); + info!("========================================================"); + info!("Seed node refinery operating successfully!"); + info!("========================================================"); + + info!("========================================================"); + info!("Waiting 5s for peers to propagate..."); + info!("========================================================"); + sleep(5).await; + + // =========================================================== + // 5. Select a random peer and ensure that its hostlist is not + // empty. This ensures the seed node is sharing whitelisted + // nodes around the network. + // =========================================================== + check_random_hostlist(&p2p_instances, &mut rng).await; + info!("========================================================"); + info!("Peer successfully received addrs!"); + info!("========================================================"); + + // =========================================================== + // 6. Select a random gold peer from one of the nodes and kill + // it. + // =========================================================== + info!("========================================================"); + info!("Selecting a random gold entry..."); + info!("========================================================"); + + let random_node_index = rand::thread_rng().gen_range(0..p2p_instances.len()); + let ((addr, _), _) = get_random_gold_host(&p2p_instances, random_node_index).await; + + kill_node(&p2p_instances, addr.clone()).await; + + info!("========================================================"); + info!("Waiting for greylist downgrade sequence to occur..."); + info!("========================================================"); + + // =========================================================== + // 7. Verify the peer has been removed from the Gold list. + // =========================================================== + p2p_instances[random_node_index] + .hosts() + .container + .contains(HostColor::Grey as usize, &addr) + .await; + info!("========================================================"); + info!("Greylist downgrade occured successfully!"); + info!("========================================================"); + + // =========================================================== + // 8. Stop the P2P network + // =========================================================== + for p2p in p2p_instances.iter() { + p2p.clone().stop().await; } }