mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
Make limit on tx packet size soft in tx fetcher (#6201)
This commit is contained in:
@@ -16,7 +16,7 @@ use std::{
|
||||
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use super::{Peer, PooledTransactions, MAX_FULL_TRANSACTIONS_PACKET_SIZE};
|
||||
use super::{Peer, PooledTransactions, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT};
|
||||
|
||||
/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
|
||||
pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1;
|
||||
@@ -175,8 +175,9 @@ impl TransactionFetcher {
|
||||
if let Some(size) = self.eth68_meta.peek(&hash) {
|
||||
let next_acc_size = *acc_size_response + size;
|
||||
|
||||
if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
|
||||
// only update accumulated size of tx response if tx will fit in
|
||||
if next_acc_size <= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
// only update accumulated size of tx response if tx will fit in without exceeding
|
||||
// soft limit
|
||||
*acc_size_response = next_acc_size;
|
||||
return true
|
||||
}
|
||||
@@ -202,6 +203,14 @@ impl TransactionFetcher {
|
||||
hashes: &mut Vec<TxHash>,
|
||||
peer_id: PeerId,
|
||||
) -> Vec<TxHash> {
|
||||
if let Some(hash) = hashes.first() {
|
||||
if let Some(size) = self.eth68_meta.get(hash) {
|
||||
if *size >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
return hashes.split_off(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut acc_size_response = 0;
|
||||
let mut surplus_hashes = vec![];
|
||||
|
||||
@@ -213,7 +222,7 @@ impl TransactionFetcher {
|
||||
hash=%hash,
|
||||
size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"),
|
||||
acc_size_response=acc_size_response,
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
|
||||
"no space for hash in `GetPooledTransactions` request to peer"
|
||||
);
|
||||
|
||||
@@ -224,7 +233,7 @@ impl TransactionFetcher {
|
||||
|
||||
// all hashes included in request and there is still space
|
||||
// todo: compare free space with min tx size
|
||||
if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE {
|
||||
if acc_size_response < FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
self.fill_eth68_request_for_peer(hashes, peer_id, &mut acc_size_response);
|
||||
}
|
||||
|
||||
@@ -489,6 +498,10 @@ impl TransactionFetcher {
|
||||
peer_id: PeerId,
|
||||
acc_size_response: &mut usize,
|
||||
) {
|
||||
if *acc_size_response >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
return
|
||||
}
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let mut acc_size = 0;
|
||||
@@ -509,11 +522,11 @@ impl TransactionFetcher {
|
||||
let mut next_acc_size = *acc_size_response;
|
||||
|
||||
// 1. Check acc size against limit, if so stop looping.
|
||||
if next_acc_size >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
|
||||
if next_acc_size >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
acc_size_eth68_response=acc_size_response, // no change acc size
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
|
||||
"request to peer full"
|
||||
);
|
||||
|
||||
@@ -555,7 +568,7 @@ impl TransactionFetcher {
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%hash,
|
||||
acc_size_eth68_response=acc_size_response,
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
|
||||
"found buffered hash for request to peer"
|
||||
);
|
||||
}
|
||||
@@ -616,7 +629,7 @@ impl TransactionFetcher {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%hash,
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
|
||||
"found buffered hash for request to peer"
|
||||
);
|
||||
}
|
||||
@@ -805,12 +818,12 @@ mod test {
|
||||
B256::from_slice(&[6; 32]),
|
||||
];
|
||||
let eth68_hashes_sizes = [
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE - 4,
|
||||
MAX_FULL_TRANSACTIONS_PACKET_SIZE, // this one will not fit
|
||||
2, // this one will fit
|
||||
3, // but now this one won't
|
||||
2, /* this one will, no more txns will fit
|
||||
* after this */
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT - 4,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT, // this one will not fit
|
||||
2, // this one will fit
|
||||
3, // but now this one won't
|
||||
2, /* this one will, no more txns will fit
|
||||
* after this */
|
||||
1,
|
||||
];
|
||||
|
||||
|
||||
@@ -74,8 +74,8 @@ const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
|
||||
/// Soft limit for NewPooledTransactions
|
||||
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
|
||||
|
||||
/// The target size for the message of full transactions in bytes.
|
||||
const MAX_FULL_TRANSACTIONS_PACKET_SIZE: usize = 100 * 1024;
|
||||
/// Soft limit for the message of full transactions in bytes.
|
||||
const FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT: usize = 100 * 1024;
|
||||
|
||||
/// Softlimit for the response size of a GetPooledTransactions message (2MB)
|
||||
const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
|
||||
@@ -1112,7 +1112,7 @@ impl PropagateTransaction {
|
||||
}
|
||||
|
||||
/// Helper type for constructing the full transaction message that enforces the
|
||||
/// `MAX_FULL_TRANSACTIONS_PACKET_SIZE`
|
||||
/// `FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT`
|
||||
#[derive(Default)]
|
||||
struct FullTransactionsBuilder {
|
||||
total_size: usize,
|
||||
@@ -1122,10 +1122,10 @@ struct FullTransactionsBuilder {
|
||||
// === impl FullTransactionsBuilder ===
|
||||
|
||||
impl FullTransactionsBuilder {
|
||||
/// Append a transaction to the list if it doesn't exceed the maximum size.
|
||||
/// Append a transaction to the list if it doesn't exceed the maximum target size.
|
||||
fn push(&mut self, transaction: &PropagateTransaction) {
|
||||
let new_size = self.total_size + transaction.size;
|
||||
if new_size > MAX_FULL_TRANSACTIONS_PACKET_SIZE {
|
||||
if new_size > FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1789,8 +1789,10 @@ mod tests {
|
||||
let peer_id = PeerId::new([1; 64]);
|
||||
let eth_version = EthVersion::Eth68;
|
||||
let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
|
||||
let unseen_eth68_hashes_sizes =
|
||||
[MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4];
|
||||
let unseen_eth68_hashes_sizes = [
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT / 2,
|
||||
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT / 2 - 4,
|
||||
];
|
||||
// hashes and sizes to buffer in reverse order so that seen_eth68_hashes[0] and
|
||||
// seen_eth68_hashes_sizes[0] are lru
|
||||
let seen_eth68_hashes =
|
||||
@@ -1826,7 +1828,7 @@ mod tests {
|
||||
let mut backups = default_cache();
|
||||
backups.insert(peer_id_other);
|
||||
tx_fetcher.unknown_hashes.insert(hash_other, (0, backups));
|
||||
tx_fetcher.eth68_meta.insert(hash_other, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 2); // a big tx
|
||||
tx_fetcher.eth68_meta.insert(hash_other, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT - 2); // a big tx
|
||||
tx_fetcher.buffered_hashes.insert(hash_other);
|
||||
|
||||
let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version);
|
||||
|
||||
Reference in New Issue
Block a user