From ebe6f4e5c402f97a3a640cbdddcd561c9cfa9dce Mon Sep 17 00:00:00 2001 From: x Date: Tue, 22 Aug 2023 13:49:54 +0200 Subject: [PATCH] net/dnet: add outbound connected/disconnected events --- bin/darkirc/src/main.rs | 2 +- script/nodetool.py | 9 +++- src/net/channel.rs | 33 ++++++++------- src/net/dnet.rs | 17 ++++++-- src/net/message.rs | 24 +++++------ src/net/p2p.rs | 12 +++--- src/net/session/outbound_session.rs | 55 +++++++++++++++--------- src/rpc/from_impl.rs | 65 ++++++++++++++++++++++------- 8 files changed, 144 insertions(+), 73 deletions(-) diff --git a/bin/darkirc/src/main.rs b/bin/darkirc/src/main.rs index 34377809a..c908de899 100644 --- a/bin/darkirc/src/main.rs +++ b/bin/darkirc/src/main.rs @@ -217,7 +217,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<( let dnet_task = StoppableTask::new(); dnet_task.clone().start( async move { - let dnet_sub = p2p_.dnet_sub().subscribe().await; + let dnet_sub = p2p_.dnet_subscribe().await; loop { let event = dnet_sub.receive().await; debug!("Got dnet event: {:?}", event); diff --git a/script/nodetool.py b/script/nodetool.py index 41358d5c1..75e08cd70 100644 --- a/script/nodetool.py +++ b/script/nodetool.py @@ -74,13 +74,20 @@ class JsonRpc: async def main(argv): rpc = JsonRpc() - await rpc.start("localhost", 26660) + while True: + try: + await rpc.start("localhost", 26660) + break + except OSError: + pass await rpc.dnet_switch(True) await rpc.dnet_subscribe_events() while True: data = await rpc.reader.readline() data = json.loads(data.decode().strip()) + if data["params"][0]["event"] in ["send", "recv"]: + continue print(data) await rpc.dnet_switch(False) diff --git a/src/net/channel.rs b/src/net/channel.rs index e823789df..6eb92fb28 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -28,7 +28,7 @@ use smol::Executor; use url::Url; use super::{ - dnet::{dnet, DnetEvent, MessageInfo}, + dnet::{self, dnet, DnetEvent}, message, message::Packet, message_subscriber::{MessageSubscription, MessageSubsystem}, @@ -48,13 +48,13 @@ pub type ChannelPtr = Arc; /// Channel debug info #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct ChannelInfo { - pub address: Url, - pub random_id: u32, + pub addr: Url, + pub id: u32, } impl ChannelInfo { - fn new(address: Url) -> Self { - Self { address, random_id: OsRng.gen() } + fn new(addr: Url) -> Self { + Self { addr, id: OsRng.gen() } } } @@ -75,7 +75,7 @@ pub struct Channel { /// Weak pointer to respective session session: SessionWeakPtr, /// Channel debug info - info: ChannelInfo, + pub info: ChannelInfo, } impl std::fmt::Debug for Channel { @@ -88,11 +88,7 @@ impl Channel { /// Sets up a new channel. Creates a reader and writer [`PtStream`] and /// summons the message subscriber subsystem. Performs a network handshake /// on the subsystem dispatchers. - pub async fn new( - stream: Box, - address: Url, - session: SessionWeakPtr, - ) -> Arc { + pub async fn new(stream: Box, addr: Url, session: SessionWeakPtr) -> Arc { let (reader, writer) = stream.split(); let reader = Mutex::new(reader); let writer = Mutex::new(writer); @@ -100,7 +96,7 @@ impl Channel { let message_subsystem = MessageSubsystem::new(); Self::setup_dispatchers(&message_subsystem).await; - let info = ChannelInfo::new(address.clone()); + let info = ChannelInfo::new(addr.clone()); Arc::new(Self { reader, @@ -212,7 +208,7 @@ impl Channel { let packet = Packet { command: M::NAME.to_string(), payload: serialize(message) }; dnet!(self, - let event = DnetEvent::SendMessage(MessageInfo { + let event = DnetEvent::SendMessage(dnet::MessageInfo { chan: self.info.clone(), cmd: packet.command.clone(), time: NanoTimestamp::current_time(), @@ -292,6 +288,15 @@ impl Channel { } }; + dnet!(self, + let event = DnetEvent::RecvMessage(dnet::MessageInfo { + chan: self.info.clone(), + cmd: packet.command.clone(), + time: NanoTimestamp::current_time(), + }); + self.p2p().dnet_notify(event).await; + ); + // Send result to our subscribers self.message_subsystem.notify(&packet.command, &packet.payload).await; } @@ -299,7 +304,7 @@ impl Channel { /// Returns the local socket address pub fn address(&self) -> &Url { - &self.info.address + &self.info.addr } /// Returns the inner [`MessageSubsystem`] reference diff --git a/src/net/dnet.rs b/src/net/dnet.rs index 64636fb77..ae4142554 100644 --- a/src/net/dnet.rs +++ b/src/net/dnet.rs @@ -18,7 +18,7 @@ use super::channel::ChannelInfo; use crate::util::time::NanoTimestamp; -use darkfi_serial::{SerialDecodable, SerialEncodable}; +use url::Url; macro_rules! dnet { ($self:expr, $($code:tt)*) => { @@ -31,14 +31,25 @@ macro_rules! dnet { } pub(crate) use dnet; -#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug)] pub struct MessageInfo { pub chan: ChannelInfo, pub cmd: String, pub time: NanoTimestamp, } -#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] +#[derive(Clone, Debug)] +pub struct OutboundConnect { + pub slot: u32, + pub addr: Url, + pub channel_id: u32, +} + +#[derive(Clone, Debug)] pub enum DnetEvent { SendMessage(MessageInfo), + RecvMessage(MessageInfo), + //OutboundConnecting(OutboundConnect), + OutboundConnected(OutboundConnect), + OutboundDisconnected(u32), } diff --git a/src/net/message.rs b/src/net/message.rs index 21a2bf623..097339d9c 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -18,7 +18,7 @@ use darkfi_serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use log::debug; +use log::trace; use url::Url; use crate::{Error, Result}; @@ -100,12 +100,12 @@ pub async fn read_packet(stream: &mut R) -> Result // Packets should have a 4 byte header of magic digits. // This is used for network debugging. let mut magic = [0u8; 4]; - debug!(target: "net::message", "Reading magic..."); + trace!(target: "net::message", "Reading magic..."); stream.read_exact(&mut magic).await?; - debug!(target: "net::message", "Read magic {:?}", magic); + trace!(target: "net::message", "Read magic {:?}", magic); if magic != MAGIC_BYTES { - debug!(target: "net::message", "Error: Magic bytes mismatch"); + trace!(target: "net::message", "Error: Magic bytes mismatch"); return Err(Error::MalformedPacket) } @@ -114,13 +114,13 @@ pub async fn read_packet(stream: &mut R) -> Result let mut cmd = vec![0u8; command_len]; stream.read_exact(&mut cmd).await?; let command = String::from_utf8(cmd)?; - debug!(target: "net::message", "Read command: {}", command); + trace!(target: "net::message", "Read command: {}", command); // The message-dependent data (see message types) let payload_len = VarInt::decode_async(stream).await?.0 as usize; let mut payload = vec![0u8; payload_len]; stream.read_exact(&mut payload).await?; - debug!(target: "net::message", "Read payload {} bytes", payload_len); + trace!(target: "net::message", "Read payload {} bytes", payload_len); Ok(Packet { command, payload }) } @@ -137,23 +137,23 @@ pub async fn send_packet( let mut written: usize = 0; - debug!(target: "net::message", "Sending magic..."); + trace!(target: "net::message", "Sending magic..."); stream.write_all(&MAGIC_BYTES).await?; written += MAGIC_BYTES.len(); - debug!(target: "net::message", "Sent magic"); + trace!(target: "net::message", "Sent magic"); - debug!(target: "net::message", "Sending command..."); + trace!(target: "net::message", "Sending command..."); written += VarInt(packet.command.len() as u64).encode_async(stream).await?; let cmd_ref = packet.command.as_bytes(); stream.write_all(cmd_ref).await?; written += cmd_ref.len(); - debug!(target: "net::message", "Sent command: {}", packet.command); + trace!(target: "net::message", "Sent command: {}", packet.command); - debug!(target: "net::message", "Sending payload..."); + trace!(target: "net::message", "Sending payload..."); written += VarInt(packet.payload.len() as u64).encode_async(stream).await?; stream.write_all(&packet.payload).await?; written += packet.payload.len(); - debug!(target: "net::message", "Sent payload {} bytes", packet.payload.len() as u64); + trace!(target: "net::message", "Sent payload {} bytes", packet.payload.len() as u64); stream.flush().await?; diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 9d6f7d25c..0067fabb3 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -81,7 +81,7 @@ pub struct P2p { /// Enable network debugging pub dnet_enabled: Mutex, /// The subscriber for which we can give dnet info over - dnet_sub: SubscriberPtr, + dnet_subscriber: SubscriberPtr, } impl P2p { @@ -111,7 +111,7 @@ impl P2p { session_outbound: Mutex::new(None), dnet_enabled: Mutex::new(false), - dnet_sub: Subscriber::new(), + dnet_subscriber: Subscriber::new(), }); let parent = Arc::downgrade(&self_); @@ -319,13 +319,13 @@ impl P2p { warn!("[P2P] Network debugging disabled!"); } - /// Return a reference to the dnet subscriber - pub fn dnet_sub(&self) -> SubscriberPtr { - self.dnet_sub.clone() + /// Subscribe to dnet events + pub async fn dnet_subscribe(&self) -> Subscription { + self.dnet_subscriber.clone().subscribe().await } /// Send a dnet notification over the subscriber pub async fn dnet_notify(&self, event: DnetEvent) { - self.dnet_sub.notify(event).await; + self.dnet_subscriber.notify(event).await; } } diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index d669e89ac..fc8eac0d8 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -38,6 +38,7 @@ use super::{ super::{ channel::ChannelPtr, connector::Connector, + dnet::{self, dnet, DnetEvent}, message::GetAddrsMessage, p2p::{P2p, P2pPtr}, }, @@ -103,7 +104,7 @@ impl OutboundSession { // Activate mutex lock on connection slots. let mut connect_slots = self.connect_slots.lock().await; - for i in 0..n_slots { + for i in 0..n_slots as u32 { let task = StoppableTask::new(); task.clone().start( @@ -132,7 +133,7 @@ impl OutboundSession { /// Creates a connector object and tries to connect using it. pub async fn channel_connect_loop( self: Arc, - slot_number: usize, + slot: u32, ex: Arc>, ) -> Result<()> { let parent = Arc::downgrade(&self); @@ -153,19 +154,24 @@ impl OutboundSession { // signal and then exit. Once it exits, we'll run `try_connect` again // and attempt to fill the slot with another peer. loop { - match self.try_connect(slot_number, &connector, transports, ex.clone()).await { + match self.try_connect(slot, &connector, transports, ex.clone()).await { Ok(()) => { info!( target: "net::outbound_session", "[P2P] Outbound slot #{} disconnected", - slot_number + slot ); } Err(e) => { error!( target: "net::outbound_session", "[P2P] Outbound slot #{} connection failed: {}", - slot_number, e, + slot, e, + ); + + dnet!(self, + let event = DnetEvent::OutboundDisconnected(slot); + self.p2p().dnet_notify(event).await; ); } } @@ -181,7 +187,7 @@ impl OutboundSession { /// main connect loop (parent of this function) will iterate again. async fn try_connect( &self, - slot_number: usize, + slot: u32, connector: &Connector, transports: &[String], ex: Arc>, @@ -189,15 +195,15 @@ impl OutboundSession { debug!( target: "net::outbound_session::try_connect()", "[P2P] Finding a host to connect to for outbound slot #{}", - slot_number, + slot, ); // Find an address to connect to. We also do peer discovery here if needed. - let addr = self.load_address(slot_number, transports, ex.clone()).await?; + let addr = self.load_address(slot, transports, ex.clone()).await?; info!( target: "net::outbound_session::try_connect()", "[P2P] Connecting outbound slot #{} [{}]", - slot_number, addr, + slot, addr, ); match connector.connect(&addr).await { @@ -205,7 +211,16 @@ impl OutboundSession { info!( target: "net::outbound_session::try_connect()", "[P2P] Outbound slot #{} connected [{}]", - slot_number, url + slot, url + ); + + dnet!(self, + let event = DnetEvent::OutboundConnected(dnet::OutboundConnect { + slot, + addr: addr.clone(), + channel_id: channel.info.id + }); + self.p2p().dnet_notify(event).await; ); let stop_sub = @@ -232,7 +247,7 @@ impl OutboundSession { error!( target: "net::outbound_session::try_connect()", "[P2P] Unable to connect outbound slot #{} [{}]: {}", - slot_number, addr, e + slot, addr, e ); } } @@ -255,7 +270,7 @@ impl OutboundSession { /// to do peer discovery and try to fill the slot again. async fn load_address( &self, - slot_number: usize, + slot: u32, transports: &[String], ex: Arc>, ) -> Result { @@ -267,7 +282,7 @@ impl OutboundSession { debug!( target: "net::outbound_session::load_address()", "[P2P] #{} Peer discovery active, waiting {} seconds...", - slot_number, retry_sleep, + slot, retry_sleep, ); sleep(retry_sleep).await; } @@ -324,7 +339,7 @@ impl OutboundSession { info!( target: "net::outbound_session::load_address()", "[P2P] Outbound #{}: No peers found. Starting peer discovery...", - slot_number, + slot, ); // NOTE: A design decision here is to do a sleep inside peer_discovery() // so that there's a certain period (outbound_connect_timeout) of time @@ -332,7 +347,7 @@ impl OutboundSession { // inside peer_discovery, it will block here in the slot sessions, while // other slots can keep trying to find hosts. This is also why we sleep // in the beginning of this loop if peer discovery is currently active. - self.peer_discovery(slot_number, ex.clone()).await; + self.peer_discovery(slot, ex.clone()).await; } } @@ -343,14 +358,14 @@ impl OutboundSession { /// This function will also sleep `Settings::outbound_connect_timeout` seconds /// after broadcasting in order to let the P2P stack receive and work through /// the addresses it is expecting. - async fn peer_discovery(&self, slot_number: usize, ex: Arc>) { + async fn peer_discovery(&self, slot: u32, ex: Arc>) { let p2p = self.p2p(); if *p2p.peer_discovery_running.lock().await { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Peer discovery already active", - slot_number, + slot, ); return } @@ -358,7 +373,7 @@ impl OutboundSession { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Started peer discovery", - slot_number, + slot, ); *p2p.peer_discovery_running.lock().await = true; @@ -369,7 +384,7 @@ impl OutboundSession { info!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Broadcasting GetAddrs across active channels", - slot_number, + slot, ); p2p.broadcast(&get_addrs).await; } else { @@ -391,7 +406,7 @@ impl OutboundSession { debug!( target: "net::outbound_session::peer_discovery()", "[P2P] Outbound #{}: Sleeping {} seconds", - slot_number, p2p.settings().outbound_connect_timeout, + slot, p2p.settings().outbound_connect_timeout, ); sleep(p2p.settings().outbound_connect_timeout).await; *p2p.peer_discovery_running.lock().await = false; diff --git a/src/rpc/from_impl.rs b/src/rpc/from_impl.rs index 52e38ebb6..724fd1262 100644 --- a/src/rpc/from_impl.rs +++ b/src/rpc/from_impl.rs @@ -16,32 +16,65 @@ * along with this program. If not, see . */ +use std::collections::HashMap; +use tinyjson::JsonValue::{self, Number as JsonNum, Object as JsonObj, String as JsonStr}; + +use crate::net; + +// helper functions +fn json_map(vals: [(&str, JsonValue); N]) -> JsonValue { + JsonObj(HashMap::from(vals.map(|(k, v)| (k.to_string(), v)))) +} +fn json_str(val: &str) -> JsonValue { + JsonStr(val.to_string()) +} + #[cfg(feature = "net")] -impl From for tinyjson::JsonValue { - fn from(info: crate::net::channel::ChannelInfo) -> tinyjson::JsonValue { - tinyjson::JsonValue::Object(std::collections::HashMap::from([ - ("address".to_string(), tinyjson::JsonValue::String(info.address.to_string())), - ("random_id".to_string(), tinyjson::JsonValue::Number(info.random_id.into())), - ])) +impl From for JsonValue { + fn from(info: net::channel::ChannelInfo) -> JsonValue { + json_map([("addr", JsonStr(info.addr.to_string())), ("id", JsonNum(info.id.into()))]) } } #[cfg(feature = "net")] -impl From for tinyjson::JsonValue { - fn from(info: crate::net::dnet::MessageInfo) -> tinyjson::JsonValue { - tinyjson::JsonValue::Object(std::collections::HashMap::from([ - ("chan".to_string(), info.chan.into()), - ("cmd".to_string(), tinyjson::JsonValue::String(info.cmd.clone())), - ("time".to_string(), tinyjson::JsonValue::String(info.time.0.to_string())), - ])) +impl From for JsonValue { + fn from(info: net::dnet::MessageInfo) -> JsonValue { + json_map([ + ("chan", info.chan.into()), + ("cmd", JsonStr(info.cmd.clone())), + ("time", JsonStr(info.time.0.to_string())), + ]) } } #[cfg(feature = "net")] -impl From for tinyjson::JsonValue { - fn from(event: crate::net::dnet::DnetEvent) -> tinyjson::JsonValue { +impl From for JsonValue { + fn from(info: net::dnet::OutboundConnect) -> JsonValue { + json_map([ + ("slot", JsonNum(info.slot.into())), + ("addr", JsonStr(info.addr.to_string())), + ("channel_id", JsonNum(info.channel_id.into())), + ]) + } +} + +#[cfg(feature = "net")] +impl From for JsonValue { + fn from(event: net::dnet::DnetEvent) -> JsonValue { match event { - crate::net::dnet::DnetEvent::SendMessage(message_info) => message_info.into(), + net::dnet::DnetEvent::SendMessage(info) => { + json_map([("event", json_str("send")), ("info", info.into())]) + } + net::dnet::DnetEvent::RecvMessage(info) => { + json_map([("event", json_str("recv")), ("info", info.into())]) + } + net::dnet::DnetEvent::OutboundConnected(info) => { + json_map([("event", json_str("outbound_connected")), ("info", info.into())]) + } + net::dnet::DnetEvent::OutboundDisconnected(slot) => json_map([ + ("event", json_str("outbound_disconnected")), + ("slot", JsonNum(slot.into())), + ]), } } }