script/research/dam: better flood control

This commit is contained in:
skoupidi
2025-03-01 16:36:25 +02:00
parent 78109ca8ab
commit 9a9917705f
6 changed files with 64 additions and 22 deletions

View File

@@ -16,5 +16,7 @@ darkfi-serial = "0.4.2"
# Misc
clap = {version = "4.4.11", features = ["derive"]}
log = "0.4.26"
simplelog = "0.12.2"
smol = "2.0.2"
url = "2.5.4"

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use clap::{Parser, Subcommand};
use darkfi::{cli_desc, rpc::util::JsonValue, Result};
use simplelog::{Config, LevelFilter, SimpleLogger};
use smol::Executor;
use dam_cli::DamCli;
@@ -48,13 +49,19 @@ enum Subcmd {
},
/// Signal damd to execute a flooding attack against the network
Flood,
Flood {
/// Optional flood messages count limit
limit: Option<u32>,
},
/// Signal damd to stop an ongoing flooding attack
StopFlood,
}
fn main() -> Result<()> {
// Setup terminal logger
let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
// Initialize an executor
let executor = Arc::new(Executor::new());
let ex = executor.clone();
@@ -73,11 +80,15 @@ fn main() -> Result<()> {
dam_cli.subscribe(&args.endpoint, &method, &ex).await?;
}
Subcmd::Flood => {
Subcmd::Flood { limit } => {
let limit = match limit {
Some(l) => JsonValue::String(format!("{l}")),
None => JsonValue::String(String::from("0")),
};
dam_cli
.damd_daemon_request(
"flood.switch",
&JsonValue::Array(vec![JsonValue::Boolean(true)]),
&JsonValue::Array(vec![JsonValue::Boolean(true), limit]),
)
.await?;
}
@@ -86,7 +97,10 @@ fn main() -> Result<()> {
dam_cli
.damd_daemon_request(
"flood.switch",
&JsonValue::Array(vec![JsonValue::Boolean(false)]),
&JsonValue::Array(vec![
JsonValue::Boolean(false),
JsonValue::String(String::from("0")),
]),
)
.await?;
}

View File

@@ -27,6 +27,7 @@ use darkfi::{
system::{ExecutorPtr, Publisher, StoppableTask},
Error, Result,
};
use log::{error, info};
use url::Url;
use crate::DamCli;
@@ -34,12 +35,12 @@ use crate::DamCli;
impl DamCli {
/// Auxiliary function to ping configured damd daemon for liveness.
pub async fn ping(&self) -> Result<()> {
println!("Executing ping request to damd...");
info!("Executing ping request to damd...");
let latency = Instant::now();
let rep = self.damd_daemon_request("ping", &JsonValue::Array(vec![])).await?;
let latency = latency.elapsed();
println!("Got reply: {rep:?}");
println!("Latency: {latency:?}");
info!("Got reply: {rep:?}");
info!("Latency: {latency:?}");
Ok(())
}
@@ -52,7 +53,7 @@ impl DamCli {
/// Subscribes to damd's JSON-RPC notification endpoints.
pub async fn subscribe(&self, endpoint: &str, method: &str, ex: &ExecutorPtr) -> Result<()> {
println!("Subscribing to receive notifications for: {method}");
info!("Subscribing to receive notifications for: {method}");
let endpoint = Url::parse(endpoint)?;
let _method = String::from(method);
let publisher = Publisher::new();
@@ -70,7 +71,7 @@ impl DamCli {
match res {
Ok(()) => { /* Do nothing */ }
Err(e) => {
eprintln!("[subscribe] JSON-RPC server error: {e:?}");
error!("[subscribe] JSON-RPC server error: {e:?}");
publisher
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
@@ -84,13 +85,13 @@ impl DamCli {
Error::RpcServerStopped,
ex.clone(),
);
println!("Detached subscription to background");
println!("All is good. Waiting for new notifications...");
info!("Detached subscription to background");
info!("All is good. Waiting for new notifications...");
let e = loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
println!("Got notification from subscription");
info!("Got notification from subscription");
if n.method != method {
break Error::UnexpectedJsonRpc(format!(
"Got foreign notification from damd: {}",

View File

@@ -28,7 +28,7 @@ sleep 1
tmux new-window -t $session -n "flood"
tmux send-keys -t $session "$DAMD_CLI0 subscribe protocols.subscribe_foo" Enter
tmux split-window -t $session -v -l 20%
tmux send-keys -t $session "$DAMD_CLI1 flood"
tmux send-keys -t $session "$DAMD_CLI1 flood 10"
tmux select-pane -t 0
tmux split-window -t $session -h
tmux send-keys -t $session "$DAMD_CLI1 subscribe protocols.subscribe_attack_foo" Enter

View File

@@ -60,7 +60,7 @@ impl DamFlooder {
}
/// Start the Denial-of-service Analysis Multitool flooder.
pub async fn start(&self, subscribers: &HashMap<&'static str, JsonSubscriber>) {
pub async fn start(&self, subscribers: &HashMap<&'static str, JsonSubscriber>, limit: u32) {
info!(
target: "damd::flooder::DamFlooder::start",
"Starting the Denial-of-service Analysis Multitool flooder..."
@@ -80,7 +80,7 @@ impl DamFlooder {
for peer in self.p2p.hosts().channels() {
let task = StoppableTask::new();
task.clone().start(
flood_foo(self.p2p.settings().read().await.outbound_connect_timeout, peer, subscribers.get("attack_foo").unwrap().clone()),
flood_foo(self.p2p.settings().read().await.outbound_connect_timeout, peer, subscribers.get("attack_foo").unwrap().clone(), limit),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
@@ -96,7 +96,7 @@ impl DamFlooder {
// Spawn a task for `Bar` messages to broadcast to everyone
let task = StoppableTask::new();
task.clone().start(
flood_bar(self.p2p.clone(), subscribers.get("attack_bar").unwrap().clone()),
flood_bar(self.p2p.clone(), subscribers.get("attack_bar").unwrap().clone(), limit),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
@@ -142,7 +142,12 @@ impl DamFlooder {
}
/// Background flooder function for `ProtocolFoo`.
async fn flood_foo(comms_timeout: u64, peer: ChannelPtr, subscriber: JsonSubscriber) -> Result<()> {
async fn flood_foo(
comms_timeout: u64,
peer: ChannelPtr,
subscriber: JsonSubscriber,
limit: u32,
) -> Result<()> {
debug!(target: "damd::flooder::flood_foo", "START");
// Communication setup
let Ok(response_sub) = peer.subscribe_msg::<FooResponse>().await else {
@@ -182,11 +187,18 @@ async fn flood_foo(comms_timeout: u64, peer: ChannelPtr, subscriber: JsonSubscri
info!(target: "damd::flooder::flood_foo", "{notification}");
subscriber.notify(vec![JsonValue::String(notification)].into()).await;
message_index += 1;
// Check limit
if limit != 0 && message_index > limit {
debug!(target: "damd::flooder::flood_foo", "STOP");
info!(target: "damd::flooder::flood_foo", "Flood limit reached!");
return Ok(())
}
}
}
/// Background flooder function for `ProtocolBar`.
async fn flood_bar(p2p: P2pPtr, subscriber: JsonSubscriber) -> Result<()> {
async fn flood_bar(p2p: P2pPtr, subscriber: JsonSubscriber, limit: u32) -> Result<()> {
debug!(target: "damd::flooder::flood_bar", "START");
// Flood the network, if we are connected to peers
@@ -199,6 +211,13 @@ async fn flood_bar(p2p: P2pPtr, subscriber: JsonSubscriber) -> Result<()> {
subscriber.notify(vec![JsonValue::String(notification)].into()).await;
p2p.broadcast(&Bar { message }).await;
message_index += 1;
// Check limit
if limit != 0 && message_index > limit {
debug!(target: "damd::flooder::flood_bar", "STOP");
info!(target: "damd::flooder::flood_foo", "Flood limit reached!");
return Ok(())
}
}
debug!(target: "damd::flooder::flood_bar", "STOP");

View File

@@ -26,7 +26,7 @@ use darkfi::{
net::P2pPtr,
rpc::{
jsonrpc::{
ErrorCode::{InvalidParams, MethodNotFound},
ErrorCode::{InvalidParams, MethodNotFound, ParseError},
JsonError, JsonRequest, JsonResponse, JsonResult,
},
p2p_method::HandlerP2p,
@@ -179,19 +179,25 @@ impl DamNode {
// Activate or deactivate damd flooder.
// By sending `true`, flooder will be activated, and by sending `false` flooder
// will be deactivated. Returns `true` on success.
// A limit can be passed, defining after how many messages the flooder will
// stop. If its 0, flooder will keep going.
//
// --> {"jsonrpc": "2.0", "method": "flood", "params": [true], "id": 42}
// --> {"jsonrpc": "2.0", "method": "flood", "params": [true, "100"], "id": 42}
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
async fn flood_switch(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_bool() {
if params.len() != 2 || !params[0].is_bool() || !params[1].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
let switch = params[0].get::<bool>().unwrap();
let limit = match params[1].get::<String>().unwrap().parse::<u32>() {
Ok(v) => v,
Err(_) => return JsonError::new(ParseError, None, id).into(),
};
if *switch {
self.flooder.start(&self.subscribers).await;
self.flooder.start(&self.subscribers, limit).await;
} else {
self.flooder.stop().await;
}