mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
perf: improve initial pooled tx exchange (#1615)
This commit is contained in:
@@ -19,7 +19,8 @@ use reth_primitives::{
|
||||
};
|
||||
use reth_rlp::Encodable;
|
||||
use reth_transaction_pool::{
|
||||
error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool,
|
||||
error::PoolResult, PoolTransaction, PropagateKind, PropagatedTransactions, TransactionPool,
|
||||
ValidPoolTransaction,
|
||||
};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
@@ -37,7 +38,7 @@ use tracing::trace;
|
||||
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
|
||||
|
||||
/// Soft limit for NewPooledTransactions
|
||||
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMT: usize = 4096;
|
||||
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
|
||||
|
||||
/// The future for inserting a function into the pool
|
||||
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
|
||||
@@ -340,7 +341,7 @@ where
|
||||
self.peers.remove(&peer_id);
|
||||
}
|
||||
NetworkEvent::SessionEstablished { peer_id, messages, version, .. } => {
|
||||
// insert a new peer
|
||||
// insert a new peer into the peerset
|
||||
self.peers.insert(
|
||||
peer_id,
|
||||
Peer {
|
||||
@@ -352,31 +353,26 @@ where
|
||||
},
|
||||
);
|
||||
|
||||
// Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the
|
||||
// Send a `NewPooledTransactionHashes` to the peer with up to
|
||||
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
|
||||
// pool
|
||||
if !self.network.is_syncing() {
|
||||
let mut hashes = PooledTransactionsHashesBuilder::new(version);
|
||||
let to_propogate = self.pool.pooled_transactions().into_iter().map(|tx| {
|
||||
let tx = Arc::new(tx.transaction.to_recovered_transaction().into_signed());
|
||||
PropagateTransaction::new(tx)
|
||||
});
|
||||
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");
|
||||
|
||||
for tx in to_propogate.take(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMT) {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
|
||||
|
||||
if peer.transactions.insert(tx.hash()) {
|
||||
hashes.push(&tx);
|
||||
}
|
||||
for pooled_tx in self
|
||||
.pool
|
||||
.pooled_transactions()
|
||||
.into_iter()
|
||||
.take(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT)
|
||||
{
|
||||
peer.transactions.insert(*pooled_tx.hash());
|
||||
msg_builder.push_pooled(pooled_tx);
|
||||
}
|
||||
|
||||
let hashes = hashes.build();
|
||||
|
||||
self.network.send_transactions_hashes(peer_id, hashes);
|
||||
// let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions());
|
||||
// self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
|
||||
// peer_id,
|
||||
// msg,
|
||||
// })
|
||||
let msg = msg_builder.build();
|
||||
self.network.send_transactions_hashes(peer_id, msg);
|
||||
}
|
||||
}
|
||||
// TODO Add remaining events
|
||||
@@ -536,7 +532,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A transaction that's about to be propagated
|
||||
/// A transaction that's about to be propagated to multiple peers.
|
||||
struct PropagateTransaction {
|
||||
tx_type: u8,
|
||||
length: usize,
|
||||
@@ -565,6 +561,18 @@ enum PooledTransactionsHashesBuilder {
|
||||
// === impl PooledTransactionsHashesBuilder ===
|
||||
|
||||
impl PooledTransactionsHashesBuilder {
|
||||
/// Push a transaction from the pool to the list.
|
||||
fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
|
||||
match self {
|
||||
PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
|
||||
PooledTransactionsHashesBuilder::Eth68(msg) => {
|
||||
msg.hashes.push(*pooled_tx.hash());
|
||||
msg.sizes.push(pooled_tx.encoded_length);
|
||||
msg.types.push(pooled_tx.transaction.tx_type());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, tx: &PropagateTransaction) {
|
||||
match self {
|
||||
PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(tx.hash()),
|
||||
|
||||
@@ -86,13 +86,12 @@ pub use crate::{
|
||||
BestTransactions, OnNewBlockEvent, PoolTransaction, PropagateKind, PropagatedTransactions,
|
||||
TransactionOrigin, TransactionPool,
|
||||
},
|
||||
validate::{TransactionValidationOutcome, TransactionValidator},
|
||||
validate::{TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction},
|
||||
};
|
||||
use crate::{
|
||||
error::PoolResult,
|
||||
pool::PoolInner,
|
||||
traits::{NewTransactionEvent, PoolSize},
|
||||
validate::ValidPoolTransaction,
|
||||
};
|
||||
use reth_primitives::{TxHash, U256};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
@@ -106,6 +106,11 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
|
||||
self.transaction.hash()
|
||||
}
|
||||
|
||||
/// Returns the type identifier of the transaction
|
||||
pub fn tx_type(&self) -> u8 {
|
||||
self.transaction.tx_type()
|
||||
}
|
||||
|
||||
/// Returns the address of the sender
|
||||
pub fn sender(&self) -> Address {
|
||||
self.transaction.sender()
|
||||
|
||||
Reference in New Issue
Block a user