net/message: optional metering infra for rate limitting added

This commit is contained in:
skoupidi
2025-03-06 20:16:41 +02:00
parent a4cc48a819
commit 6c02b4a450
18 changed files with 625 additions and 93 deletions

View File

@@ -26,6 +26,7 @@ use tinyjson::JsonValue;
use darkfi::{
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
@@ -46,7 +47,7 @@ use crate::task::handle_unknown_proposal;
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
impl_p2p_message!(ProposalMessage, "proposal", 0);
impl_p2p_message!(ProposalMessage, "proposal", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Atomic pointer to the `ProtocolProposal` handler.
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;

View File

@@ -25,6 +25,7 @@ use darkfi::{
blockchain::{BlockInfo, Header, HeaderHash},
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
@@ -49,7 +50,7 @@ pub struct TipRequest {
pub tip: HeaderHash,
}
impl_p2p_message!(TipRequest, "tiprequest", 0);
impl_p2p_message!(TipRequest, "tiprequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure representing the response to `TipRequest`,
/// containing a boolean flag to indicate if we are synced,
@@ -64,7 +65,7 @@ pub struct TipResponse {
pub hash: Option<HeaderHash>,
}
impl_p2p_message!(TipResponse, "tipresponse", 0);
impl_p2p_message!(TipResponse, "tipresponse", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure represening a request to ask a node for up to `BATCH` headers before
/// the provided header height.
@@ -74,7 +75,7 @@ pub struct HeaderSyncRequest {
pub height: u32,
}
impl_p2p_message!(HeaderSyncRequest, "headersyncrequest", 0);
impl_p2p_message!(HeaderSyncRequest, "headersyncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure representing the response to `HeaderSyncRequest`,
/// containing up to `BATCH` headers before the requested block height.
@@ -84,7 +85,7 @@ pub struct HeaderSyncResponse {
pub headers: Vec<Header>,
}
impl_p2p_message!(HeaderSyncResponse, "headersyncresponse", 0);
impl_p2p_message!(HeaderSyncResponse, "headersyncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure represening a request to ask a node for up to`BATCH` blocks
/// of provided headers.
@@ -94,7 +95,7 @@ pub struct SyncRequest {
pub headers: Vec<HeaderHash>,
}
impl_p2p_message!(SyncRequest, "syncrequest", 0);
impl_p2p_message!(SyncRequest, "syncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure representing the response to `SyncRequest`,
/// containing up to `BATCH` blocks after the requested block height.
@@ -104,7 +105,7 @@ pub struct SyncResponse {
pub blocks: Vec<BlockInfo>,
}
impl_p2p_message!(SyncResponse, "syncresponse", 0);
impl_p2p_message!(SyncResponse, "syncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure represening a request to ask a node a fork sequence.
/// If we include a specific fork tip, they have to return its sequence,
@@ -119,7 +120,7 @@ pub struct ForkSyncRequest {
pub fork_tip: Option<HeaderHash>,
}
impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 0);
impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure representing the response to `ForkSyncRequest`,
/// containing the requested fork sequence.
@@ -129,7 +130,7 @@ pub struct ForkSyncResponse {
pub proposals: Vec<Proposal>,
}
impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0);
impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure represening a request to ask a node a fork header for the
/// requested height. The fork is identified by the provided header hash.
@@ -141,7 +142,13 @@ pub struct ForkHeaderHashRequest {
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkHeaderHashRequest, "forkheaderhashrequest", 0);
impl_p2p_message!(
ForkHeaderHashRequest,
"forkheaderhashrequest",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
/// Structure representing the response to `ForkHeaderHashRequest`,
/// containing the requested fork header hash, if it was found.
@@ -151,7 +158,13 @@ pub struct ForkHeaderHashResponse {
pub fork_header: Option<HeaderHash>,
}
impl_p2p_message!(ForkHeaderHashResponse, "forkheaderhashresponse", 0);
impl_p2p_message!(
ForkHeaderHashResponse,
"forkheaderhashresponse",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
/// Structure represening a request to ask a node for up to `BATCH`
/// fork headers for provided header hashes. The fork is identified
@@ -164,7 +177,7 @@ pub struct ForkHeadersRequest {
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkHeadersRequest, "forkheadersrequest", 0);
impl_p2p_message!(ForkHeadersRequest, "forkheadersrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure representing the response to `ForkHeadersRequest`,
/// containing up to `BATCH` fork headers.
@@ -174,7 +187,7 @@ pub struct ForkHeadersResponse {
pub headers: Vec<Header>,
}
impl_p2p_message!(ForkHeadersResponse, "forkheadersresponse", 0);
impl_p2p_message!(ForkHeadersResponse, "forkheadersresponse", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Structure represening a request to ask a node for up to `BATCH`
/// fork proposals for provided header hashes. The fork is identified
@@ -187,7 +200,13 @@ pub struct ForkProposalsRequest {
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkProposalsRequest, "forkproposalsrequest", 0);
impl_p2p_message!(
ForkProposalsRequest,
"forkproposalsrequest",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
/// Structure representing the response to `ForkProposalsRequest`,
/// containing up to `BATCH` fork headers.
@@ -197,7 +216,13 @@ pub struct ForkProposalsResponse {
pub proposals: Vec<Proposal>,
}
impl_p2p_message!(ForkProposalsResponse, "forkproposalsresponse", 0);
impl_p2p_message!(
ForkProposalsResponse,
"forkproposalsresponse",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
/// Atomic pointer to the `ProtocolSync` handler.
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;

View File

@@ -24,6 +24,7 @@ use darkfi::{
dht2::net_hashmap::{NetHashMapInsert, NetHashMapRemove},
impl_p2p_message,
net::{
metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration},
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
@@ -53,7 +54,7 @@ pub struct ChunkRequest {
pub hash: blake3::Hash,
}
impl_p2p_message!(ChunkRequest, "dhtchunkrequest", 0);
impl_p2p_message!(ChunkRequest, "dhtchunkrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ChunkReply {
@@ -61,14 +62,14 @@ pub struct ChunkReply {
pub data: Vec<u8>,
}
impl_p2p_message!(ChunkReply, "dhtchunkreply", 0);
impl_p2p_message!(ChunkReply, "dhtchunkreply", 0, 0, DEFAULT_METERING_CONFIGURATION);
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FileRequest {
pub hash: blake3::Hash,
}
impl_p2p_message!(FileRequest, "dhtfilerequest", 0);
impl_p2p_message!(FileRequest, "dhtfilerequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FileReply {
@@ -76,7 +77,7 @@ pub struct FileReply {
pub chunks: Vec<blake3::Hash>,
}
impl_p2p_message!(FileReply, "dhtfilereply", 0);
impl_p2p_message!(FileReply, "dhtfilereply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl ProtocolDht {
#[allow(dead_code)]

View File

@@ -23,6 +23,7 @@ use darkfi::{
geode::MAX_CHUNK_SIZE,
impl_p2p_message,
net::{
metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration},
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
@@ -41,14 +42,14 @@ pub struct FudFilePut {
pub file_hash: blake3::Hash,
pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFilePut, "FudFilePut", 0);
impl_p2p_message!(FudFilePut, "FudFilePut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new chunk on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkPut {
pub chunk_hash: blake3::Hash,
}
impl_p2p_message!(FudChunkPut, "FudChunkPut", 0);
impl_p2p_message!(FudChunkPut, "FudChunkPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new route for a file on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -57,7 +58,7 @@ pub struct FudFileRoute {
pub chunk_hashes: Vec<blake3::Hash>,
pub peer: Url,
}
impl_p2p_message!(FudFileRoute, "FudFileRoute", 0);
impl_p2p_message!(FudFileRoute, "FudFileRoute", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a new route for a chunk on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -65,28 +66,28 @@ pub struct FudChunkRoute {
pub chunk_hash: blake3::Hash,
pub peer: Url,
}
impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0);
impl_p2p_message!(FudChunkRoute, "FudChunkRoute", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a file request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileRequest {
pub file_hash: blake3::Hash,
}
impl_p2p_message!(FudFileRequest, "FudFileRequest", 0);
impl_p2p_message!(FudFileRequest, "FudFileRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileReply {
pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFileReply, "FudFileReply", 0);
impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRequest {
pub chunk_hash: blake3::Hash,
}
impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0);
impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -94,17 +95,17 @@ pub struct FudChunkReply {
// TODO: This sould be a chunk-sized array, but then we need padding?
pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0);
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk reply when a file is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileNotFound;
impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0);
impl_p2p_message!(FudFileNotFound, "FudFileNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a chunk reply when a chunk is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkNotFound;
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0);
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// P2P protocol implementation for fud.
pub struct ProtocolFud {

View File

@@ -20,7 +20,13 @@
use smol::lock::Mutex;
use std::sync::Arc;
use darkfi::{impl_p2p_message, net::Message};
use darkfi::{
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
Message,
},
};
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
pub type DchatMsgsBuffer = Arc<Mutex<Vec<DchatMsg>>>;
@@ -30,5 +36,5 @@ pub struct DchatMsg {
pub msg: String,
}
impl_p2p_message!(DchatMsg, "DchatMsg", 0);
impl_p2p_message!(DchatMsg, "DchatMsg", 0, 0, DEFAULT_METERING_CONFIGURATION);
// ANCHOR_END: msg

View File

@@ -23,3 +23,42 @@ and monitor responses.
Localnet folder with script and configuration to deploy
instances to test with.
## Flood testing
Here is a table of flooding scenarios to perfor to verify expected
behavior, based on configured messages parameters.<br>
| # | Description | Configuration | Outcome |
|---|---------------------------------------|----------------|----------------------------------------------------------------------------------------|
| 0 | No metering | Default | All flood messages get propagated instantly |
| 1 | Same metering everywhere | (0,1,6,500,10) | All flood messages eventually get propagated following rate limit rules |
| 2 | `node0` metering, `node1` no metering | (0,1,6,500,10) | node0 disconnects/bans node1 for flooding |
| 3 | `node0` no metering, `node1` metering | (0,1,6,500,10) | All flood messages eventually get propagated following rate limit rules |
| 4 | Only `Bar` metered | (0,1,6,500,10) | `Foo` messages get propagated instantly while `Bar` messages eventually get propagated |
### Methodology note
Message configuration tuple legend:
| Pos | Description |
|-----|-----------------------------------------|
| 0 | MAX_BYTES |
| 1 | METERING_SCORE |
| 2 | MeteringConfiguration.threshold |
| 3 | MeteringConfiguration.sleep_step (ms) |
| 4 | MeteringConfiguration.expiry_time (sec) |
When different configurations are used between the two nodes, you
have to manually compile `damd` with the corresponding message
configuration, copy/move/rename the binary and update the localnet
script accordingly.<br>
Each message can be configured in their corresponding protocol file.<br>
All paths are relative from this folder.
| Message | Path |
|---------------|------------------------------------------|
| `Bar` | `damd/src/proto/protocol_bar.rs::L49-55` |
| `FooRequest` | `damd/src/proto/protocol_foo.rs::L49-55` |
| `FooResponse` | `damd/src/proto/protocol_foo.rs::L64-70` |

View File

@@ -25,6 +25,7 @@ use tinyjson::JsonValue;
use darkfi::{
impl_p2p_message,
net::{
metering::MeteringConfiguration,
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
@@ -33,6 +34,7 @@ use darkfi::{
},
rpc::jsonrpc::JsonSubscriber,
system::ExecutorPtr,
util::time::NanoTimestamp,
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
@@ -44,7 +46,13 @@ pub struct Bar {
pub message: String,
}
impl_p2p_message!(Bar, "bar", 0);
impl_p2p_message!(
Bar,
"bar",
0,
0,
MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) }
);
/// Atomic pointer to the `ProtocolBar` handler.
pub type ProtocolBarHandlerPtr = Arc<ProtocolBarHandler>;

View File

@@ -25,6 +25,7 @@ use tinyjson::JsonValue;
use darkfi::{
impl_p2p_message,
net::{
metering::MeteringConfiguration,
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
@@ -33,6 +34,7 @@ use darkfi::{
},
rpc::jsonrpc::JsonSubscriber,
system::ExecutorPtr,
util::time::NanoTimestamp,
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
@@ -44,7 +46,13 @@ pub struct FooRequest {
pub message: String,
}
impl_p2p_message!(FooRequest, "foorequest", 0);
impl_p2p_message!(
FooRequest,
"foorequest",
0,
0,
MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) }
);
/// Structure representing the response to `FooRequest`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
@@ -53,7 +61,13 @@ pub struct FooResponse {
pub code: u8,
}
impl_p2p_message!(FooResponse, "fooresponse", 0);
impl_p2p_message!(
FooResponse,
"fooresponse",
0,
0,
MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp::from_secs(0) }
);
/// Atomic pointer to the `ProtocolFoo` handler.
pub type ProtocolFooHandlerPtr = Arc<ProtocolFooHandler>;

View File

@@ -26,6 +26,7 @@ use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use darkfi::{
async_daemonize, cli_desc, impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
@@ -71,25 +72,49 @@ struct Args {
struct GenericStringMessage {
msg: String,
}
impl_p2p_message!(GenericStringMessage, "generic_string_message", 0);
impl_p2p_message!(
GenericStringMessage,
"generic_string_message",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericNumberMessage {
num: u64,
}
impl_p2p_message!(GenericNumberMessage, "generic_number_message", 0);
impl_p2p_message!(
GenericNumberMessage,
"generic_number_message",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericRequestMessage {
msg: String,
}
impl_p2p_message!(GenericRequestMessage, "generic_request_message", 0);
impl_p2p_message!(
GenericRequestMessage,
"generic_request_message",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericResponseMessage {
msg: String,
}
impl_p2p_message!(GenericResponseMessage, "generic_response_message", 0);
impl_p2p_message!(
GenericResponseMessage,
"generic_response_message",
0,
0,
DEFAULT_METERING_CONFIGURATION
);
/// Generic daemon structure
struct Genericd {

View File

@@ -158,6 +158,9 @@ pub enum Error {
#[error("P2P message is invalid")]
MessageInvalid,
#[error("P2P message subsystem over metering limit")]
MeteringLimitExceeded,
#[cfg(feature = "arti-client")]
#[error(transparent)]
ArtiError(#[from] arti_client::Error),

View File

@@ -29,7 +29,17 @@ use log::{debug, error, trace, warn};
use smol::Executor;
use super::{Event, EventGraphPtr, NULL_ID};
use crate::{impl_p2p_message, net::*, system::msleep, util::time::NanoTimestamp, Error, Result};
use crate::{
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
system::msleep,
util::time::NanoTimestamp,
Error, Result,
};
/// Malicious behaviour threshold. If the threshold is reached, we will
/// drop the peer from our P2P connection.
@@ -116,27 +126,27 @@ pub struct ProtocolEventGraph {
/// A P2P message representing publishing an event on the network
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing an event request
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing an event reply
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing a request for a peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq {}
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0);
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
#[async_trait]
impl ProtocolBase for ProtocolEventGraph {

View File

@@ -17,6 +17,7 @@
*/
use std::{
collections::HashMap,
fmt,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
@@ -43,6 +44,7 @@ use super::{
message,
message::{SerializedMessage, VersionMessage},
message_publisher::{MessageSubscription, MessageSubsystem},
metering::{MeteringConfiguration, MeteringQueue},
p2p::P2pPtr,
session::{
Session, SessionBitFlag, SessionWeakPtr, SESSION_ALL, SESSION_INBOUND, SESSION_REFINE,
@@ -51,7 +53,7 @@ use super::{
};
use crate::{
net::BanPolicy,
system::{Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
util::time::NanoTimestamp,
Error, Result,
};
@@ -96,6 +98,9 @@ pub struct Channel {
pub version: OnceCell<Arc<VersionMessage>>,
/// Channel debug info
pub info: ChannelInfo,
/// Map holding a `MeteringQueue` for each [`Message`] to perform
/// rate limiting of propagation towards the stream.
metering_map: AsyncMutex<HashMap<String, MeteringQueue>>,
}
impl Channel {
@@ -117,6 +122,7 @@ impl Channel {
let start_time = UNIX_EPOCH.elapsed().unwrap().as_secs();
let info = ChannelInfo::new(resolve_addr, connect_addr.clone(), start_time);
let metering_map = AsyncMutex::new(HashMap::new());
Arc::new(Self {
reader,
@@ -128,6 +134,7 @@ impl Channel {
session,
version: OnceCell::new(),
info,
metering_map,
})
}
@@ -189,18 +196,60 @@ impl Channel {
/// into a `SerializedMessage` and then calls `send_serialized` to send it.
/// Returns an error if something goes wrong.
pub async fn send<M: message::Message>(&self, message: &M) -> Result<()> {
self.send_serialized(&SerializedMessage::new(message).await).await
self.send_serialized(
&SerializedMessage::new(message).await,
&M::METERING_SCORE,
&M::METERING_CONFIGURATION,
)
.await
}
/// Sends the encoded payload of provided `SerializedMessage` across the channel.
/// Calls `send_message` that creates a new payload and sends it over the
/// network transport as a packet. Returns an error if something goes wrong.
pub async fn send_serialized(&self, message: &SerializedMessage) -> Result<()> {
///
/// We first check if we should apply some throttling, based on the provided
/// `Message` configuration. We always sleep 2x times more that the exepted one,
/// so we don't flood the peer.
/// Then, calls `send_message` that creates a new payload and sends it over the
/// network transport as a packet.
/// Returns an error if something goes wrong.
pub async fn send_serialized(
&self,
message: &SerializedMessage,
metering_score: &u64,
metering_config: &MeteringConfiguration,
) -> Result<()> {
debug!(
target: "net::channel::send()", "[START] command={} {:?}",
message.command, self,
);
// Check if we need to initialize a `MeteringQueue`
// for this specific `Message`.
let mut lock = self.metering_map.lock().await;
if !lock.contains_key(&message.command) {
lock.insert(message.command.clone(), MeteringQueue::new(metering_config.clone()));
}
// Insert metering information and grab potential sleep time.
// It's safe to unwrap here since we initialized the value
// previously.
let queue = lock.get_mut(&message.command).unwrap();
queue.push(metering_score);
let sleep_time = queue.sleep_time();
drop(lock);
// Check if we need to sleep
if let Some(sleep_time) = sleep_time {
let sleep_time = 2 * sleep_time;
warn!(
target: "net::channel::send()",
"[P2P] Channel rate limit is active, sleeping before sending for: {} (ms)",
sleep_time,
);
msleep(sleep_time).await;
}
// Check if the channel is stopped, so we can abort
if self.is_stopped() {
return Err(Error::ChannelStopped)
}
@@ -382,7 +431,9 @@ impl Channel {
// Send result to our publishers
match self.message_subsystem.notify(&command, reader).await {
Ok(()) => {}
Err(Error::MissingDispatcher) | Err(Error::MessageInvalid) => {
Err(Error::MissingDispatcher) |
Err(Error::MessageInvalid) |
Err(Error::MeteringLimitExceeded) => {
// If we're getting messages without dispatchers or its invalid,
// it's spam. We therefore ban this channel if:
//
@@ -402,7 +453,7 @@ impl Channel {
if self.session.upgrade().unwrap().type_id() != SESSION_REFINE {
warn!(
target: "net::channel::main_receive_loop()",
"MissingDispatcher for command={}, channel={:?}",
"MissingDispatcher|MessageInvalid|MeteringLimitExcheeded for command={}, channel={:?}",
command, self
);

View File

@@ -16,18 +16,27 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::net::Ipv6Addr;
use darkfi_serial::{
async_trait, serialize_async, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable,
};
use std::net::Ipv6Addr;
use url::{Host, Url};
use crate::net::metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION};
/// Generic message template.
pub trait Message: 'static + Send + Sync + AsyncDecodable + AsyncEncodable {
const NAME: &'static str;
/// Message bytes vector length limit.
/// Set to 0 for no limit.
const MAX_BYTES: u64;
/// Message metering score value.
/// Set to 0 for no impact in metering.
const METERING_SCORE: u64;
/// Message metering configuration for rate limit.
/// Use `MeteringConfiguration::default()` for no limit.
const METERING_CONFIGURATION: MeteringConfiguration;
}
/// Generic serialized message template.
@@ -44,10 +53,12 @@ impl SerializedMessage {
#[macro_export]
macro_rules! impl_p2p_message {
($st:ty, $nm:expr, $mb:expr) => {
($st:ty, $nm:expr, $mb:expr, $ms:expr, $mc:expr) => {
impl Message for $st {
const NAME: &'static str = $nm;
const MAX_BYTES: u64 = $mb;
const METERING_SCORE: u64 = $ms;
const METERING_CONFIGURATION: MeteringConfiguration = $mc;
}
};
}
@@ -57,14 +68,14 @@ macro_rules! impl_p2p_message {
pub struct PingMessage {
pub nonce: u16,
}
impl_p2p_message!(PingMessage, "ping", 0);
impl_p2p_message!(PingMessage, "ping", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Inbound keepalive message.
#[derive(Debug, Copy, Clone, SerialEncodable, SerialDecodable)]
pub struct PongMessage {
pub nonce: u16,
}
impl_p2p_message!(PongMessage, "pong", 0);
impl_p2p_message!(PongMessage, "pong", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Requests address of outbound connecction.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -77,7 +88,7 @@ pub struct GetAddrsMessage {
/// Preferred addresses transports
pub transports: Vec<String>,
}
impl_p2p_message!(GetAddrsMessage, "getaddr", 0);
impl_p2p_message!(GetAddrsMessage, "getaddr", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Sends address information to inbound connection.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -85,7 +96,7 @@ pub struct AddrsMessage {
pub addrs: Vec<(Url, u64)>,
}
impl_p2p_message!(AddrsMessage, "addr", 0);
impl_p2p_message!(AddrsMessage, "addr", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Requests version information of outbound connection.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -109,7 +120,7 @@ pub struct VersionMessage {
/// to be enabled for this connection
pub features: Vec<(String, u32)>,
}
impl_p2p_message!(VersionMessage, "version", 0);
impl_p2p_message!(VersionMessage, "version", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl VersionMessage {
pub(in crate::net) fn get_ipv6_addr(&self) -> Option<Ipv6Addr> {
@@ -129,4 +140,4 @@ pub struct VerackMessage {
/// App version
pub app_version: semver::Version,
}
impl_p2p_message!(VerackMessage, "verack", 0);
impl_p2p_message!(VerackMessage, "verack", 0, 0, DEFAULT_METERING_CONFIGURATION);

View File

@@ -25,26 +25,43 @@ use rand::{rngs::OsRng, Rng};
use smol::{io::AsyncReadExt, lock::Mutex};
use super::message::Message;
use crate::{net::transport::PtStream, system::timeout::timeout, Error, Result};
use crate::{
net::{metering::MeteringQueue, transport::PtStream},
system::{msleep, timeout::timeout},
Error, Result,
};
use darkfi_serial::{AsyncDecodable, VarInt};
/// 64-bit identifier for message subscription.
pub type MessageSubscriptionId = u64;
type MessageResult<M> = Result<Arc<M>>;
/// Dispatcher subscriptions HashMap type.
type DispatcherSubscriptionsMap<M> =
Mutex<HashMap<MessageSubscriptionId, smol::channel::Sender<(MessageResult<M>, Option<u64>)>>>;
/// A dispatcher that is unique to every [`Message`].
///
/// Maintains a list of subscriptions to a unique Message
/// type and handles sending messages across these
/// subscriptions.
///
/// Additionally, holds a `MeteringQueue` using the
/// [`Message`] configuration to perform rate limiting
/// of propagation towards the subscriptions.
#[derive(Debug)]
struct MessageDispatcher<M: Message> {
subs: Mutex<HashMap<MessageSubscriptionId, smol::channel::Sender<MessageResult<M>>>>,
subs: DispatcherSubscriptionsMap<M>,
metering_queue: Mutex<MeteringQueue>,
}
impl<M: Message> MessageDispatcher<M> {
/// Create a new message dispatcher
fn new() -> Self {
Self { subs: Mutex::new(HashMap::new()) }
Self {
subs: Mutex::new(HashMap::new()),
metering_queue: Mutex::new(MeteringQueue::new(M::METERING_CONFIGURATION)),
}
}
/// Create a random ID.
@@ -84,12 +101,19 @@ impl<M: Message> MessageDispatcher<M> {
async fn _trigger_all(&self, message: MessageResult<M>) {
let mut subs = self.subs.lock().await;
let msg_result_type = if message.is_ok() { "Ok" } else { "Err" };
debug!(
target: "net::message_publisher::_trigger_all()", "START msg={}({}), subs={}",
if message.is_ok() { "Ok" } else {"Err"},
msg_result_type,
M::NAME, subs.len(),
);
// Insert metering information and grab potential sleep time
let mut queue = self.metering_queue.lock().await;
queue.push(&M::METERING_SCORE);
let sleep_time = queue.sleep_time();
drop(queue);
let mut futures = FuturesUnordered::new();
let mut garbage_ids = vec![];
@@ -99,7 +123,7 @@ impl<M: Message> MessageDispatcher<M> {
let sub = sub.clone();
let message = message.clone();
futures.push(async move {
match sub.send(message).await {
match sub.send((message, sleep_time)).await {
Ok(res) => Ok((sub_id, res)),
Err(err) => Err((sub_id, err)),
}
@@ -120,7 +144,7 @@ impl<M: Message> MessageDispatcher<M> {
debug!(
target: "net::message_publisher::_trigger_all()", "END msg={}({}), subs={}",
if message.is_ok() { "Ok" } else { "Err" },
msg_result_type,
M::NAME, subs.len(),
);
}
@@ -131,17 +155,28 @@ impl<M: Message> MessageDispatcher<M> {
#[derive(Debug)]
pub struct MessageSubscription<M: Message> {
id: MessageSubscriptionId,
recv_queue: smol::channel::Receiver<MessageResult<M>>,
recv_queue: smol::channel::Receiver<(MessageResult<M>, Option<u64>)>,
parent: Arc<MessageDispatcher<M>>,
}
impl<M: Message> MessageSubscription<M> {
/// Start receiving messages.
/// Sender also provides with a sleep time,
/// in case rate limit has started.
pub async fn receive(&self) -> MessageResult<M> {
match self.recv_queue.recv().await {
Ok(message) => message,
let (message, sleep_time) = match self.recv_queue.recv().await {
Ok(pair) => pair,
Err(e) => panic!("MessageSubscription::receive(): recv_queue failed! {}", e),
};
// Check if we need to sleep
if message.is_ok() {
if let Some(sleep_time) = sleep_time {
msleep(sleep_time).await;
}
}
message
}
/// Start receiving messages with timeout.
@@ -150,12 +185,22 @@ impl<M: Message> MessageSubscription<M> {
let Ok(res) = timeout(dur, self.recv_queue.recv()).await else {
return Err(Error::ConnectTimeout)
};
match res {
Ok(message) => message,
let (message, sleep_time) = match res {
Ok(pair) => pair,
Err(e) => {
panic!("MessageSubscription::receive_with_timeout(): recv_queue failed! {}", e)
}
};
// Check if we need to sleep
if message.is_ok() {
if let Some(sleep_time) = sleep_time {
msleep(sleep_time).await;
}
}
message
}
/// Cleans existing items from the receiver channel.
@@ -185,6 +230,8 @@ trait MessageDispatcherInterface: Send + Sync {
async fn trigger_error(&self, err: Error);
async fn metering_score(&self) -> u64;
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
@@ -248,6 +295,14 @@ impl<M: Message> MessageDispatcherInterface for MessageDispatcher<M> {
self._trigger_all(Err(err)).await;
}
/// Internal function to retrieve metering queue current total score,
/// after prunning expired metering information.
async fn metering_score(&self) -> u64 {
let mut lock = self.metering_queue.lock().await;
lock.clean();
lock.total()
}
/// Converts to `Any` trait. Enables the dynamic modification of static types.
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
@@ -255,22 +310,34 @@ impl<M: Message> MessageDispatcherInterface for MessageDispatcher<M> {
}
/// Generic publish/subscribe class that maintains a list of dispatchers.
///
/// Dispatchers transmit messages to subscribers and are specific to one
/// message type.
///
/// Additionally, holds a global metering limit, which is the sum of each
/// dispatcher `MeteringQueue` threshold, to drop the connection if passed.
#[derive(Default)]
pub struct MessageSubsystem {
dispatchers: Mutex<HashMap<&'static str, Arc<dyn MessageDispatcherInterface>>>,
metering_limit: Mutex<u64>,
}
impl MessageSubsystem {
/// Create a new message subsystem.
pub fn new() -> Self {
Self { dispatchers: Mutex::new(HashMap::new()) }
Self { dispatchers: Mutex::new(HashMap::new()), metering_limit: Mutex::new(0) }
}
/// Add a new dispatcher for specified [`Message`].
pub async fn add_dispatch<M: Message>(&self) {
self.dispatchers.lock().await.insert(M::NAME, Arc::new(MessageDispatcher::<M>::new()));
// First lock the dispatchers
let mut lock = self.dispatchers.lock().await;
// Update the metering limit
*self.metering_limit.lock().await += M::METERING_CONFIGURATION.threshold;
// Insert the new dispatcher
lock.insert(M::NAME, Arc::new(MessageDispatcher::<M>::new()));
}
/// Subscribes to a [`Message`]. Using the Message name, the method
@@ -305,11 +372,32 @@ impl MessageSubsystem {
command: &str,
reader: &mut smol::io::ReadHalf<Box<dyn PtStream + 'static>>,
) -> Result<()> {
let Some(dispatcher) = self.dispatchers.lock().await.get(command).cloned() else {
return Err(Error::MissingDispatcher)
};
// Iterate over dispatchers and keep track of their current
// metering score
let mut found = false;
let mut total_score = 0;
for (name, dispatcher) in self.dispatchers.lock().await.iter() {
// If dispatcher is the command one, trasmit the message
if name == &command {
dispatcher.trigger(reader).await?;
found = true;
}
dispatcher.trigger(reader).await
// Grab its total score
total_score += dispatcher.metering_score().await;
}
// Check if dispatcher was found
if !found {
return Err(Error::MissingDispatcher)
}
// Check if we are over the global metering limit
if total_score > *self.metering_limit.lock().await {
return Err(Error::MeteringLimitExceeded)
}
Ok(())
}
/// Concurrently transmits an error message across dispatchers.

227
src/net/metering.rs Normal file
View File

@@ -0,0 +1,227 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::VecDeque;
use log::debug;
use crate::util::time::NanoTimestamp;
/// Struct representing metering configuration parameters.
#[derive(Clone, Debug)]
pub struct MeteringConfiguration {
/// Defines the threshold after which rate limit kicks in.
/// Set to 0 for no threshold.
///
/// If we don't use raw count as our metric, it should be calculated
/// by multiplying the median increase of the measured item with the
/// "max" number of items we want before rate limit starts.
/// For example, if we measure some item that increases our total
/// measurement by ~5 and want to rate limit after about 10, this
/// should be set as 50.
pub threshold: u64,
/// Sleep time for each unit over the threshold, in milliseconds.
///
/// This is used to calculate sleep time when ratelimit is active.
/// The computed sleep time when we are over the threshold will be:
/// sleep_time = (total - threshold) * sleep_step
pub sleep_step: u64,
/// Parameter defining the expiration of each item, for time based
/// decay, in nano seconds. Set to 0 for no expiration.
pub expiry_time: NanoTimestamp,
}
impl MeteringConfiguration {
/// Generate a new `MeteringConfiguration` for provided threshold,
/// sleep step and expiration time (seconds).
pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self {
Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) }
}
}
impl Default for MeteringConfiguration {
fn default() -> Self {
Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }
}
}
/// Default `MeteringConfiguration` as a constant,
/// so it can be used in trait macros.
pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration =
MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) };
/// Struct to keep track of some sequential metered actions and compute
/// rate limits.
///
/// The queue uses a time based decay and prunes metering information
/// after corresponding expiration time has passed.
#[derive(Debug)]
pub struct MeteringQueue {
/// Metering configuration of the queue.
config: MeteringConfiguration,
/// Ring buffer keeping track of action execution timestamp and
/// its metered value.
queue: VecDeque<(NanoTimestamp, u64)>,
}
impl MeteringQueue {
/// Generate a new `MeteringQueue` for provided `MeteringConfiguration`.
pub fn new(config: MeteringConfiguration) -> Self {
Self { config, queue: VecDeque::new() }
}
/// Prune expired metering information from the queue.
pub fn clean(&mut self) {
// Check if expiration has been set
if self.config.expiry_time.0 == 0 {
return
}
// Iterate the queue to cleanup expired elements
while let Some((ts, _)) = self.queue.front() {
// This is an edge case where system reports a future timestamp
// therefore elapsed computation fails.
let Ok(elapsed) = ts.elapsed() else {
debug!(target: "net::metering::MeteringQueue::clean()", "Timestamp [{}] is in future. Removing...", ts);
let _ = self.queue.pop_front();
continue
};
// Check if elapsed time is over the expiration limit
if elapsed < self.config.expiry_time {
break
}
// Remove element
let _ = self.queue.pop_front();
}
}
/// Add new metering value to the queue, after
/// prunning expired metering information.
/// If no thresshold has been set, the insert is
/// ignored.
pub fn push(&mut self, value: &u64) {
// Check if threshold has been set
if self.config.threshold == 0 {
return
}
// Prune expired elements
self.clean();
// Push the new value
self.queue.push_back((NanoTimestamp::current_time(), *value));
}
/// Compute the current metered values total.
pub fn total(&self) -> u64 {
let mut total = 0;
for (_, value) in &self.queue {
total += value;
}
total
}
/// Compute sleep time for current metered values total, based on
/// the metering configuration.
///
/// The sleep time increases linearly, based on configuration sleep
/// step. For example, in a raw count metering model, if we set the
/// configuration with threshold = 6 and sleep_step = 250, when
/// total = 10, returned sleep time will be 1000 ms.
///
/// Sleep times table for the above example:
///
/// | Total | Sleep Time (ms) |
/// |-------|-----------------|
/// | 0 | 0 |
/// | 4 | 0 |
/// | 6 | 0 |
/// | 7 | 250 |
/// | 8 | 500 |
/// | 9 | 750 |
/// | 10 | 1000 |
/// | 14 | 2000 |
/// | 18 | 3000 |
pub fn sleep_time(&self) -> Option<u64> {
// Check if threshold has been set
if self.config.threshold == 0 {
return None
}
// Check if we are over the threshold
let total = self.total();
if total < self.config.threshold {
return None
}
// Compute the actual sleep time
Some((total - self.config.threshold) * self.config.sleep_step)
}
}
#[test]
fn test_net_metering_queue_default() {
let mut queue = MeteringQueue::new(MeteringConfiguration::default());
for _ in 0..100 {
queue.push(&1);
assert!(queue.queue.is_empty());
assert_eq!(queue.total(), 0);
assert!(queue.sleep_time().is_none());
}
}
#[test]
fn test_net_metering_queue_raw_count() {
let threshold = 6;
let sleep_step = 250;
let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
let mut queue = MeteringQueue::new(metering_configuration);
for i in 1..threshold {
queue.push(&1);
assert_eq!(queue.total(), i);
assert!(queue.sleep_time().is_none());
}
for i in threshold..100 {
queue.push(&1);
assert_eq!(queue.total(), i);
assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step));
}
}
#[test]
fn test_net_metering_queue_sleep_time() {
let metered_value_median = 5;
let threshold_items = 10;
let threshold = metered_value_median * threshold_items;
let sleep_step = 50;
let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
let mut queue = MeteringQueue::new(metering_configuration);
for i in 1..threshold_items {
queue.push(&metered_value_median);
assert_eq!(queue.total(), (i * metered_value_median));
assert!(queue.sleep_time().is_none());
}
for i in threshold_items..100 {
queue.push(&metered_value_median);
let expected_total = i * metered_value_median;
assert_eq!(queue.total(), expected_total);
assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step));
}
}

View File

@@ -128,3 +128,6 @@ pub use settings::{BanPolicy, Settings};
/// and then call `p2p.dnet_sub()` to start receiving events.
#[macro_use]
pub mod dnet;
/// Metering related definitions.
pub mod metering;

View File

@@ -195,23 +195,12 @@ impl P2p {
return
}
// Serialize the provided message
let message = SerializedMessage::new(message).await;
let futures = FuturesUnordered::new();
for channel in channel_list {
futures.push(channel.send_serialized(&message).map_err(|e| {
error!(
target: "net::p2p::broadcast()",
"[P2P] Broadcasting message to {} failed: {}",
channel.address(), e
);
// If the channel is stopped then it should automatically die
// and the session will remove it from p2p.
assert!(channel.is_stopped());
}));
}
let _results: Vec<_> = futures.collect().await;
// Spawn a detached task to actually send the message to the channels,
// so we don't block wiating channels that are rate limited.
self.executor.spawn(broadcast_serialized_to::<M>(message, channel_list.to_vec())).detach();
}
/// Check whether this node has connections to any peers. This method will
@@ -297,3 +286,30 @@ impl P2p {
self.hosts.get_channel(id)
}
}
/// Auxiliary function to broadcast a serialized message concurrently to all given peers.
async fn broadcast_serialized_to<M: Message>(
message: SerializedMessage,
channel_list: Vec<ChannelPtr>,
) {
let futures = FuturesUnordered::new();
for channel in &channel_list {
futures.push(
channel
.send_serialized(&message, &M::METERING_SCORE, &M::METERING_CONFIGURATION)
.map_err(|e| {
error!(
target: "net::p2p::broadcast()",
"[P2P] Broadcasting message to {} failed: {}",
channel.address(), e
);
// If the channel is stopped then it should automatically die
// and the session will remove it from p2p.
assert!(channel.is_stopped());
}),
);
}
let _results: Vec<_> = futures.collect().await;
}

View File

@@ -230,10 +230,13 @@ impl std::fmt::Debug for Transaction {
}
#[cfg(feature = "net")]
use crate::net::Message;
use crate::net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
Message,
};
#[cfg(feature = "net")]
crate::impl_p2p_message!(Transaction, "tx", 0);
crate::impl_p2p_message!(Transaction, "tx", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Calls tree bounds definitions
// TODO: increase min to 2 when fees are implement