diff --git a/bin/dhtd/dhtd/src/proto.rs b/bin/dhtd/dhtd/src/proto.rs index 07e5b1fa2..d5b6150e3 100644 --- a/bin/dhtd/dhtd/src/proto.rs +++ b/bin/dhtd/dhtd/src/proto.rs @@ -22,8 +22,9 @@ use async_std::sync::Arc; use async_trait::async_trait; use darkfi::{ dht2::net_hashmap::{NetHashMapInsert, NetHashMapRemove}, + impl_p2p_message, net::{ - self, ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, }, Result, @@ -52,11 +53,7 @@ pub struct ChunkRequest { pub hash: blake3::Hash, } -impl net::Message for ChunkRequest { - fn name() -> &'static str { - "dhtchunkrequest" - } -} +impl_p2p_message!(ChunkRequest, "dhtchunkrequest"); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct ChunkReply { @@ -64,22 +61,14 @@ pub struct ChunkReply { pub data: Vec, } -impl net::Message for ChunkReply { - fn name() -> &'static str { - "dhtchunkreply" - } -} +impl_p2p_message!(ChunkReply, "dhtchunkreply"); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FileRequest { pub hash: blake3::Hash, } -impl net::Message for FileRequest { - fn name() -> &'static str { - "dhtfilerequest" - } -} +impl_p2p_message!(FileRequest, "dhtfilerequest"); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FileReply { @@ -87,16 +76,12 @@ pub struct FileReply { pub chunks: Vec, } -impl net::Message for FileReply { - fn name() -> &'static str { - "dhtfilereply" - } -} +impl_p2p_message!(FileReply, "dhtfilereply"); impl ProtocolDht { #[allow(dead_code)] pub async fn init(channel: ChannelPtr, p2p: P2pPtr, state: DhtdPtr) -> Result { - let msg_subsystem = channel.get_message_subsystem(); + let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::>>().await; msg_subsystem.add_dispatch::>().await; msg_subsystem.add_dispatch::().await; @@ -137,7 +122,7 @@ impl ProtocolDht { state.routing_table.entry(msg.k).or_insert_with(HashSet::new); let hashset = state.routing_table.get_mut(&msg.k).unwrap(); - hashset.insert(self.channel.address()); + hashset.insert(self.channel.address().clone()); } } diff --git a/bin/dhtd/dhtd/src/tests.rs b/bin/dhtd/dhtd/src/tests.rs index 1237abe83..865936bca 100644 --- a/bin/dhtd/dhtd/src/tests.rs +++ b/bin/dhtd/dhtd/src/tests.rs @@ -25,7 +25,7 @@ use async_std::{ }; use darkfi::{ dht2::{Dht, MAX_CHUNK_SIZE}, - net::{self, transport::TransportName, P2p}, + net::{self, P2p}, util::async_util::{msleep, sleep}, Result, }; @@ -52,9 +52,9 @@ async fn dht_remote_get_insert_real(ex: Arc>) -> Result<()> { drop(listener); let settings = net::Settings { - inbound: vec![url.clone()], + inbound_addrs: vec![url.clone()], peers: addrs.clone(), - outbound_transports: vec![TransportName::Tcp(None)], + allowed_transports: vec!["tcp".into()], localnet: true, ..Default::default() }; @@ -88,7 +88,6 @@ async fn dht_remote_get_insert_real(ex: Arc>) -> Result<()> { dhtds.push(dhtd); - p2p.wait_for_outbound(ex.clone()).await?; sleep(1).await; } diff --git a/bin/fud/fu/src/main.rs b/bin/fud/fu/src/main.rs index 65a2a888c..c380b01c2 100644 --- a/bin/fud/fu/src/main.rs +++ b/bin/fud/fu/src/main.rs @@ -134,7 +134,7 @@ async fn main() -> Result<()> { let args = Args::parse(); let log_level = get_log_level(args.verbose.into()); - let log_config = get_log_config(); + let log_config = get_log_config(args.verbose.into()); TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?; let rpc_client = RpcClient::new(args.endpoint).await?; diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index ead9a1db5..e1e5a8d2d 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -73,7 +73,7 @@ struct Args { #[structopt(long, default_value = "8")] /// Connection slots - slots: u32, + slots: usize, #[structopt(long)] /// Connect to seed (repeatable flag) @@ -397,14 +397,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // P2P network let network_settings = net::Settings { - inbound: args.p2p_accept, + inbound_addrs: args.p2p_accept, outbound_connections: args.slots, - external_addr: args.p2p_external, + external_addrs: args.p2p_external, peers: args.peers.clone(), seeds: args.seeds.clone(), - outbound_transports: net::settings::get_outbound_transports(args.transports), + allowed_transports: args.transports, localnet: args.localnet, - channel_log: args.channel_log, ..Default::default() }; @@ -434,9 +433,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { }) .detach(); - info!("Waiting for P2P outbound connections"); - p2p.wait_for_outbound(ex).await?; - fud.init().await?; // Wait for SIGINT diff --git a/bin/genev/genev-cli/src/main.rs b/bin/genev/genev-cli/src/main.rs index e77eceda5..708c2733f 100644 --- a/bin/genev/genev-cli/src/main.rs +++ b/bin/genev/genev-cli/src/main.rs @@ -67,7 +67,7 @@ async fn main() -> Result<()> { let args = Args::parse(); let log_level = get_log_level(args.verbose.into()); - let log_config = get_log_config(); + let log_config = get_log_config(args.verbose.into()); TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?; let rpc_client = RpcClient::new(args.endpoint).await?; diff --git a/bin/genev/genevd/src/main.rs b/bin/genev/genevd/src/main.rs index f4ccd7873..2f6d7cc23 100644 --- a/bin/genev/genevd/src/main.rs +++ b/bin/genev/genevd/src/main.rs @@ -99,8 +99,7 @@ async fn realmain(args: Args, executor: Arc>) -> Result<()> { let seen_inv = Seen::new(); // Check the version - let mut net_settings = args.net.clone(); - net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string()); + let net_settings = args.net.clone(); // New p2p let p2p = net::P2p::new(net_settings.into()).await; diff --git a/bin/genev/genevd/src/rpc.rs b/bin/genev/genevd/src/rpc.rs index 00c834cc0..3f0fbdf20 100644 --- a/bin/genev/genevd/src/rpc.rs +++ b/bin/genev/genevd/src/rpc.rs @@ -111,7 +111,7 @@ impl JsonRpcInterface { return JsonResponse::new(json, id).into() } - self.p2p.broadcast(event).await.unwrap(); + self.p2p.broadcast(&event).await; let json = json!(true); JsonResponse::new(json, id).into() diff --git a/bin/tau/tau-cli/src/main.rs b/bin/tau/tau-cli/src/main.rs index b908b9c25..5477fbb0f 100644 --- a/bin/tau/tau-cli/src/main.rs +++ b/bin/tau/tau-cli/src/main.rs @@ -165,7 +165,7 @@ async fn main() -> Result<()> { let args = Args::parse(); let log_level = get_log_level(args.verbose.into()); - let log_config = get_log_config(); + let log_config = get_log_config(args.verbose.into()); TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?; let rpc_client = RpcClient::new(args.endpoint).await?; diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 728fdcccf..f0b7b0069 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -166,7 +166,7 @@ async fn start_sync_loop( timestamp: Timestamp::current_time(), }; - p2p.broadcast(event).await?; + p2p.broadcast(&event).await; } } @@ -331,8 +331,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<( // // P2p setup // - let mut net_settings = settings.net.clone(); - net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string()); + let net_settings = settings.net.clone(); let p2p = net::P2p::new(net_settings.into()).await; let registry = p2p.protocol_registry(); diff --git a/example/dchat/src/dchatmsg.rs b/example/dchat/src/dchatmsg.rs index e0c5ed77e..835720946 100644 --- a/example/dchat/src/dchatmsg.rs +++ b/example/dchat/src/dchatmsg.rs @@ -19,19 +19,15 @@ // ANCHOR: msg use async_std::sync::{Arc, Mutex}; -use darkfi::net; +use darkfi::{impl_p2p_message, net::Message}; use darkfi_serial::{SerialDecodable, SerialEncodable}; pub type DchatMsgsBuffer = Arc>>; -impl net::Message for DchatMsg { - fn name() -> &'static str { - "DchatMsg" - } -} - #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct DchatMsg { pub msg: String, } + +impl_p2p_message!(DchatMsg, "DchatMsg"); // ANCHOR_END: msg diff --git a/example/dchat/src/main.rs b/example/dchat/src/main.rs index 06b5fe62d..609b01168 100644 --- a/example/dchat/src/main.rs +++ b/example/dchat/src/main.rs @@ -113,7 +113,7 @@ impl Dchat { debug!(target: "dchat", "Dchat::register_protocol() [START]"); let registry = self.p2p.protocol_registry(); registry - .register(!net::SESSION_SEED, move |channel, _p2p| { + .register(!net::session::SESSION_SEED, move |channel, _p2p| { let msgs2 = msgs.clone(); async move { ProtocolDchat::init(channel, msgs2).await } }) @@ -145,7 +145,7 @@ impl Dchat { // ANCHOR: send async fn send(&self, msg: String) -> Result<()> { let dchatmsg = DchatMsg { msg }; - self.p2p.broadcast(dchatmsg).await?; + self.p2p.broadcast(&dchatmsg).await; Ok(()) } // ANCHOR_END: send @@ -179,8 +179,8 @@ fn alice() -> Result { let ext_addr = Url::parse("tcp://127.0.0.1:51554").unwrap(); let net = Settings { - inbound: vec![inbound], - external_addr: vec![ext_addr], + inbound_addrs: vec![inbound], + external_addrs: vec![ext_addr], seeds: vec![seed], localnet: true, ..Default::default() @@ -205,7 +205,7 @@ fn bob() -> Result { let seed = Url::parse("tcp://127.0.0.1:50515").unwrap(); let net = Settings { - inbound: vec![], + inbound_addrs: vec![], outbound_connections: 5, seeds: vec![seed], localnet: true, diff --git a/example/dchat/src/protocol_dchat.rs b/example/dchat/src/protocol_dchat.rs index cc14158bd..bd8fe3579 100644 --- a/example/dchat/src/protocol_dchat.rs +++ b/example/dchat/src/protocol_dchat.rs @@ -36,7 +36,7 @@ pub struct ProtocolDchat { impl ProtocolDchat { pub async fn init(channel: net::ChannelPtr, msgs: DchatMsgsBuffer) -> net::ProtocolBasePtr { debug!(target: "dchat", "ProtocolDchat::init() [START]"); - let message_subsytem = channel.get_message_subsystem(); + let message_subsytem = channel.message_subsystem(); message_subsytem.add_dispatch::().await; let msg_sub = diff --git a/example/net.rs b/example/net.rs index 5e80abe14..74321d6f5 100644 --- a/example/net.rs +++ b/example/net.rs @@ -55,7 +55,7 @@ pub struct DarkCli { pub connect: Option>, /// connections slots #[clap(long)] - pub connect_slots: Option, + pub connect_slots: Option, /// RPC port #[clap(long)] pub rpc_port: Option, @@ -93,9 +93,9 @@ impl ProgramOptions { Ok(ProgramOptions { network_settings: net::Settings { - inbound: accept_addr.clone(), + inbound_addrs: accept_addr.clone(), outbound_connections: connection_slots, - external_addr: accept_addr, + external_addrs: accept_addr, peers: manual_connects, seeds: seed_addrs, ..Default::default() @@ -108,7 +108,7 @@ fn main() -> Result<()> { let options = ProgramOptions::load()?; let lvl = get_log_level(1); - let conf = get_log_config(); + let conf = get_log_config(1); TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?; diff --git a/src/dht/mod.rs b/src/dht/mod.rs index 38dc7fc1d..daefc1a85 100644 --- a/src/dht/mod.rs +++ b/src/dht/mod.rs @@ -142,10 +142,7 @@ impl Dht { Some(_) => { debug!(target: "dht", "Key removed: {}", key); let request = LookupRequest::new(self.id, key, 1); - if let Err(e) = self.p2p.broadcast(&request).await { - error!(target: "dht", "Failed broadcasting request: {}", e); - return Err(e) - } + self.p2p.broadcast(&request).await; self.lookup_remove(key, self.id) } @@ -225,10 +222,7 @@ impl Dht { let peer = *peers.iter().last().unwrap(); let request = KeyRequest::new(self.id, peer, key); // TODO: ask connected peers directly, not broadcast - if let Err(e) = self.p2p.broadcast(request).await { - error!(target: "dht", "Failed broadcasting request: {}", e); - return Err(e) - } + self.p2p.broadcast(&request).await; Ok(()) } diff --git a/src/dht/protocol.rs b/src/dht/protocol.rs index f66baa0e0..30519ba13 100644 --- a/src/dht/protocol.rs +++ b/src/dht/protocol.rs @@ -55,7 +55,7 @@ impl Protocol { p2p: P2pPtr, ) -> Result { debug!(target: "dht::protocol", "Adding Protocol to the protocol registry"); - let msg_subsystem = channel.get_message_subsystem(); + let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; diff --git a/src/lib.rs b/src/lib.rs index 8567deb5d..7ad45676a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,9 @@ pub mod consensus; #[cfg(feature = "blockchain")] pub mod validator; +#[cfg(feature = "dht")] +pub mod dht; + #[cfg(feature = "dht")] pub mod dht2;