mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 09:38:24 -05:00
Fix bug, limit for broadcast message (#6325)
This commit is contained in:
@@ -17,7 +17,8 @@ use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use super::{
|
||||
AnnouncementFilter, Peer, PooledTransactions, POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
AnnouncementFilter, Peer, PooledTransactions,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
};
|
||||
|
||||
/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
|
||||
@@ -179,7 +180,7 @@ impl TransactionFetcher {
|
||||
if let Some(size) = self.eth68_meta.peek(&hash) {
|
||||
let next_acc_size = *acc_size_response + size;
|
||||
|
||||
if next_acc_size <= POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE {
|
||||
if next_acc_size <= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE {
|
||||
// only update accumulated size of tx response if tx will fit in without exceeding
|
||||
// soft limit
|
||||
*acc_size_response = next_acc_size;
|
||||
@@ -209,7 +210,7 @@ impl TransactionFetcher {
|
||||
) -> Vec<TxHash> {
|
||||
if let Some(hash) = hashes.first() {
|
||||
if let Some(size) = self.eth68_meta.get(hash) {
|
||||
if *size >= POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE {
|
||||
if *size >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE {
|
||||
return hashes.split_off(1)
|
||||
}
|
||||
}
|
||||
@@ -227,7 +228,7 @@ impl TransactionFetcher {
|
||||
size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"),
|
||||
acc_size_response=acc_size_response,
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE=
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
"no space for hash in `GetPooledTransactions` request to peer"
|
||||
);
|
||||
|
||||
@@ -483,9 +484,12 @@ impl TransactionFetcher {
|
||||
/// limit. It does so by taking buffered eth68 hashes for which peer is listed as fallback
|
||||
/// peer. A mutable reference to a list of hashes to request is passed as parameter.
|
||||
///
|
||||
/// If a single transaction exceeds the soft limit, it's fetched in its own request. Otherwise
|
||||
/// the following applies.
|
||||
///
|
||||
/// Loops through buffered hashes and does:
|
||||
///
|
||||
/// 1. Check acc size against limit, if so stop looping.
|
||||
/// 1. Check if acc size exceeds limit or if hashes count exceeds limit, if so stop looping.
|
||||
/// 2. Check if this buffered hash is an eth68 hash, else skip to next iteration.
|
||||
/// 3. Check if hash can be included with respect to size metadata and acc size copy.
|
||||
/// 4. Check if peer is fallback peer for hash and remove, else skip to next iteration.
|
||||
@@ -497,7 +501,7 @@ impl TransactionFetcher {
|
||||
peer_id: PeerId,
|
||||
acc_size_response: &mut usize,
|
||||
) {
|
||||
if *acc_size_response >= POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE / 2 {
|
||||
if *acc_size_response >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE / 2 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -530,12 +534,12 @@ impl TransactionFetcher {
|
||||
let mut next_acc_size = *acc_size_response;
|
||||
|
||||
// 1. Check acc size against limit, if so stop looping.
|
||||
if next_acc_size >= 2 * POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE / 3 {
|
||||
if next_acc_size >= 2 * SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE / 3 {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
acc_size_eth68_response=acc_size_response, // no change acc size
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE=
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
"request to peer full"
|
||||
);
|
||||
|
||||
@@ -578,7 +582,7 @@ impl TransactionFetcher {
|
||||
hash=%hash,
|
||||
acc_size_eth68_response=acc_size_response,
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE=
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
"found buffered hash for request to peer"
|
||||
);
|
||||
}
|
||||
@@ -640,7 +644,7 @@ impl TransactionFetcher {
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%hash,
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE=
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
"found buffered hash for request to peer"
|
||||
);
|
||||
}
|
||||
@@ -830,14 +834,15 @@ mod test {
|
||||
B256::from_slice(&[6; 32]),
|
||||
];
|
||||
let eth68_hashes_sizes = [
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE - 4,
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE, // this one will not fit
|
||||
2, // this one will fit
|
||||
3, // but now this one won't
|
||||
2, /* this one will, no more txns
|
||||
* will
|
||||
* fit
|
||||
* after this */
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE - 4,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, // this one will not fit
|
||||
2, // this one will fit
|
||||
3, // but now this one won't
|
||||
2, /* this one will, no more
|
||||
* txns
|
||||
* will
|
||||
* fit
|
||||
* after this */
|
||||
1,
|
||||
];
|
||||
|
||||
|
||||
@@ -75,13 +75,16 @@ pub use validation::*;
|
||||
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
|
||||
|
||||
/// Soft limit for NewPooledTransactions
|
||||
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
|
||||
const SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET: usize = 4096;
|
||||
|
||||
/// Soft limit for the response size of a GetPooledTransactions message (2MB) in bytes. Standard
|
||||
/// maximum response size. See specs
|
||||
/// Soft limit for the message of full transactions in bytes.
|
||||
const SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE: usize = 128 * 1024;
|
||||
|
||||
/// Soft limit for the response size of a [`GetPooledTransactions`] message (128 KiB) in bytes.
|
||||
/// Standard maximum response size is 2 MiB. See specs
|
||||
///
|
||||
/// <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages>.
|
||||
const POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE: usize = 2 * 1024 * 1024;
|
||||
const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE: usize = 128 * 1024;
|
||||
|
||||
/// The future for inserting a function into the pool
|
||||
pub type PoolImportFuture =
|
||||
@@ -301,7 +304,7 @@ where
|
||||
let transactions = self.pool.get_pooled_transaction_elements(
|
||||
request.0,
|
||||
GetPooledTransactionLimit::ResponseSizeSoftLimit(
|
||||
POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE,
|
||||
),
|
||||
);
|
||||
|
||||
@@ -399,7 +402,9 @@ where
|
||||
if peer_idx > max_num_full || full_transactions.is_empty() {
|
||||
// enforce tx soft limit per message for the (unlikely) event the number of
|
||||
// hashes exceeds it
|
||||
new_pooled_hashes.truncate(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT);
|
||||
new_pooled_hashes.truncate(
|
||||
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET,
|
||||
);
|
||||
|
||||
for hash in new_pooled_hashes.iter_hashes().copied() {
|
||||
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
|
||||
@@ -922,8 +927,9 @@ where
|
||||
|
||||
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
|
||||
|
||||
let pooled_txs =
|
||||
self.pool.pooled_transactions_max(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT);
|
||||
let pooled_txs = self.pool.pooled_transactions_max(
|
||||
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET,
|
||||
);
|
||||
if pooled_txs.is_empty() {
|
||||
// do not send a message if there are no transactions in the pool
|
||||
return
|
||||
@@ -1215,7 +1221,7 @@ impl PropagateTransaction {
|
||||
}
|
||||
|
||||
/// Helper type for constructing the full transaction message that enforces the
|
||||
/// [`POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE`].
|
||||
/// [`SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE`].
|
||||
#[derive(Default)]
|
||||
struct FullTransactionsBuilder {
|
||||
total_size: usize,
|
||||
@@ -1225,10 +1231,15 @@ struct FullTransactionsBuilder {
|
||||
// === impl FullTransactionsBuilder ===
|
||||
|
||||
impl FullTransactionsBuilder {
|
||||
/// Append a transaction to the list if it doesn't exceed the maximum target size.
|
||||
/// Append a transaction to the list if the total message bytes size doesn't exceed the soft
|
||||
/// maximum target byte size. The limit is soft, meaning if one single transaction goes over
|
||||
/// the limit, it will be broadcasted in its own [`Transactions`] message. The same pattern is
|
||||
/// followed in filling a [`GetPooledTransactions`] request in
|
||||
/// [`TransactionFetcher::fill_eth68_request_for_peer`].
|
||||
fn push(&mut self, transaction: &PropagateTransaction) {
|
||||
let new_size = self.total_size + transaction.size;
|
||||
if new_size > POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE {
|
||||
if new_size > SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE && self.total_size > 0
|
||||
{
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user