diff --git a/script/nodetool.py b/script/nodetool.py index 3b853c320..e23b4d0cb 100644 --- a/script/nodetool.py +++ b/script/nodetool.py @@ -99,6 +99,12 @@ async def main(argv): current_time = time.strftime("%H:%M:%S", t) match ev: + case "inbound_connected": + addr = info["addr"] + print(f"{current_time} inbound (connect): {addr}") + case "inbound_disconnected": + addr = info["addr"] + print(f"{current_time} inbound (disconnect): {addr}") case "outbound_slot_sleeping": slot = info["slot"] print(f"{current_time} slot {slot}: sleeping") diff --git a/src/net/dnet.rs b/src/net/dnet.rs index c8275914d..4aaf40ba6 100644 --- a/src/net/dnet.rs +++ b/src/net/dnet.rs @@ -44,6 +44,15 @@ pub struct MessageInfo { pub type SendMessage = MessageInfo; pub type RecvMessage = MessageInfo; +#[derive(Clone, Debug)] +pub struct InboundInfo { + pub addr: Url, + pub channel_id: u32, +} + +pub type InboundConnected = InboundInfo; +pub type InboundDisconnected = InboundInfo; + #[derive(Clone, Debug)] pub struct OutboundSlotSleeping { pub slot: u32, @@ -78,6 +87,8 @@ pub struct OutboundPeerDiscovery { pub enum DnetEvent { SendMessage(MessageInfo), RecvMessage(MessageInfo), + InboundConnected(InboundConnected), + InboundDisconnected(InboundDisconnected), OutboundSlotSleeping(OutboundSlotSleeping), OutboundSlotConnecting(OutboundSlotConnecting), OutboundSlotConnected(OutboundSlotConnected), diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index e6ad7dc3f..ff0f092e9 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -34,6 +34,7 @@ use super::{ super::{ acceptor::{Acceptor, AcceptorPtr}, channel::ChannelPtr, + dnet::{self, dnetev, DnetEvent}, p2p::{P2p, P2pPtr}, }, Session, SessionBitFlag, SESSION_INBOUND, @@ -157,18 +158,28 @@ impl InboundSession { "[P2P] Connected Inbound #{} [{}]", index, channel.address(), ); + dnetev!(self, InboundConnected, { + addr: channel.info.addr.clone(), + channel_id: channel.info.id, + }); + let stop_sub = channel.subscribe_stop().await?; self.register_channel(channel.clone(), ex.clone()).await?; stop_sub.receive().await; + self.p2p().remove(channel.clone()).await; + debug!( target: "net::inbound_session::setup_channel()", - "Received stop_sub, removing channel from P2P", + "Received stop_sub, channel removed from P2P", ); - self.p2p().remove(channel).await; + dnetev!(self, InboundDisconnected, { + addr: channel.info.addr.clone(), + channel_id: channel.info.id, + }); Ok(()) } diff --git a/src/rpc/from_impl.rs b/src/rpc/from_impl.rs index 372971ed0..f8fd4c6b1 100644 --- a/src/rpc/from_impl.rs +++ b/src/rpc/from_impl.rs @@ -37,6 +37,16 @@ impl From for JsonValue { } } +#[cfg(feature = "net")] +impl From for JsonValue { + fn from(info: net::dnet::InboundInfo) -> JsonValue { + json_map([ + ("addr", JsonStr(info.addr.to_string())), + ("channel_id", JsonNum(info.channel_id.into())), + ]) + } +} + #[cfg(feature = "net")] impl From for JsonValue { fn from(info: net::dnet::OutboundSlotSleeping) -> JsonValue { @@ -89,6 +99,12 @@ impl From for JsonValue { net::dnet::DnetEvent::RecvMessage(info) => { json_map([("event", json_str("recv")), ("info", info.into())]) } + net::dnet::DnetEvent::InboundConnected(info) => { + json_map([("event", json_str("inbound_connected")), ("info", info.into())]) + } + net::dnet::DnetEvent::InboundDisconnected(info) => { + json_map([("event", json_str("inbound_disconnected")), ("info", info.into())]) + } net::dnet::DnetEvent::OutboundSlotSleeping(info) => { json_map([("event", json_str("outbound_slot_sleeping")), ("info", info.into())]) }