net/dnet: add outbound connected/disconnected events

This commit is contained in:
x
2023-08-22 13:49:54 +02:00
parent a0b73961fb
commit ebe6f4e5c4
8 changed files with 144 additions and 73 deletions

View File

@@ -28,7 +28,7 @@ use smol::Executor;
use url::Url;
use super::{
dnet::{dnet, DnetEvent, MessageInfo},
dnet::{self, dnet, DnetEvent},
message,
message::Packet,
message_subscriber::{MessageSubscription, MessageSubsystem},
@@ -48,13 +48,13 @@ pub type ChannelPtr = Arc<Channel>;
/// Channel debug info
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ChannelInfo {
pub address: Url,
pub random_id: u32,
pub addr: Url,
pub id: u32,
}
impl ChannelInfo {
fn new(address: Url) -> Self {
Self { address, random_id: OsRng.gen() }
fn new(addr: Url) -> Self {
Self { addr, id: OsRng.gen() }
}
}
@@ -75,7 +75,7 @@ pub struct Channel {
/// Weak pointer to respective session
session: SessionWeakPtr,
/// Channel debug info
info: ChannelInfo,
pub info: ChannelInfo,
}
impl std::fmt::Debug for Channel {
@@ -88,11 +88,7 @@ impl Channel {
/// Sets up a new channel. Creates a reader and writer [`PtStream`] and
/// summons the message subscriber subsystem. Performs a network handshake
/// on the subsystem dispatchers.
pub async fn new(
stream: Box<dyn PtStream>,
address: Url,
session: SessionWeakPtr,
) -> Arc<Self> {
pub async fn new(stream: Box<dyn PtStream>, addr: Url, session: SessionWeakPtr) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
let writer = Mutex::new(writer);
@@ -100,7 +96,7 @@ impl Channel {
let message_subsystem = MessageSubsystem::new();
Self::setup_dispatchers(&message_subsystem).await;
let info = ChannelInfo::new(address.clone());
let info = ChannelInfo::new(addr.clone());
Arc::new(Self {
reader,
@@ -212,7 +208,7 @@ impl Channel {
let packet = Packet { command: M::NAME.to_string(), payload: serialize(message) };
dnet!(self,
let event = DnetEvent::SendMessage(MessageInfo {
let event = DnetEvent::SendMessage(dnet::MessageInfo {
chan: self.info.clone(),
cmd: packet.command.clone(),
time: NanoTimestamp::current_time(),
@@ -292,6 +288,15 @@ impl Channel {
}
};
dnet!(self,
let event = DnetEvent::RecvMessage(dnet::MessageInfo {
chan: self.info.clone(),
cmd: packet.command.clone(),
time: NanoTimestamp::current_time(),
});
self.p2p().dnet_notify(event).await;
);
// Send result to our subscribers
self.message_subsystem.notify(&packet.command, &packet.payload).await;
}
@@ -299,7 +304,7 @@ impl Channel {
/// Returns the local socket address
pub fn address(&self) -> &Url {
&self.info.address
&self.info.addr
}
/// Returns the inner [`MessageSubsystem`] reference

View File

@@ -18,7 +18,7 @@
use super::channel::ChannelInfo;
use crate::util::time::NanoTimestamp;
use darkfi_serial::{SerialDecodable, SerialEncodable};
use url::Url;
macro_rules! dnet {
($self:expr, $($code:tt)*) => {
@@ -31,14 +31,25 @@ macro_rules! dnet {
}
pub(crate) use dnet;
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug)]
pub struct MessageInfo {
pub chan: ChannelInfo,
pub cmd: String,
pub time: NanoTimestamp,
}
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
#[derive(Clone, Debug)]
pub struct OutboundConnect {
pub slot: u32,
pub addr: Url,
pub channel_id: u32,
}
#[derive(Clone, Debug)]
pub enum DnetEvent {
SendMessage(MessageInfo),
RecvMessage(MessageInfo),
//OutboundConnecting(OutboundConnect),
OutboundConnected(OutboundConnect),
OutboundDisconnected(u32),
}

View File

@@ -18,7 +18,7 @@
use darkfi_serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use log::debug;
use log::trace;
use url::Url;
use crate::{Error, Result};
@@ -100,12 +100,12 @@ pub async fn read_packet<R: AsyncRead + Unpin + Sized>(stream: &mut R) -> Result
// Packets should have a 4 byte header of magic digits.
// This is used for network debugging.
let mut magic = [0u8; 4];
debug!(target: "net::message", "Reading magic...");
trace!(target: "net::message", "Reading magic...");
stream.read_exact(&mut magic).await?;
debug!(target: "net::message", "Read magic {:?}", magic);
trace!(target: "net::message", "Read magic {:?}", magic);
if magic != MAGIC_BYTES {
debug!(target: "net::message", "Error: Magic bytes mismatch");
trace!(target: "net::message", "Error: Magic bytes mismatch");
return Err(Error::MalformedPacket)
}
@@ -114,13 +114,13 @@ pub async fn read_packet<R: AsyncRead + Unpin + Sized>(stream: &mut R) -> Result
let mut cmd = vec![0u8; command_len];
stream.read_exact(&mut cmd).await?;
let command = String::from_utf8(cmd)?;
debug!(target: "net::message", "Read command: {}", command);
trace!(target: "net::message", "Read command: {}", command);
// The message-dependent data (see message types)
let payload_len = VarInt::decode_async(stream).await?.0 as usize;
let mut payload = vec![0u8; payload_len];
stream.read_exact(&mut payload).await?;
debug!(target: "net::message", "Read payload {} bytes", payload_len);
trace!(target: "net::message", "Read payload {} bytes", payload_len);
Ok(Packet { command, payload })
}
@@ -137,23 +137,23 @@ pub async fn send_packet<W: AsyncWrite + Unpin + Sized>(
let mut written: usize = 0;
debug!(target: "net::message", "Sending magic...");
trace!(target: "net::message", "Sending magic...");
stream.write_all(&MAGIC_BYTES).await?;
written += MAGIC_BYTES.len();
debug!(target: "net::message", "Sent magic");
trace!(target: "net::message", "Sent magic");
debug!(target: "net::message", "Sending command...");
trace!(target: "net::message", "Sending command...");
written += VarInt(packet.command.len() as u64).encode_async(stream).await?;
let cmd_ref = packet.command.as_bytes();
stream.write_all(cmd_ref).await?;
written += cmd_ref.len();
debug!(target: "net::message", "Sent command: {}", packet.command);
trace!(target: "net::message", "Sent command: {}", packet.command);
debug!(target: "net::message", "Sending payload...");
trace!(target: "net::message", "Sending payload...");
written += VarInt(packet.payload.len() as u64).encode_async(stream).await?;
stream.write_all(&packet.payload).await?;
written += packet.payload.len();
debug!(target: "net::message", "Sent payload {} bytes", packet.payload.len() as u64);
trace!(target: "net::message", "Sent payload {} bytes", packet.payload.len() as u64);
stream.flush().await?;

View File

@@ -81,7 +81,7 @@ pub struct P2p {
/// Enable network debugging
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
dnet_sub: SubscriberPtr<DnetEvent>,
dnet_subscriber: SubscriberPtr<DnetEvent>,
}
impl P2p {
@@ -111,7 +111,7 @@ impl P2p {
session_outbound: Mutex::new(None),
dnet_enabled: Mutex::new(false),
dnet_sub: Subscriber::new(),
dnet_subscriber: Subscriber::new(),
});
let parent = Arc::downgrade(&self_);
@@ -319,13 +319,13 @@ impl P2p {
warn!("[P2P] Network debugging disabled!");
}
/// Return a reference to the dnet subscriber
pub fn dnet_sub(&self) -> SubscriberPtr<DnetEvent> {
self.dnet_sub.clone()
/// Subscribe to dnet events
pub async fn dnet_subscribe(&self) -> Subscription<DnetEvent> {
self.dnet_subscriber.clone().subscribe().await
}
/// Send a dnet notification over the subscriber
pub async fn dnet_notify(&self, event: DnetEvent) {
self.dnet_sub.notify(event).await;
self.dnet_subscriber.notify(event).await;
}
}

View File

@@ -38,6 +38,7 @@ use super::{
super::{
channel::ChannelPtr,
connector::Connector,
dnet::{self, dnet, DnetEvent},
message::GetAddrsMessage,
p2p::{P2p, P2pPtr},
},
@@ -103,7 +104,7 @@ impl OutboundSession {
// Activate mutex lock on connection slots.
let mut connect_slots = self.connect_slots.lock().await;
for i in 0..n_slots {
for i in 0..n_slots as u32 {
let task = StoppableTask::new();
task.clone().start(
@@ -132,7 +133,7 @@ impl OutboundSession {
/// Creates a connector object and tries to connect using it.
pub async fn channel_connect_loop(
self: Arc<Self>,
slot_number: usize,
slot: u32,
ex: Arc<Executor<'_>>,
) -> Result<()> {
let parent = Arc::downgrade(&self);
@@ -153,19 +154,24 @@ impl OutboundSession {
// signal and then exit. Once it exits, we'll run `try_connect` again
// and attempt to fill the slot with another peer.
loop {
match self.try_connect(slot_number, &connector, transports, ex.clone()).await {
match self.try_connect(slot, &connector, transports, ex.clone()).await {
Ok(()) => {
info!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} disconnected",
slot_number
slot
);
}
Err(e) => {
error!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} connection failed: {}",
slot_number, e,
slot, e,
);
dnet!(self,
let event = DnetEvent::OutboundDisconnected(slot);
self.p2p().dnet_notify(event).await;
);
}
}
@@ -181,7 +187,7 @@ impl OutboundSession {
/// main connect loop (parent of this function) will iterate again.
async fn try_connect(
&self,
slot_number: usize,
slot: u32,
connector: &Connector,
transports: &[String],
ex: Arc<Executor<'_>>,
@@ -189,15 +195,15 @@ impl OutboundSession {
debug!(
target: "net::outbound_session::try_connect()",
"[P2P] Finding a host to connect to for outbound slot #{}",
slot_number,
slot,
);
// Find an address to connect to. We also do peer discovery here if needed.
let addr = self.load_address(slot_number, transports, ex.clone()).await?;
let addr = self.load_address(slot, transports, ex.clone()).await?;
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
slot_number, addr,
slot, addr,
);
match connector.connect(&addr).await {
@@ -205,7 +211,16 @@ impl OutboundSession {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
slot_number, url
slot, url
);
dnet!(self,
let event = DnetEvent::OutboundConnected(dnet::OutboundConnect {
slot,
addr: addr.clone(),
channel_id: channel.info.id
});
self.p2p().dnet_notify(event).await;
);
let stop_sub =
@@ -232,7 +247,7 @@ impl OutboundSession {
error!(
target: "net::outbound_session::try_connect()",
"[P2P] Unable to connect outbound slot #{} [{}]: {}",
slot_number, addr, e
slot, addr, e
);
}
}
@@ -255,7 +270,7 @@ impl OutboundSession {
/// to do peer discovery and try to fill the slot again.
async fn load_address(
&self,
slot_number: usize,
slot: u32,
transports: &[String],
ex: Arc<Executor<'_>>,
) -> Result<Url> {
@@ -267,7 +282,7 @@ impl OutboundSession {
debug!(
target: "net::outbound_session::load_address()",
"[P2P] #{} Peer discovery active, waiting {} seconds...",
slot_number, retry_sleep,
slot, retry_sleep,
);
sleep(retry_sleep).await;
}
@@ -324,7 +339,7 @@ impl OutboundSession {
info!(
target: "net::outbound_session::load_address()",
"[P2P] Outbound #{}: No peers found. Starting peer discovery...",
slot_number,
slot,
);
// NOTE: A design decision here is to do a sleep inside peer_discovery()
// so that there's a certain period (outbound_connect_timeout) of time
@@ -332,7 +347,7 @@ impl OutboundSession {
// inside peer_discovery, it will block here in the slot sessions, while
// other slots can keep trying to find hosts. This is also why we sleep
// in the beginning of this loop if peer discovery is currently active.
self.peer_discovery(slot_number, ex.clone()).await;
self.peer_discovery(slot, ex.clone()).await;
}
}
@@ -343,14 +358,14 @@ impl OutboundSession {
/// This function will also sleep `Settings::outbound_connect_timeout` seconds
/// after broadcasting in order to let the P2P stack receive and work through
/// the addresses it is expecting.
async fn peer_discovery(&self, slot_number: usize, ex: Arc<Executor<'_>>) {
async fn peer_discovery(&self, slot: u32, ex: Arc<Executor<'_>>) {
let p2p = self.p2p();
if *p2p.peer_discovery_running.lock().await {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Peer discovery already active",
slot_number,
slot,
);
return
}
@@ -358,7 +373,7 @@ impl OutboundSession {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Started peer discovery",
slot_number,
slot,
);
*p2p.peer_discovery_running.lock().await = true;
@@ -369,7 +384,7 @@ impl OutboundSession {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Broadcasting GetAddrs across active channels",
slot_number,
slot,
);
p2p.broadcast(&get_addrs).await;
} else {
@@ -391,7 +406,7 @@ impl OutboundSession {
debug!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Outbound #{}: Sleeping {} seconds",
slot_number, p2p.settings().outbound_connect_timeout,
slot, p2p.settings().outbound_connect_timeout,
);
sleep(p2p.settings().outbound_connect_timeout).await;
*p2p.peer_discovery_running.lock().await = false;

View File

@@ -16,32 +16,65 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::HashMap;
use tinyjson::JsonValue::{self, Number as JsonNum, Object as JsonObj, String as JsonStr};
use crate::net;
// helper functions
fn json_map<const N: usize>(vals: [(&str, JsonValue); N]) -> JsonValue {
JsonObj(HashMap::from(vals.map(|(k, v)| (k.to_string(), v))))
}
fn json_str(val: &str) -> JsonValue {
JsonStr(val.to_string())
}
#[cfg(feature = "net")]
impl From<crate::net::channel::ChannelInfo> for tinyjson::JsonValue {
fn from(info: crate::net::channel::ChannelInfo) -> tinyjson::JsonValue {
tinyjson::JsonValue::Object(std::collections::HashMap::from([
("address".to_string(), tinyjson::JsonValue::String(info.address.to_string())),
("random_id".to_string(), tinyjson::JsonValue::Number(info.random_id.into())),
]))
impl From<net::channel::ChannelInfo> for JsonValue {
fn from(info: net::channel::ChannelInfo) -> JsonValue {
json_map([("addr", JsonStr(info.addr.to_string())), ("id", JsonNum(info.id.into()))])
}
}
#[cfg(feature = "net")]
impl From<crate::net::dnet::MessageInfo> for tinyjson::JsonValue {
fn from(info: crate::net::dnet::MessageInfo) -> tinyjson::JsonValue {
tinyjson::JsonValue::Object(std::collections::HashMap::from([
("chan".to_string(), info.chan.into()),
("cmd".to_string(), tinyjson::JsonValue::String(info.cmd.clone())),
("time".to_string(), tinyjson::JsonValue::String(info.time.0.to_string())),
]))
impl From<net::dnet::MessageInfo> for JsonValue {
fn from(info: net::dnet::MessageInfo) -> JsonValue {
json_map([
("chan", info.chan.into()),
("cmd", JsonStr(info.cmd.clone())),
("time", JsonStr(info.time.0.to_string())),
])
}
}
#[cfg(feature = "net")]
impl From<crate::net::dnet::DnetEvent> for tinyjson::JsonValue {
fn from(event: crate::net::dnet::DnetEvent) -> tinyjson::JsonValue {
impl From<net::dnet::OutboundConnect> for JsonValue {
fn from(info: net::dnet::OutboundConnect) -> JsonValue {
json_map([
("slot", JsonNum(info.slot.into())),
("addr", JsonStr(info.addr.to_string())),
("channel_id", JsonNum(info.channel_id.into())),
])
}
}
#[cfg(feature = "net")]
impl From<net::dnet::DnetEvent> for JsonValue {
fn from(event: net::dnet::DnetEvent) -> JsonValue {
match event {
crate::net::dnet::DnetEvent::SendMessage(message_info) => message_info.into(),
net::dnet::DnetEvent::SendMessage(info) => {
json_map([("event", json_str("send")), ("info", info.into())])
}
net::dnet::DnetEvent::RecvMessage(info) => {
json_map([("event", json_str("recv")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundConnected(info) => {
json_map([("event", json_str("outbound_connected")), ("info", info.into())])
}
net::dnet::DnetEvent::OutboundDisconnected(slot) => json_map([
("event", json_str("outbound_disconnected")),
("slot", JsonNum(slot.into())),
]),
}
}
}