diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index c6d0d2a44..42a35da60 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -26,6 +26,7 @@ use tinyjson::JsonValue; use darkfi::{ impl_p2p_message, net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, protocol::protocol_generic::{ ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, }, @@ -46,7 +47,7 @@ use crate::task::handle_unknown_proposal; #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] pub struct ProposalMessage(pub Proposal); -impl_p2p_message!(ProposalMessage, "proposal", 0); +impl_p2p_message!(ProposalMessage, "proposal", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Atomic pointer to the `ProtocolProposal` handler. pub type ProtocolProposalHandlerPtr = Arc; diff --git a/bin/darkfid/src/proto/protocol_sync.rs b/bin/darkfid/src/proto/protocol_sync.rs index 6245f2785..a72b70f66 100644 --- a/bin/darkfid/src/proto/protocol_sync.rs +++ b/bin/darkfid/src/proto/protocol_sync.rs @@ -25,6 +25,7 @@ use darkfi::{ blockchain::{BlockInfo, Header, HeaderHash}, impl_p2p_message, net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, protocol::protocol_generic::{ ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, }, @@ -49,7 +50,7 @@ pub struct TipRequest { pub tip: HeaderHash, } -impl_p2p_message!(TipRequest, "tiprequest", 0); +impl_p2p_message!(TipRequest, "tiprequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure representing the response to `TipRequest`, /// containing a boolean flag to indicate if we are synced, @@ -64,7 +65,7 @@ pub struct TipResponse { pub hash: Option, } -impl_p2p_message!(TipResponse, "tipresponse", 0); +impl_p2p_message!(TipResponse, "tipresponse", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure represening a request to ask a node for up to `BATCH` headers before /// the provided header height. @@ -74,7 +75,7 @@ pub struct HeaderSyncRequest { pub height: u32, } -impl_p2p_message!(HeaderSyncRequest, "headersyncrequest", 0); +impl_p2p_message!(HeaderSyncRequest, "headersyncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure representing the response to `HeaderSyncRequest`, /// containing up to `BATCH` headers before the requested block height. @@ -84,7 +85,7 @@ pub struct HeaderSyncResponse { pub headers: Vec
, } -impl_p2p_message!(HeaderSyncResponse, "headersyncresponse", 0); +impl_p2p_message!(HeaderSyncResponse, "headersyncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure represening a request to ask a node for up to`BATCH` blocks /// of provided headers. @@ -94,7 +95,7 @@ pub struct SyncRequest { pub headers: Vec, } -impl_p2p_message!(SyncRequest, "syncrequest", 0); +impl_p2p_message!(SyncRequest, "syncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure representing the response to `SyncRequest`, /// containing up to `BATCH` blocks after the requested block height. @@ -104,7 +105,7 @@ pub struct SyncResponse { pub blocks: Vec, } -impl_p2p_message!(SyncResponse, "syncresponse", 0); +impl_p2p_message!(SyncResponse, "syncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure represening a request to ask a node a fork sequence. /// If we include a specific fork tip, they have to return its sequence, @@ -119,7 +120,7 @@ pub struct ForkSyncRequest { pub fork_tip: Option, } -impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 0); +impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure representing the response to `ForkSyncRequest`, /// containing the requested fork sequence. @@ -129,7 +130,7 @@ pub struct ForkSyncResponse { pub proposals: Vec, } -impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0); +impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure represening a request to ask a node a fork header for the /// requested height. The fork is identified by the provided header hash. @@ -141,7 +142,13 @@ pub struct ForkHeaderHashRequest { pub fork_header: HeaderHash, } -impl_p2p_message!(ForkHeaderHashRequest, "forkheaderhashrequest", 0); +impl_p2p_message!( + ForkHeaderHashRequest, + "forkheaderhashrequest", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); /// Structure representing the response to `ForkHeaderHashRequest`, /// containing the requested fork header hash, if it was found. @@ -151,7 +158,13 @@ pub struct ForkHeaderHashResponse { pub fork_header: Option, } -impl_p2p_message!(ForkHeaderHashResponse, "forkheaderhashresponse", 0); +impl_p2p_message!( + ForkHeaderHashResponse, + "forkheaderhashresponse", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); /// Structure represening a request to ask a node for up to `BATCH` /// fork headers for provided header hashes. The fork is identified @@ -164,7 +177,7 @@ pub struct ForkHeadersRequest { pub fork_header: HeaderHash, } -impl_p2p_message!(ForkHeadersRequest, "forkheadersrequest", 0); +impl_p2p_message!(ForkHeadersRequest, "forkheadersrequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure representing the response to `ForkHeadersRequest`, /// containing up to `BATCH` fork headers. @@ -174,7 +187,7 @@ pub struct ForkHeadersResponse { pub headers: Vec
, } -impl_p2p_message!(ForkHeadersResponse, "forkheadersresponse", 0); +impl_p2p_message!(ForkHeadersResponse, "forkheadersresponse", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Structure represening a request to ask a node for up to `BATCH` /// fork proposals for provided header hashes. The fork is identified @@ -187,7 +200,13 @@ pub struct ForkProposalsRequest { pub fork_header: HeaderHash, } -impl_p2p_message!(ForkProposalsRequest, "forkproposalsrequest", 0); +impl_p2p_message!( + ForkProposalsRequest, + "forkproposalsrequest", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); /// Structure representing the response to `ForkProposalsRequest`, /// containing up to `BATCH` fork headers. @@ -197,7 +216,13 @@ pub struct ForkProposalsResponse { pub proposals: Vec, } -impl_p2p_message!(ForkProposalsResponse, "forkproposalsresponse", 0); +impl_p2p_message!( + ForkProposalsResponse, + "forkproposalsresponse", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); /// Atomic pointer to the `ProtocolSync` handler. pub type ProtocolSyncHandlerPtr = Arc; diff --git a/bin/dhtd/dhtd/src/proto.rs b/bin/dhtd/dhtd/src/proto.rs index de6865c03..acad33c3c 100644 --- a/bin/dhtd/dhtd/src/proto.rs +++ b/bin/dhtd/dhtd/src/proto.rs @@ -24,6 +24,7 @@ use darkfi::{ dht2::net_hashmap::{NetHashMapInsert, NetHashMapRemove}, impl_p2p_message, net::{ + metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration}, ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, }, @@ -53,7 +54,7 @@ pub struct ChunkRequest { pub hash: blake3::Hash, } -impl_p2p_message!(ChunkRequest, "dhtchunkrequest", 0); +impl_p2p_message!(ChunkRequest, "dhtchunkrequest", 0, 0, DEFAULT_METERING_CONFIGURATION); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct ChunkReply { @@ -61,14 +62,14 @@ pub struct ChunkReply { pub data: Vec, } -impl_p2p_message!(ChunkReply, "dhtchunkreply", 0); +impl_p2p_message!(ChunkReply, "dhtchunkreply", 0, 0, DEFAULT_METERING_CONFIGURATION); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FileRequest { pub hash: blake3::Hash, } -impl_p2p_message!(FileRequest, "dhtfilerequest", 0); +impl_p2p_message!(FileRequest, "dhtfilerequest", 0, 0, DEFAULT_METERING_CONFIGURATION); #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FileReply { @@ -76,7 +77,7 @@ pub struct FileReply { pub chunks: Vec, } -impl_p2p_message!(FileReply, "dhtfilereply", 0); +impl_p2p_message!(FileReply, "dhtfilereply", 0, 0, DEFAULT_METERING_CONFIGURATION); impl ProtocolDht { #[allow(dead_code)] diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index f4fa5c527..ab5d6beb3 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -23,6 +23,7 @@ use darkfi::{ geode::MAX_CHUNK_SIZE, impl_p2p_message, net::{ + metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration}, ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, }, @@ -41,14 +42,14 @@ pub struct FudFilePut { pub file_hash: blake3::Hash, pub chunk_hashes: Vec, } -impl_p2p_message!(FudFilePut, "FudFilePut", 0); +impl_p2p_message!(FudFilePut, "FudFilePut", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a new chunk on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkPut { pub chunk_hash: blake3::Hash, } -impl_p2p_message!(FudChunkPut, "FudChunkPut", 0); +impl_p2p_message!(FudChunkPut, "FudChunkPut", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a new route for a file on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -57,7 +58,7 @@ pub struct FudFileRoute { pub chunk_hashes: Vec, pub peer: Url, } -impl_p2p_message!(FudFileRoute, "FudFileRoute", 0); +impl_p2p_message!(FudFileRoute, "FudFileRoute", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a new route for a chunk on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -65,28 +66,28 @@ pub struct FudChunkRoute { pub chunk_hash: blake3::Hash, pub peer: Url, } -impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0); +impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a file request from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFileRequest { pub file_hash: blake3::Hash, } -impl_p2p_message!(FudFileRequest, "FudFileRequest", 0); +impl_p2p_message!(FudFileRequest, "FudFileRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a file reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFileReply { pub chunk_hashes: Vec, } -impl_p2p_message!(FudFileReply, "FudFileReply", 0); +impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a chunk request from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkRequest { pub chunk_hash: blake3::Hash, } -impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0); +impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a chunk reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -94,17 +95,17 @@ pub struct FudChunkReply { // TODO: This sould be a chunk-sized array, but then we need padding? pub chunk: Vec, } -impl_p2p_message!(FudChunkReply, "FudChunkReply", 0); +impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a chunk reply when a file is not found #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFileNotFound; -impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0); +impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Message representing a chunk reply when a chunk is not found #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudChunkNotFound; -impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0); +impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION); /// P2P protocol implementation for fud. pub struct ProtocolFud { diff --git a/example/dchat/dchatd/src/dchatmsg.rs b/example/dchat/dchatd/src/dchatmsg.rs index d0300e5a1..e74454846 100644 --- a/example/dchat/dchatd/src/dchatmsg.rs +++ b/example/dchat/dchatd/src/dchatmsg.rs @@ -20,7 +20,13 @@ use smol::lock::Mutex; use std::sync::Arc; -use darkfi::{impl_p2p_message, net::Message}; +use darkfi::{ + impl_p2p_message, + net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, + Message, + }, +}; use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable}; pub type DchatMsgsBuffer = Arc>>; @@ -30,5 +36,5 @@ pub struct DchatMsg { pub msg: String, } -impl_p2p_message!(DchatMsg, "DchatMsg", 0); +impl_p2p_message!(DchatMsg, "DchatMsg", 0, 0, DEFAULT_METERING_CONFIGURATION); // ANCHOR_END: msg diff --git a/script/research/dam/README.md b/script/research/dam/README.md index dde8365bf..58ea3f93f 100644 --- a/script/research/dam/README.md +++ b/script/research/dam/README.md @@ -23,3 +23,42 @@ and monitor responses. Localnet folder with script and configuration to deploy instances to test with. + +## Flood testing + +Here is a table of flooding scenarios to perfor to verify expected +behavior, based on configured messages parameters.
+ +| # | Description | Configuration | Outcome | +|---|---------------------------------------|----------------|----------------------------------------------------------------------------------------| +| 0 | No metering | Default | All flood messages get propagated instantly | +| 1 | Same metering everywhere | (0,1,6,500,10) | All flood messages eventually get propagated following rate limit rules | +| 2 | `node0` metering, `node1` no metering | (0,1,6,500,10) | node0 disconnects/bans node1 for flooding | +| 3 | `node0` no metering, `node1` metering | (0,1,6,500,10) | All flood messages eventually get propagated following rate limit rules | +| 4 | Only `Bar` metered | (0,1,6,500,10) | `Foo` messages get propagated instantly while `Bar` messages eventually get propagated | + + +### Methodology note + +Message configuration tuple legend: + +| Pos | Description | +|-----|-----------------------------------------| +| 0 | MAX_BYTES | +| 1 | METERING_SCORE | +| 2 | MeteringConfiguration.threshold | +| 3 | MeteringConfiguration.sleep_step (ms) | +| 4 | MeteringConfiguration.expiry_time (sec) | + +When different configurations are used between the two nodes, you +have to manually compile `damd` with the corresponding message +configuration, copy/move/rename the binary and update the localnet +script accordingly.
+Each message can be configured in their corresponding protocol file.
+All paths are relative from this folder. + +| Message | Path | +|---------------|------------------------------------------| +| `Bar` | `damd/src/proto/protocol_bar.rs::L49-55` | +| `FooRequest` | `damd/src/proto/protocol_foo.rs::L49-55` | +| `FooResponse` | `damd/src/proto/protocol_foo.rs::L64-70` | diff --git a/script/research/dam/damd/src/proto/protocol_bar.rs b/script/research/dam/damd/src/proto/protocol_bar.rs index 616c9cf4f..eefa049ed 100644 --- a/script/research/dam/damd/src/proto/protocol_bar.rs +++ b/script/research/dam/damd/src/proto/protocol_bar.rs @@ -25,6 +25,7 @@ use tinyjson::JsonValue; use darkfi::{ impl_p2p_message, net::{ + metering::MeteringConfiguration, protocol::protocol_generic::{ ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, }, @@ -33,6 +34,7 @@ use darkfi::{ }, rpc::jsonrpc::JsonSubscriber, system::ExecutorPtr, + util::time::NanoTimestamp, Error, Result, }; use darkfi_serial::{SerialDecodable, SerialEncodable}; @@ -44,7 +46,13 @@ pub struct Bar { pub message: String, } -impl_p2p_message!(Bar, "bar", 0); +impl_p2p_message!( + Bar, + "bar", + 0, + 0, + MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) } +); /// Atomic pointer to the `ProtocolBar` handler. pub type ProtocolBarHandlerPtr = Arc; diff --git a/script/research/dam/damd/src/proto/protocol_foo.rs b/script/research/dam/damd/src/proto/protocol_foo.rs index 120396933..3f6b85cc5 100644 --- a/script/research/dam/damd/src/proto/protocol_foo.rs +++ b/script/research/dam/damd/src/proto/protocol_foo.rs @@ -25,6 +25,7 @@ use tinyjson::JsonValue; use darkfi::{ impl_p2p_message, net::{ + metering::MeteringConfiguration, protocol::protocol_generic::{ ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, }, @@ -33,6 +34,7 @@ use darkfi::{ }, rpc::jsonrpc::JsonSubscriber, system::ExecutorPtr, + util::time::NanoTimestamp, Error, Result, }; use darkfi_serial::{SerialDecodable, SerialEncodable}; @@ -44,7 +46,13 @@ pub struct FooRequest { pub message: String, } -impl_p2p_message!(FooRequest, "foorequest", 0); +impl_p2p_message!( + FooRequest, + "foorequest", + 0, + 0, + MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) } +); /// Structure representing the response to `FooRequest`. #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] @@ -53,7 +61,13 @@ pub struct FooResponse { pub code: u8, } -impl_p2p_message!(FooResponse, "fooresponse", 0); +impl_p2p_message!( + FooResponse, + "fooresponse", + 0, + 0, + MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) } +); /// Atomic pointer to the `ProtocolFoo` handler. pub type ProtocolFooHandlerPtr = Arc; diff --git a/script/research/generic-node/src/main.rs b/script/research/generic-node/src/main.rs index d666f8910..bc5eb7736 100644 --- a/script/research/generic-node/src/main.rs +++ b/script/research/generic-node/src/main.rs @@ -26,6 +26,7 @@ use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; use darkfi::{ async_daemonize, cli_desc, impl_p2p_message, net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, protocol::protocol_generic::{ ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr, }, @@ -71,25 +72,49 @@ struct Args { struct GenericStringMessage { msg: String, } -impl_p2p_message!(GenericStringMessage, "generic_string_message", 0); +impl_p2p_message!( + GenericStringMessage, + "generic_string_message", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] struct GenericNumberMessage { num: u64, } -impl_p2p_message!(GenericNumberMessage, "generic_number_message", 0); +impl_p2p_message!( + GenericNumberMessage, + "generic_number_message", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] struct GenericRequestMessage { msg: String, } -impl_p2p_message!(GenericRequestMessage, "generic_request_message", 0); +impl_p2p_message!( + GenericRequestMessage, + "generic_request_message", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] struct GenericResponseMessage { msg: String, } -impl_p2p_message!(GenericResponseMessage, "generic_response_message", 0); +impl_p2p_message!( + GenericResponseMessage, + "generic_response_message", + 0, + 0, + DEFAULT_METERING_CONFIGURATION +); /// Generic daemon structure struct Genericd { diff --git a/src/error.rs b/src/error.rs index 6724a0261..3e6e480bd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -158,6 +158,9 @@ pub enum Error { #[error("P2P message is invalid")] MessageInvalid, + #[error("P2P message subsystem over metering limit")] + MeteringLimitExceeded, + #[cfg(feature = "arti-client")] #[error(transparent)] ArtiError(#[from] arti_client::Error), diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index fcfbf61b3..1c2f1eeb6 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -29,7 +29,17 @@ use log::{debug, error, trace, warn}; use smol::Executor; use super::{Event, EventGraphPtr, NULL_ID}; -use crate::{impl_p2p_message, net::*, system::msleep, util::time::NanoTimestamp, Error, Result}; +use crate::{ + impl_p2p_message, + net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, + ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + system::msleep, + util::time::NanoTimestamp, + Error, Result, +}; /// Malicious behaviour threshold. If the threshold is reached, we will /// drop the peer from our P2P connection. @@ -116,27 +126,27 @@ pub struct ProtocolEventGraph { /// A P2P message representing publishing an event on the network #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct EventPut(pub Event); -impl_p2p_message!(EventPut, "EventGraph::EventPut", 0); +impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing an event request #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct EventReq(pub Vec); -impl_p2p_message!(EventReq, "EventGraph::EventReq", 0); +impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing an event reply #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct EventRep(pub Vec); -impl_p2p_message!(EventRep, "EventGraph::EventRep", 0); +impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing a request for a peer's DAG tips #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct TipReq {} -impl_p2p_message!(TipReq, "EventGraph::TipReq", 0); +impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing a reply for the peer's DAG tips #[derive(Clone, SerialEncodable, SerialDecodable)] pub struct TipRep(pub BTreeMap>); -impl_p2p_message!(TipRep, "EventGraph::TipRep", 0); +impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION); #[async_trait] impl ProtocolBase for ProtocolEventGraph { diff --git a/src/net/channel.rs b/src/net/channel.rs index 2e720e7e8..ae5c37c97 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -17,6 +17,7 @@ */ use std::{ + collections::HashMap, fmt, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, @@ -43,6 +44,7 @@ use super::{ message, message::{SerializedMessage, VersionMessage}, message_publisher::{MessageSubscription, MessageSubsystem}, + metering::{MeteringConfiguration, MeteringQueue}, p2p::P2pPtr, session::{ Session, SessionBitFlag, SessionWeakPtr, SESSION_ALL, SESSION_INBOUND, SESSION_REFINE, @@ -51,7 +53,7 @@ use super::{ }; use crate::{ net::BanPolicy, - system::{Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, + system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, util::time::NanoTimestamp, Error, Result, }; @@ -96,6 +98,9 @@ pub struct Channel { pub version: OnceCell>, /// Channel debug info pub info: ChannelInfo, + /// Map holding a `MeteringQueue` for each [`Message`] to perform + /// rate limiting of propagation towards the stream. + metering_map: AsyncMutex>, } impl Channel { @@ -117,6 +122,7 @@ impl Channel { let start_time = UNIX_EPOCH.elapsed().unwrap().as_secs(); let info = ChannelInfo::new(resolve_addr, connect_addr.clone(), start_time); + let metering_map = AsyncMutex::new(HashMap::new()); Arc::new(Self { reader, @@ -128,6 +134,7 @@ impl Channel { session, version: OnceCell::new(), info, + metering_map, }) } @@ -189,18 +196,60 @@ impl Channel { /// into a `SerializedMessage` and then calls `send_serialized` to send it. /// Returns an error if something goes wrong. pub async fn send(&self, message: &M) -> Result<()> { - self.send_serialized(&SerializedMessage::new(message).await).await + self.send_serialized( + &SerializedMessage::new(message).await, + &M::METERING_SCORE, + &M::METERING_CONFIGURATION, + ) + .await } /// Sends the encoded payload of provided `SerializedMessage` across the channel. - /// Calls `send_message` that creates a new payload and sends it over the - /// network transport as a packet. Returns an error if something goes wrong. - pub async fn send_serialized(&self, message: &SerializedMessage) -> Result<()> { + /// + /// We first check if we should apply some throttling, based on the provided + /// `Message` configuration. We always sleep 2x times more that the exepted one, + /// so we don't flood the peer. + /// Then, calls `send_message` that creates a new payload and sends it over the + /// network transport as a packet. + /// Returns an error if something goes wrong. + pub async fn send_serialized( + &self, + message: &SerializedMessage, + metering_score: &u64, + metering_config: &MeteringConfiguration, + ) -> Result<()> { debug!( target: "net::channel::send()", "[START] command={} {:?}", message.command, self, ); + // Check if we need to initialize a `MeteringQueue` + // for this specific `Message`. + let mut lock = self.metering_map.lock().await; + if !lock.contains_key(&message.command) { + lock.insert(message.command.clone(), MeteringQueue::new(metering_config.clone())); + } + + // Insert metering information and grab potential sleep time. + // It's safe to unwrap here since we initialized the value + // previously. + let queue = lock.get_mut(&message.command).unwrap(); + queue.push(metering_score); + let sleep_time = queue.sleep_time(); + drop(lock); + + // Check if we need to sleep + if let Some(sleep_time) = sleep_time { + let sleep_time = 2 * sleep_time; + warn!( + target: "net::channel::send()", + "[P2P] Channel rate limit is active, sleeping before sending for: {} (ms)", + sleep_time, + ); + msleep(sleep_time).await; + } + + // Check if the channel is stopped, so we can abort if self.is_stopped() { return Err(Error::ChannelStopped) } @@ -382,7 +431,9 @@ impl Channel { // Send result to our publishers match self.message_subsystem.notify(&command, reader).await { Ok(()) => {} - Err(Error::MissingDispatcher) | Err(Error::MessageInvalid) => { + Err(Error::MissingDispatcher) | + Err(Error::MessageInvalid) | + Err(Error::MeteringLimitExceeded) => { // If we're getting messages without dispatchers or its invalid, // it's spam. We therefore ban this channel if: // @@ -402,7 +453,7 @@ impl Channel { if self.session.upgrade().unwrap().type_id() != SESSION_REFINE { warn!( target: "net::channel::main_receive_loop()", - "MissingDispatcher for command={}, channel={:?}", + "MissingDispatcher|MessageInvalid|MeteringLimitExcheeded for command={}, channel={:?}", command, self ); diff --git a/src/net/message.rs b/src/net/message.rs index c95b555cf..c0a2c340c 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -16,18 +16,27 @@ * along with this program. If not, see . */ +use std::net::Ipv6Addr; + use darkfi_serial::{ async_trait, serialize_async, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable, }; -use std::net::Ipv6Addr; use url::{Host, Url}; +use crate::net::metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}; + /// Generic message template. pub trait Message: 'static + Send + Sync + AsyncDecodable + AsyncEncodable { const NAME: &'static str; /// Message bytes vector length limit. /// Set to 0 for no limit. const MAX_BYTES: u64; + /// Message metering score value. + /// Set to 0 for no impact in metering. + const METERING_SCORE: u64; + /// Message metering configuration for rate limit. + /// Use `MeteringConfiguration::default()` for no limit. + const METERING_CONFIGURATION: MeteringConfiguration; } /// Generic serialized message template. @@ -44,10 +53,12 @@ impl SerializedMessage { #[macro_export] macro_rules! impl_p2p_message { - ($st:ty, $nm:expr, $mb:expr) => { + ($st:ty, $nm:expr, $mb:expr, $ms:expr, $mc:expr) => { impl Message for $st { const NAME: &'static str = $nm; const MAX_BYTES: u64 = $mb; + const METERING_SCORE: u64 = $ms; + const METERING_CONFIGURATION: MeteringConfiguration = $mc; } }; } @@ -57,14 +68,14 @@ macro_rules! impl_p2p_message { pub struct PingMessage { pub nonce: u16, } -impl_p2p_message!(PingMessage, "ping", 0); +impl_p2p_message!(PingMessage, "ping", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Inbound keepalive message. #[derive(Debug, Copy, Clone, SerialEncodable, SerialDecodable)] pub struct PongMessage { pub nonce: u16, } -impl_p2p_message!(PongMessage, "pong", 0); +impl_p2p_message!(PongMessage, "pong", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Requests address of outbound connecction. #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -77,7 +88,7 @@ pub struct GetAddrsMessage { /// Preferred addresses transports pub transports: Vec, } -impl_p2p_message!(GetAddrsMessage, "getaddr", 0); +impl_p2p_message!(GetAddrsMessage, "getaddr", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Sends address information to inbound connection. #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -85,7 +96,7 @@ pub struct AddrsMessage { pub addrs: Vec<(Url, u64)>, } -impl_p2p_message!(AddrsMessage, "addr", 0); +impl_p2p_message!(AddrsMessage, "addr", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Requests version information of outbound connection. #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -109,7 +120,7 @@ pub struct VersionMessage { /// to be enabled for this connection pub features: Vec<(String, u32)>, } -impl_p2p_message!(VersionMessage, "version", 0); +impl_p2p_message!(VersionMessage, "version", 0, 0, DEFAULT_METERING_CONFIGURATION); impl VersionMessage { pub(in crate::net) fn get_ipv6_addr(&self) -> Option { @@ -129,4 +140,4 @@ pub struct VerackMessage { /// App version pub app_version: semver::Version, } -impl_p2p_message!(VerackMessage, "verack", 0); +impl_p2p_message!(VerackMessage, "verack", 0, 0, DEFAULT_METERING_CONFIGURATION); diff --git a/src/net/message_publisher.rs b/src/net/message_publisher.rs index 48a4f7df0..001bc8032 100644 --- a/src/net/message_publisher.rs +++ b/src/net/message_publisher.rs @@ -25,26 +25,43 @@ use rand::{rngs::OsRng, Rng}; use smol::{io::AsyncReadExt, lock::Mutex}; use super::message::Message; -use crate::{net::transport::PtStream, system::timeout::timeout, Error, Result}; +use crate::{ + net::{metering::MeteringQueue, transport::PtStream}, + system::{msleep, timeout::timeout}, + Error, Result, +}; use darkfi_serial::{AsyncDecodable, VarInt}; /// 64-bit identifier for message subscription. pub type MessageSubscriptionId = u64; type MessageResult = Result>; +/// Dispatcher subscriptions HashMap type. +type DispatcherSubscriptionsMap = + Mutex, Option)>>>; + /// A dispatcher that is unique to every [`Message`]. +/// /// Maintains a list of subscriptions to a unique Message /// type and handles sending messages across these /// subscriptions. +/// +/// Additionally, holds a `MeteringQueue` using the +/// [`Message`] configuration to perform rate limiting +/// of propagation towards the subscriptions. #[derive(Debug)] struct MessageDispatcher { - subs: Mutex>>>, + subs: DispatcherSubscriptionsMap, + metering_queue: Mutex, } impl MessageDispatcher { /// Create a new message dispatcher fn new() -> Self { - Self { subs: Mutex::new(HashMap::new()) } + Self { + subs: Mutex::new(HashMap::new()), + metering_queue: Mutex::new(MeteringQueue::new(M::METERING_CONFIGURATION)), + } } /// Create a random ID. @@ -84,12 +101,19 @@ impl MessageDispatcher { async fn _trigger_all(&self, message: MessageResult) { let mut subs = self.subs.lock().await; + let msg_result_type = if message.is_ok() { "Ok" } else { "Err" }; debug!( target: "net::message_publisher::_trigger_all()", "START msg={}({}), subs={}", - if message.is_ok() { "Ok" } else {"Err"}, + msg_result_type, M::NAME, subs.len(), ); + // Insert metering information and grab potential sleep time + let mut queue = self.metering_queue.lock().await; + queue.push(&M::METERING_SCORE); + let sleep_time = queue.sleep_time(); + drop(queue); + let mut futures = FuturesUnordered::new(); let mut garbage_ids = vec![]; @@ -99,7 +123,7 @@ impl MessageDispatcher { let sub = sub.clone(); let message = message.clone(); futures.push(async move { - match sub.send(message).await { + match sub.send((message, sleep_time)).await { Ok(res) => Ok((sub_id, res)), Err(err) => Err((sub_id, err)), } @@ -120,7 +144,7 @@ impl MessageDispatcher { debug!( target: "net::message_publisher::_trigger_all()", "END msg={}({}), subs={}", - if message.is_ok() { "Ok" } else { "Err" }, + msg_result_type, M::NAME, subs.len(), ); } @@ -131,17 +155,28 @@ impl MessageDispatcher { #[derive(Debug)] pub struct MessageSubscription { id: MessageSubscriptionId, - recv_queue: smol::channel::Receiver>, + recv_queue: smol::channel::Receiver<(MessageResult, Option)>, parent: Arc>, } impl MessageSubscription { /// Start receiving messages. + /// Sender also provides with a sleep time, + /// in case rate limit has started. pub async fn receive(&self) -> MessageResult { - match self.recv_queue.recv().await { - Ok(message) => message, + let (message, sleep_time) = match self.recv_queue.recv().await { + Ok(pair) => pair, Err(e) => panic!("MessageSubscription::receive(): recv_queue failed! {}", e), + }; + + // Check if we need to sleep + if message.is_ok() { + if let Some(sleep_time) = sleep_time { + msleep(sleep_time).await; + } } + + message } /// Start receiving messages with timeout. @@ -150,12 +185,22 @@ impl MessageSubscription { let Ok(res) = timeout(dur, self.recv_queue.recv()).await else { return Err(Error::ConnectTimeout) }; - match res { - Ok(message) => message, + + let (message, sleep_time) = match res { + Ok(pair) => pair, Err(e) => { panic!("MessageSubscription::receive_with_timeout(): recv_queue failed! {}", e) } + }; + + // Check if we need to sleep + if message.is_ok() { + if let Some(sleep_time) = sleep_time { + msleep(sleep_time).await; + } } + + message } /// Cleans existing items from the receiver channel. @@ -185,6 +230,8 @@ trait MessageDispatcherInterface: Send + Sync { async fn trigger_error(&self, err: Error); + async fn metering_score(&self) -> u64; + fn as_any(self: Arc) -> Arc; } @@ -248,6 +295,14 @@ impl MessageDispatcherInterface for MessageDispatcher { self._trigger_all(Err(err)).await; } + /// Internal function to retrieve metering queue current total score, + /// after prunning expired metering information. + async fn metering_score(&self) -> u64 { + let mut lock = self.metering_queue.lock().await; + lock.clean(); + lock.total() + } + /// Converts to `Any` trait. Enables the dynamic modification of static types. fn as_any(self: Arc) -> Arc { self @@ -255,22 +310,34 @@ impl MessageDispatcherInterface for MessageDispatcher { } /// Generic publish/subscribe class that maintains a list of dispatchers. +/// /// Dispatchers transmit messages to subscribers and are specific to one /// message type. +/// +/// Additionally, holds a global metering limit, which is the sum of each +/// dispatcher `MeteringQueue` threshold, to drop the connection if passed. #[derive(Default)] pub struct MessageSubsystem { dispatchers: Mutex>>, + metering_limit: Mutex, } impl MessageSubsystem { /// Create a new message subsystem. pub fn new() -> Self { - Self { dispatchers: Mutex::new(HashMap::new()) } + Self { dispatchers: Mutex::new(HashMap::new()), metering_limit: Mutex::new(0) } } /// Add a new dispatcher for specified [`Message`]. pub async fn add_dispatch(&self) { - self.dispatchers.lock().await.insert(M::NAME, Arc::new(MessageDispatcher::::new())); + // First lock the dispatchers + let mut lock = self.dispatchers.lock().await; + + // Update the metering limit + *self.metering_limit.lock().await += M::METERING_CONFIGURATION.threshold; + + // Insert the new dispatcher + lock.insert(M::NAME, Arc::new(MessageDispatcher::::new())); } /// Subscribes to a [`Message`]. Using the Message name, the method @@ -305,11 +372,32 @@ impl MessageSubsystem { command: &str, reader: &mut smol::io::ReadHalf>, ) -> Result<()> { - let Some(dispatcher) = self.dispatchers.lock().await.get(command).cloned() else { - return Err(Error::MissingDispatcher) - }; + // Iterate over dispatchers and keep track of their current + // metering score + let mut found = false; + let mut total_score = 0; + for (name, dispatcher) in self.dispatchers.lock().await.iter() { + // If dispatcher is the command one, trasmit the message + if name == &command { + dispatcher.trigger(reader).await?; + found = true; + } - dispatcher.trigger(reader).await + // Grab its total score + total_score += dispatcher.metering_score().await; + } + + // Check if dispatcher was found + if !found { + return Err(Error::MissingDispatcher) + } + + // Check if we are over the global metering limit + if total_score > *self.metering_limit.lock().await { + return Err(Error::MeteringLimitExceeded) + } + + Ok(()) } /// Concurrently transmits an error message across dispatchers. diff --git a/src/net/metering.rs b/src/net/metering.rs new file mode 100644 index 000000000..4fad64060 --- /dev/null +++ b/src/net/metering.rs @@ -0,0 +1,227 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::collections::VecDeque; + +use log::debug; + +use crate::util::time::NanoTimestamp; + +/// Struct representing metering configuration parameters. +#[derive(Clone, Debug)] +pub struct MeteringConfiguration { + /// Defines the threshold after which rate limit kicks in. + /// Set to 0 for no threshold. + /// + /// If we don't use raw count as our metric, it should be calculated + /// by multiplying the median increase of the measured item with the + /// "max" number of items we want before rate limit starts. + /// For example, if we measure some item that increases our total + /// measurement by ~5 and want to rate limit after about 10, this + /// should be set as 50. + pub threshold: u64, + /// Sleep time for each unit over the threshold, in milliseconds. + /// + /// This is used to calculate sleep time when ratelimit is active. + /// The computed sleep time when we are over the threshold will be: + /// sleep_time = (total - threshold) * sleep_step + pub sleep_step: u64, + /// Parameter defining the expiration of each item, for time based + /// decay, in nano seconds. Set to 0 for no expiration. + pub expiry_time: NanoTimestamp, +} + +impl MeteringConfiguration { + /// Generate a new `MeteringConfiguration` for provided threshold, + /// sleep step and expiration time (seconds). + pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self { + Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) } + } +} + +impl Default for MeteringConfiguration { + fn default() -> Self { + Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) } + } +} + +/// Default `MeteringConfiguration` as a constant, +/// so it can be used in trait macros. +pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration = + MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }; + +/// Struct to keep track of some sequential metered actions and compute +/// rate limits. +/// +/// The queue uses a time based decay and prunes metering information +/// after corresponding expiration time has passed. +#[derive(Debug)] +pub struct MeteringQueue { + /// Metering configuration of the queue. + config: MeteringConfiguration, + /// Ring buffer keeping track of action execution timestamp and + /// its metered value. + queue: VecDeque<(NanoTimestamp, u64)>, +} + +impl MeteringQueue { + /// Generate a new `MeteringQueue` for provided `MeteringConfiguration`. + pub fn new(config: MeteringConfiguration) -> Self { + Self { config, queue: VecDeque::new() } + } + + /// Prune expired metering information from the queue. + pub fn clean(&mut self) { + // Check if expiration has been set + if self.config.expiry_time.0 == 0 { + return + } + + // Iterate the queue to cleanup expired elements + while let Some((ts, _)) = self.queue.front() { + // This is an edge case where system reports a future timestamp + // therefore elapsed computation fails. + let Ok(elapsed) = ts.elapsed() else { + debug!(target: "net::metering::MeteringQueue::clean()", "Timestamp [{}] is in future. Removing...", ts); + let _ = self.queue.pop_front(); + continue + }; + + // Check if elapsed time is over the expiration limit + if elapsed < self.config.expiry_time { + break + } + + // Remove element + let _ = self.queue.pop_front(); + } + } + + /// Add new metering value to the queue, after + /// prunning expired metering information. + /// If no thresshold has been set, the insert is + /// ignored. + pub fn push(&mut self, value: &u64) { + // Check if threshold has been set + if self.config.threshold == 0 { + return + } + + // Prune expired elements + self.clean(); + + // Push the new value + self.queue.push_back((NanoTimestamp::current_time(), *value)); + } + + /// Compute the current metered values total. + pub fn total(&self) -> u64 { + let mut total = 0; + for (_, value) in &self.queue { + total += value; + } + total + } + + /// Compute sleep time for current metered values total, based on + /// the metering configuration. + /// + /// The sleep time increases linearly, based on configuration sleep + /// step. For example, in a raw count metering model, if we set the + /// configuration with threshold = 6 and sleep_step = 250, when + /// total = 10, returned sleep time will be 1000 ms. + /// + /// Sleep times table for the above example: + /// + /// | Total | Sleep Time (ms) | + /// |-------|-----------------| + /// | 0 | 0 | + /// | 4 | 0 | + /// | 6 | 0 | + /// | 7 | 250 | + /// | 8 | 500 | + /// | 9 | 750 | + /// | 10 | 1000 | + /// | 14 | 2000 | + /// | 18 | 3000 | + pub fn sleep_time(&self) -> Option { + // Check if threshold has been set + if self.config.threshold == 0 { + return None + } + + // Check if we are over the threshold + let total = self.total(); + if total < self.config.threshold { + return None + } + + // Compute the actual sleep time + Some((total - self.config.threshold) * self.config.sleep_step) + } +} + +#[test] +fn test_net_metering_queue_default() { + let mut queue = MeteringQueue::new(MeteringConfiguration::default()); + for _ in 0..100 { + queue.push(&1); + assert!(queue.queue.is_empty()); + assert_eq!(queue.total(), 0); + assert!(queue.sleep_time().is_none()); + } +} + +#[test] +fn test_net_metering_queue_raw_count() { + let threshold = 6; + let sleep_step = 250; + let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0); + let mut queue = MeteringQueue::new(metering_configuration); + for i in 1..threshold { + queue.push(&1); + assert_eq!(queue.total(), i); + assert!(queue.sleep_time().is_none()); + } + for i in threshold..100 { + queue.push(&1); + assert_eq!(queue.total(), i); + assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step)); + } +} + +#[test] +fn test_net_metering_queue_sleep_time() { + let metered_value_median = 5; + let threshold_items = 10; + let threshold = metered_value_median * threshold_items; + let sleep_step = 50; + let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0); + let mut queue = MeteringQueue::new(metering_configuration); + for i in 1..threshold_items { + queue.push(&metered_value_median); + assert_eq!(queue.total(), (i * metered_value_median)); + assert!(queue.sleep_time().is_none()); + } + for i in threshold_items..100 { + queue.push(&metered_value_median); + let expected_total = i * metered_value_median; + assert_eq!(queue.total(), expected_total); + assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step)); + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index 1ab66cfb2..92dd8f76c 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -128,3 +128,6 @@ pub use settings::{BanPolicy, Settings}; /// and then call `p2p.dnet_sub()` to start receiving events. #[macro_use] pub mod dnet; + +/// Metering related definitions. +pub mod metering; diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 2cc0c4f6a..fe42d41fa 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -195,23 +195,12 @@ impl P2p { return } + // Serialize the provided message let message = SerializedMessage::new(message).await; - let futures = FuturesUnordered::new(); - for channel in channel_list { - futures.push(channel.send_serialized(&message).map_err(|e| { - error!( - target: "net::p2p::broadcast()", - "[P2P] Broadcasting message to {} failed: {}", - channel.address(), e - ); - // If the channel is stopped then it should automatically die - // and the session will remove it from p2p. - assert!(channel.is_stopped()); - })); - } - - let _results: Vec<_> = futures.collect().await; + // Spawn a detached task to actually send the message to the channels, + // so we don't block wiating channels that are rate limited. + self.executor.spawn(broadcast_serialized_to::(message, channel_list.to_vec())).detach(); } /// Check whether this node has connections to any peers. This method will @@ -297,3 +286,30 @@ impl P2p { self.hosts.get_channel(id) } } + +/// Auxiliary function to broadcast a serialized message concurrently to all given peers. +async fn broadcast_serialized_to( + message: SerializedMessage, + channel_list: Vec, +) { + let futures = FuturesUnordered::new(); + + for channel in &channel_list { + futures.push( + channel + .send_serialized(&message, &M::METERING_SCORE, &M::METERING_CONFIGURATION) + .map_err(|e| { + error!( + target: "net::p2p::broadcast()", + "[P2P] Broadcasting message to {} failed: {}", + channel.address(), e + ); + // If the channel is stopped then it should automatically die + // and the session will remove it from p2p. + assert!(channel.is_stopped()); + }), + ); + } + + let _results: Vec<_> = futures.collect().await; +} diff --git a/src/tx/mod.rs b/src/tx/mod.rs index a73fffea5..c9cb892ef 100644 --- a/src/tx/mod.rs +++ b/src/tx/mod.rs @@ -230,10 +230,13 @@ impl std::fmt::Debug for Transaction { } #[cfg(feature = "net")] -use crate::net::Message; +use crate::net::{ + metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, + Message, +}; #[cfg(feature = "net")] -crate::impl_p2p_message!(Transaction, "tx", 0); +crate::impl_p2p_message!(Transaction, "tx", 0, 0, DEFAULT_METERING_CONFIGURATION); /// Calls tree bounds definitions // TODO: increase min to 2 when fees are implement