script/research: new generic-node example added showcasing the p2p generic protol approach

This commit is contained in:
skoupidi
2024-09-09 22:32:01 +03:00
parent 392a26ce6a
commit 4abbb1641e
11 changed files with 485 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
/target
Cargo.lock
rustfmt.toml

View File

@@ -0,0 +1,28 @@
[package]
name = "generic-node"
version = "0.4.1"
description = "Generic node daemon to test out design choices before integrating them to core nodes."
authors = ["Dyne.org foundation <foundation@dyne.org>"]
repository = "https://codeberg.org/darkrenaissance/darkfi"
license = "AGPL-3.0-only"
edition = "2021"
[workspace]
[dependencies]
darkfi = {path = "../../../", features = ["async-daemonize", "net"]}
darkfi-serial = {path = "../../../src/serial"}
rand = "0.8.5"
# Daemon
easy-parallel = "3.3.1"
log = "0.4.22"
signal-hook-async-std = "0.2.2"
signal-hook = "0.3.17"
simplelog = "0.12.2"
smol = "2.0.1"
# Argument parsing
serde = {version = "1.0.209", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.1"

View File

@@ -0,0 +1,64 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38967"]
# P2P external addresses the instance advertises so other peers can
# reach us and connect to us, as long as inbound addrs are configured.
#external_addrs = []
# Peer nodes to manually connect to
#peers = []
# Seed nodes to connect to for peer discovery and/or adversising our
# own external addresses
#seeds = []
# Whitelisted network transports for outbound connections
#allowed_transports = ["tcp+tls"]
# Allow transport mixing (e.g. Tor would be allowed to connect to `tcp://`)
#transport_mixing = true
# Outbound connection slots number, this many connections will be
# attempted. (This does not include manual connections)
#outbound_connections = 8
# Inbound connections slots number, this many active inbound connections
# will be allowed. (This does not include manual or outbound connections)
#inbound_connections = 8
## White connection percent
# gold_connect_count = 2
## White connection percent
# white_connect_percent = 70
# Manual connections retry limit, 0 for forever looping
#manual_attempt_limit = 0
# Outbound connection timeout (in seconds)
#outbound_connect_timeout = 10
# Exchange versions (handshake) timeout (in seconds)
#channel_handshake_timeout = 4
# Ping-pong exchange execution interval (in seconds)
#channel_heartbeat_interval = 10
# Allow localnet hosts
localnet = true
# Cooling off time for peer discovery when unsuccessful
#outbound_peer_discovery_cooloff_time = 30
# Time between peer discovery attempts
#outbound_peer_discovery_attempt_time = 5

View File

@@ -0,0 +1,5 @@
darkfid localnet
================
This will start five `generic-node` node instances
to propagate messages between them.

View File

@@ -0,0 +1,15 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38967"]
# Allow localnet hosts
localnet = true

View File

@@ -0,0 +1,18 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38968"]
# Peer nodes to manually connect to
peers = ["tcp+tls://0.0.0.0:38967"]
# Allow localnet hosts
localnet = true

View File

@@ -0,0 +1,21 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38969"]
# Peer nodes to manually connect to
peers = [
"tcp+tls://0.0.0.0:38967",
"tcp+tls://0.0.0.0:38968",
]
# Allow localnet hosts
localnet = true

View File

@@ -0,0 +1,22 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38970"]
# Peer nodes to manually connect to
peers = [
"tcp+tls://0.0.0.0:38967",
"tcp+tls://0.0.0.0:38968",
"tcp+tls://0.0.0.0:38969",
]
# Allow localnet hosts
localnet = true

View File

@@ -0,0 +1,23 @@
## generic node configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
## P2P network settings
[net]
# P2P accept addresses the instance listens on for inbound connections
inbound = ["tcp+tls://0.0.0.0:38971"]
# Peer nodes to manually connect to
peers = [
"tcp+tls://0.0.0.0:38967",
"tcp+tls://0.0.0.0:38968",
"tcp+tls://0.0.0.0:38969",
"tcp+tls://0.0.0.0:38970",
]
# Allow localnet hosts
localnet = true

View File

@@ -0,0 +1,29 @@
#!/bin/sh
set -e
# Start a tmux session with five generic node daemons
session=five-generic-nodes
if [ "$1" = "-vv" ]; then
verbose="-vv"
shift
else
verbose=""
fi
tmux new-session -d -s $session
tmux send-keys -t $session "cd .. && cargo +nightly run --release -- ${verbose} -c localnet/node0.toml --node-id 100" Enter
sleep 1
tmux new-window -t $session
tmux send-keys -t $session "cd .. && cargo +nightly run --release -- ${verbose} -c localnet/node1.toml --node-id 101" Enter
sleep 1
tmux new-window -t $session
tmux send-keys -t $session "cd .. && cargo +nightly run --release -- ${verbose} -c localnet/node2.toml --node-id 102" Enter
sleep 1
tmux new-window -t $session
tmux send-keys -t $session "cd .. && cargo +nightly run --release -- ${verbose} -c localnet/node3.toml --node-id 103" Enter
sleep 1
tmux new-window -t $session
tmux send-keys -t $session "cd .. && cargo +nightly run --release -- ${verbose} -c localnet/node4.toml --node-id 104" Enter
tmux attach -t $session

View File

@@ -0,0 +1,257 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashSet, sync::Arc};
use log::{error, info};
use rand::{rngs::OsRng, Rng};
use smol::{stream::StreamExt, Executor};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use darkfi::{
async_daemonize, cli_desc, impl_p2p_message,
net::{
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
settings::SettingsOpt,
Message, P2p, P2pPtr, Settings,
},
system::{sleep, StoppableTask, StoppableTaskPtr},
Error, Result,
};
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
const CONFIG_FILE: &str = "generic_node_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../generic_node_config.toml");
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "generic-node", about = cli_desc!())]
struct Args {
#[structopt(short, long)]
/// Configuration file to use
config: Option<String>,
#[structopt(short, long)]
/// Set log file to ouput into
log: Option<String>,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
#[structopt(short, long)]
/// Node ID, used in the dummy messages
node_id: u64,
/// P2P network settings
#[structopt(flatten)]
net: SettingsOpt,
}
// Generic messages
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericStringMessage {
msg: String,
}
impl_p2p_message!(GenericStringMessage, "generic_string_message");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericNumberMessage {
num: u64,
}
impl_p2p_message!(GenericNumberMessage, "generic_number_message");
/// Generic daemon structure
struct Genericd {
/// Node ID, used in the dummy messages
node_id: u64,
/// P2P network pointer
p2p: P2pPtr,
/// GenericStringMessage handler
generic_string_msg_handler: ProtocolGenericHandlerPtr<GenericStringMessage>,
/// GenericNumberMessage handler
generic_number_msg_handler: ProtocolGenericHandlerPtr<GenericNumberMessage>,
/// Broadcasting messages task
broadcast_task: StoppableTaskPtr,
}
impl Genericd {
// Initialize daemon with all its required stuff.
async fn new(
node_id: u64,
settings: &Settings,
executor: &Arc<Executor<'static>>,
) -> Result<Self> {
// Generating the p2p configuration and attaching our protocols
let p2p = P2p::new(settings.clone(), executor.clone()).await?;
// Add a generic protocol handler for GenericStringMessage
let generic_string_msg_handler =
ProtocolGenericHandler::new(&p2p, "ProtocolGenericString", SESSION_DEFAULT).await;
// Add a generic protocol for GenericNumberMessage
let generic_number_msg_handler =
ProtocolGenericHandler::new(&p2p, "ProtocolGenericNumber", SESSION_DEFAULT).await;
let broadcast_task = StoppableTask::new();
Ok(Self {
node_id,
p2p,
generic_string_msg_handler,
generic_number_msg_handler,
broadcast_task,
})
}
/// Start all daemon background tasks.
async fn start(&self, executor: &Arc<Executor<'static>>) -> Result<()> {
info!(target: "genericd", "Starting tasks...");
self.generic_string_msg_handler.task.clone().start(
handle_generic_string_msg(self.generic_string_msg_handler.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "genericd", "Failed starting protocol generic string handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.generic_number_msg_handler.task.clone().start(
handle_generic_number_msg(self.generic_number_msg_handler.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "genericd", "Failed starting protocol generic number handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.p2p.clone().start().await?;
self.broadcast_task.clone().start(
broadcast_messages(self.node_id, self.p2p.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "genericd", "Failed starting broadcasting task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
info!(target: "genericd", "All tasks started!");
Ok(())
}
/// Stop all daemon background tasks.
async fn stop(&self) {
info!(target: "genericd", "Terminating tasks...");
self.p2p.stop().await;
self.generic_string_msg_handler.task.stop().await;
self.generic_number_msg_handler.task.stop().await;
info!(target: "genericd", "All tasks terminated!");
}
}
/// Background handler function for GenericStringMessage.
async fn handle_generic_string_msg(
handler: ProtocolGenericHandlerPtr<GenericStringMessage>,
) -> Result<()> {
let mut seen = HashSet::new();
loop {
// Wait for a new message
let (channel, msg) = handler.receiver.recv().await?;
if seen.contains(&msg.msg) {
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
info!("Received string message from channel {channel}: {}", msg.msg);
seen.insert(msg.msg);
handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
}
}
/// Background handler function for GenericNumberMessage.
async fn handle_generic_number_msg(
handler: ProtocolGenericHandlerPtr<GenericNumberMessage>,
) -> Result<()> {
let mut seen = HashSet::new();
loop {
// Wait for a new message
let (channel, msg) = handler.receiver.recv().await?;
if seen.contains(&msg.num) {
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
info!("Received string message from channel {channel}: {}", msg.num);
seen.insert(msg.num);
handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
}
}
/// Background function to send messages at random intervals.
async fn broadcast_messages(node_id: u64, p2p: P2pPtr) -> Result<()> {
let mut counter = 0;
loop {
let sleep_time = OsRng.gen_range(1..=10);
info!("Sleeping {sleep_time} till next broadcast...");
sleep(sleep_time).await;
info!("Broacasting messages...");
let string_msg =
GenericStringMessage { msg: format!("Hello from node {node_id}({counter})!") };
let number_msg = GenericNumberMessage { num: node_id + counter };
p2p.broadcast(&string_msg).await;
p2p.broadcast(&number_msg).await;
counter += 1;
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "generic-node", "Initializing generic node...");
let genericd = Genericd::new(args.node_id, &args.net.into(), &ex).await?;
genericd.start(&ex).await?;
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!(target: "generic-node", "Caught termination signal, cleaning up and exiting...");
info!(target: "generic-node", "Stopping genericd...");
genericd.stop().await;
Ok(())
}