mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-10 15:17:57 -05:00
example/p2pdebug: using net2 for debugging
This commit is contained in:
@@ -9,7 +9,7 @@ edition = "2021"
|
||||
[workspace]
|
||||
|
||||
[dependencies]
|
||||
darkfi = {path = "../../", features = ["net"]}
|
||||
darkfi = {path = "../../", features = ["net2"]}
|
||||
# Async
|
||||
smol = "1.2.5"
|
||||
futures = "0.3.21"
|
||||
@@ -27,6 +27,7 @@ clap = {version = "3.1.8", features = ["derive"]}
|
||||
log = "0.4.16"
|
||||
simplelog = "0.12.0-alpha1"
|
||||
fxhash = "0.2.1"
|
||||
url = "2.2.2"
|
||||
|
||||
# Encoding and parsing
|
||||
serde_json = "1.0.79"
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_executor::Executor;
|
||||
use clap::Parser;
|
||||
use easy_parallel::Parallel;
|
||||
use simplelog::{ColorChoice, TermLogger, TerminalMode};
|
||||
|
||||
use url::Url;
|
||||
use rand::{rngs::OsRng, Rng, RngCore};
|
||||
|
||||
use darkfi::net2 as net;
|
||||
use darkfi::{
|
||||
cli_desc, net,
|
||||
cli_desc,
|
||||
util::{cli::log_config, sleep},
|
||||
Result,
|
||||
};
|
||||
@@ -44,30 +45,29 @@ enum State {
|
||||
struct MockP2p {
|
||||
node_number: u8,
|
||||
state: State,
|
||||
p2p: net::P2pPtr,
|
||||
broadcast: bool,
|
||||
address: Option<SocketAddr>,
|
||||
address: Option<Url>,
|
||||
}
|
||||
|
||||
impl MockP2p {
|
||||
async fn new(node_number: u8, _broadcast: bool) -> Result<Self> {
|
||||
let seed_addrs: Vec<SocketAddr> = vec![
|
||||
"127.0.0.1:11001".parse()?,
|
||||
"127.0.0.1:11002".parse()?,
|
||||
"127.0.0.1:11003".parse()?,
|
||||
async fn new(node_number: u8, _broadcast: bool) -> Result<(net::P2pPtr<impl net::Transport>, Self)> {
|
||||
let seed_addrs: Vec<Url> = vec![
|
||||
Url::parse("tcp://127.0.0.1:11001")?,
|
||||
Url::parse("tcp://127.0.0.1:11002")?,
|
||||
Url::parse("tcp://127.0.0.1:11003")?,
|
||||
];
|
||||
|
||||
let state: State;
|
||||
let address: Option<SocketAddr>;
|
||||
let address: Option<Url>;
|
||||
|
||||
let mut broadcast = _broadcast;
|
||||
|
||||
let p2p = match node_number {
|
||||
0..=2 => {
|
||||
address = Some(seed_addrs[node_number as usize]);
|
||||
address = Some(seed_addrs[node_number as usize].clone());
|
||||
|
||||
let net_settings = net::Settings { inbound: address, ..Default::default() };
|
||||
let p2p = net::P2p::new(net_settings).await;
|
||||
let net_settings = net::Settings { inbound: address.clone(), ..Default::default() };
|
||||
let p2p = net::P2p::<net::TcpTransport>::new(net_settings).await;
|
||||
|
||||
broadcast = false;
|
||||
state = State::Seed;
|
||||
@@ -76,16 +76,16 @@ impl MockP2p {
|
||||
}
|
||||
3..=20 => {
|
||||
let random_port: u32 = rand::thread_rng().gen_range(11007..49151);
|
||||
address = Some(format!("127.0.0.1:{}", random_port).parse()?);
|
||||
address = Some(format!("tcp://127.0.0.1:{}", random_port).parse()?);
|
||||
|
||||
let net_settings = net::Settings {
|
||||
inbound: address,
|
||||
external_addr: address,
|
||||
inbound: address.clone(),
|
||||
external_addr: address.clone(),
|
||||
seeds: seed_addrs,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let p2p = net::P2p::new(net_settings).await;
|
||||
let p2p = net::P2p::<net::TcpTransport>::new(net_settings).await;
|
||||
|
||||
state = State::Inbound;
|
||||
|
||||
@@ -100,7 +100,7 @@ impl MockP2p {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let p2p = net::P2p::new(net_settings).await;
|
||||
let p2p = net::P2p::<net::TcpTransport>::new(net_settings).await;
|
||||
state = State::Outbound;
|
||||
|
||||
p2p
|
||||
@@ -109,14 +109,13 @@ impl MockP2p {
|
||||
|
||||
println!("start {:?} node #{} address {:?}", state, node_number, address);
|
||||
|
||||
Ok(Self { node_number, state, p2p, broadcast, address })
|
||||
Ok((p2p, Self { node_number, state, broadcast, address }))
|
||||
}
|
||||
|
||||
async fn run(&self, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
let p2p = self.p2p.clone();
|
||||
async fn run(&self, p2p: net::P2pPtr<impl net::Transport>, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
let state = self.state.clone();
|
||||
let node_number = self.node_number;
|
||||
let address = self.address;
|
||||
let address = self.address.clone();
|
||||
|
||||
let (sender, receiver) = async_channel::unbounded();
|
||||
let sender_clone = sender.clone();
|
||||
@@ -126,13 +125,14 @@ impl MockP2p {
|
||||
|
||||
let registry = p2p.protocol_registry();
|
||||
registry
|
||||
.register(net::SESSION_ALL, move |channel, p2p| {
|
||||
.register(!net::SESSION_SEED, move |channel, p2p| {
|
||||
let sender = sender_clone.clone();
|
||||
let seen_debugmsg_ids = seen_debugmsg_ids_clone.clone();
|
||||
async move { ProtocolDebugmsg::init(channel, sender, seen_debugmsg_ids, p2p).await }
|
||||
})
|
||||
.await;
|
||||
|
||||
let address_cloned = address.clone();
|
||||
if self.broadcast {
|
||||
println!("start broadcast {:?} node #{} address {:?}", state, node_number, address);
|
||||
let sleep_time = 10;
|
||||
@@ -171,7 +171,7 @@ impl MockP2p {
|
||||
let msg = receiver.recv().await.unwrap();
|
||||
println!(
|
||||
"receive {:?} {:?} node #{} address {:?}",
|
||||
msg, state, node_number, address
|
||||
msg, state, node_number, address_cloned
|
||||
);
|
||||
seen_debugmsg_ids_clone.add_seen(msg.id).await;
|
||||
}
|
||||
@@ -184,8 +184,8 @@ impl MockP2p {
|
||||
}
|
||||
|
||||
async fn start(executor: Arc<Executor<'_>>, args: Args) -> Result<()> {
|
||||
let mock_p2p = MockP2p::new(args.node, args.broadcast).await?;
|
||||
mock_p2p.run(executor).await
|
||||
let (p2p, mock_p2p) = MockP2p::new(args.node, args.broadcast).await?;
|
||||
mock_p2p.run(p2p.clone(), executor).await
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
|
||||
@@ -8,7 +8,7 @@ use fxhash::FxHashSet;
|
||||
use log::debug;
|
||||
|
||||
use darkfi::{
|
||||
net,
|
||||
net2 as net,
|
||||
util::serial::{SerialDecodable, SerialEncodable},
|
||||
Result,
|
||||
};
|
||||
@@ -47,16 +47,16 @@ impl SeenDebugmsgIds {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProtocolDebugmsg {
|
||||
pub struct ProtocolDebugmsg<T: net::Transport> {
|
||||
notify_queue_sender: Sender<Arc<Debugmsg>>,
|
||||
debugmsg_sub: net::MessageSubscription<Debugmsg>,
|
||||
jobsman: net::ProtocolJobsManagerPtr,
|
||||
jobsman: net::ProtocolJobsManagerPtr<T>,
|
||||
seen_ids: SeenDebugmsgIdsPtr,
|
||||
p2p: net::P2pPtr,
|
||||
p2p: net::P2pPtr<T>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl net::ProtocolBase for ProtocolDebugmsg {
|
||||
impl<T: net::Transport> net::ProtocolBase for ProtocolDebugmsg<T> {
|
||||
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
|
||||
/// protocol task manager, then queues the reply. Sends out a ping and
|
||||
/// waits for pong reply. Waits for ping and replies with a pong.
|
||||
@@ -73,12 +73,12 @@ impl net::ProtocolBase for ProtocolDebugmsg {
|
||||
}
|
||||
}
|
||||
|
||||
impl ProtocolDebugmsg {
|
||||
impl<T: net::Transport> ProtocolDebugmsg<T> {
|
||||
pub async fn init(
|
||||
channel: net::ChannelPtr,
|
||||
channel: net::ChannelPtr<T>,
|
||||
notify_queue_sender: Sender<Arc<Debugmsg>>,
|
||||
seen_ids: SeenDebugmsgIdsPtr,
|
||||
p2p: net::P2pPtr,
|
||||
p2p: net::P2pPtr<T>,
|
||||
) -> net::ProtocolBasePtr {
|
||||
let message_subsystem = channel.get_message_subsystem();
|
||||
message_subsystem.add_dispatch::<Debugmsg>().await;
|
||||
|
||||
Reference in New Issue
Block a user