diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs
index 5e261eca7f..7a42a73a49 100644
--- a/crates/net/network/src/transactions/fetcher.rs
+++ b/crates/net/network/src/transactions/fetcher.rs
@@ -1,3 +1,30 @@
+//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
+//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
+//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
+//! is already seen in a previous announcement. The hashes that remain from an announcement are
+//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
+//! that don't fit into the request are buffered in the `TransactionFetcher`. If, on the other
+//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
+//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
+//! peer's session; this marks the peer as active with respect to
+//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
+//!
+//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
+//! pipeline, it is stored as a fallback peer for those hashes. When [`TransactionsManager`] is
+//! polled, it checks if any fallback peer is idle. If so, it packs a request for that peer,
+//! filling it from the buffered hashes. It does so until there are no more idle peers or until
+//! the hashes buffer is empty.
+//!
+//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
+//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. The same applies if the request
+//! resolves with partial success, that is, some of the requested hashes are not in the response;
+//! these are then buffered.
+//!
+//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
+//! protocol. This means it's unlikely that a valid hash will be buffered for very long
+//! before it's retried. Nonetheless, the capacity of the buffered hashes cache must be large
+//! enough to buffer many hashes during network failure, to allow for recovery.
+
 use crate::{
     cache::{LruCache, LruMap},
     message::PeerRequest,
diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs
index 5980182d6e..b65eb58c03 100644
--- a/crates/net/network/src/transactions/mod.rs
+++ b/crates/net/network/src/transactions/mod.rs
@@ -1,31 +1,4 @@
 //! Transactions management for the p2p network.
-//!
-//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
-//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
-//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
-//! is already seen in a previous announcement. The hashes that remain from an announcement are
-//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
-//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other
-//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
-//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
-//! peer's session, this marks the peer as active with respect to
-//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
-//!
-//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
-//! pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is
-//! polled, it checks if any of fallback peer is idle. If so, it packs a request for that peer,
-//! filling it from the buffered hashes. It does so until there are no more idle peers or until
-//! the hashes buffer is empty.
-//!
-//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
-//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
-//! resolves with partial success, that is some of the requested hashes are not in the response,
-//! these are then buffered.
-//!
-//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
-//! protocol. This means it's unlikely, that a valid hash, will be buffered for very long
-//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
-//! enough to buffer many hashes during network failure, to allow for recovery.

 use crate::{
     cache::LruCache,
@@ -1180,6 +1153,9 @@ struct TxManagerPollDurations {
 /// [`crate::NetworkManager`] for more context on the design pattern.
 ///
 /// This should be spawned or used as part of `tokio::select!`.
+//
+// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
+// `NetworkConfig::start_network`(reth_network::NetworkConfig)
 impl<Pool> Future for TransactionsManager<Pool>
 where
     Pool: TransactionPool + Unpin + 'static,
 {
@@ -1202,7 +1178,7 @@ where
             let acc = &mut poll_durations.acc_network_events;
             duration_metered_exec!(
                 {
-                    // advance network/peer related events
+                    // Advance network/peer related events (update peers map).
                     if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
                         this.on_network_event(event);
                         some_ready = true;
@@ -1214,6 +1190,10 @@
             let acc = &mut poll_durations.acc_pending_fetch;
             duration_metered_exec!(
                 {
+                    // Tries to drain the hashes pending fetch cache if the tx manager currently
+                    // has capacity for this (fetch txns).
+                    //
+                    // Sends at most one request.
                     if this.has_capacity_for_fetching_pending_hashes() {
                         // try drain transaction hashes pending fetch
                         let info = &this.pending_pool_imports_info;
@@ -1238,7 +1218,7 @@
             let acc = &mut poll_durations.acc_cmds;
             duration_metered_exec!(
                 {
-                    // advance commands
+                    // Advance commands (propagate/fetch/serve txns).
                     if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
                         this.on_command(cmd);
                         some_ready = true;
@@ -1250,7 +1230,20 @@
             let acc = &mut poll_durations.acc_tx_events;
             duration_metered_exec!(
                 {
-                    // advance incoming transaction events
+                    // Advance incoming transaction events (stream new txns/announcements from
+                    // the network manager and queue for import to pool/fetch txns).
+                    //
+                    // This will potentially remove hashes from hashes pending fetch, if the
+                    // event is an announcement (if the same hashes are announced that didn't
+                    // fit into a previous request).
+                    //
+                    // The smallest decodable transaction is an empty legacy transaction, 10 bytes
+                    // (128 KiB / 10 bytes > 13k transactions).
+                    //
+                    // If this is a `Transactions` broadcast event, since transactions aren't
+                    // validated until they are inserted into the pool, this can potentially queue
+                    // >13k transactions for insertion to pool. More if the message size is bigger
+                    // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
                     if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
                         this.on_network_tx_event(event);
                         some_ready = true;
@@ -1264,7 +1257,16 @@ where
                 {
                     this.update_fetch_metrics();

-                    // advance fetching transaction events
+                    // Advance fetching transaction events (flush transaction fetcher and queue for
+                    // import to pool).
+                    //
+                    // The smallest decodable transaction is an empty legacy transaction, 10 bytes
+                    // (2 MiB / 10 bytes > 200k transactions).
+                    //
+                    // Since transactions aren't validated until they are inserted into the pool,
+                    // this can potentially queue >200k transactions for insertion to pool. More
+                    // if the message size is bigger than the soft limit on a `PooledTransactions`
+                    // response, which is 2 MiB.
                     if let Poll::Ready(Some(fetch_event)) =
                         this.transaction_fetcher.poll_next_unpin(cx)
                     {
@@ -1295,7 +1297,20 @@ where
             let acc = &mut poll_durations.acc_pending_imports;
             duration_metered_exec!(
                 {
-                    // Advance all imports
+                    // Advance pool imports (flush txns to pool).
+                    //
+                    // Note that this is done in batches. A batch is filled from one `Transactions`
+                    // broadcast message or one `PooledTransactions` response at a time. The
+                    // minimum batch size is 1 transaction (which may often be the case with blob
+                    // transactions).
+                    //
+                    // The smallest decodable transaction is an empty legacy transaction, 10 bytes
+                    // (2 MiB / 10 bytes > 200k transactions).
+                    //
+                    // Since transactions aren't validated until they are inserted into the pool,
+                    // this can potentially validate >200k transactions. More if the message size
+                    // is bigger than the soft limit on a `PooledTransactions` response, which is
+                    // 2 MiB (the limit for `Transactions` broadcast messages is smaller, 128 KiB).
                     if let Poll::Ready(Some(batch_import_res)) =
                         this.pool_imports.poll_next_unpin(cx)
                     {
@@ -1329,9 +1344,11 @@ where
             let acc = &mut poll_durations.acc_imported_txns;
             duration_metered_exec!(
                 {
-                    // drain new __pending__ transactions handle and propagate transactions.
-                    // we drain this to batch the transactions in a single message.
-                    // we don't expect this buffer to be large, since only pending transactions are
+                    // Drain new __pending__ transactions handle and propagate transactions.
+                    //
+                    // We drain this to batch the transactions in a single message.
+                    //
+                    // We don't expect this buffer to be large, since only pending transactions are
                     // emitted here.
                     let mut new_txs = Vec::new();
                     while let Poll::Ready(Some(hash)) =
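For reference, the buffering and retry flow described in the module docs moved above can be summarized with a minimal, self-contained sketch. The names below (`Fetcher`, `HashBuffer`, `MAX_HASHES_PER_REQUEST`, `MAX_RETRIES`) are illustrative stand-ins only, not reth's actual `TransactionFetcher` API; the real implementation additionally tracks per-peer concurrency (`MAX_CONCURRENT_TX_REQUESTS_PER_PEER`) and packs requests with respect to the announcement's `EthVersion`.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type TxHash = [u8; 32];
type PeerId = u64;

/// Stand-in for the per-request packing limit (the real limit depends on the announcement).
const MAX_HASHES_PER_REQUEST: usize = 256;
/// Stand-in for `MAX_REQUEST_RETRIES_PER_TX_HASH`.
const MAX_RETRIES: u8 = 2;

/// Hashes that did not fit into a request (or whose request failed), plus their fallback peers.
#[derive(Default)]
struct HashBuffer {
    /// hash -> (retry count, peers that have announced it)
    entries: HashMap<TxHash, (u8, HashSet<PeerId>)>,
    /// FIFO order in which buffered hashes are drained into new requests.
    order: VecDeque<TxHash>,
}

#[derive(Default)]
struct Fetcher {
    /// Hashes whose transaction is already known (e.g. in the pool).
    known_txs: HashSet<TxHash>,
    /// Hashes seen in a previous announcement that are currently being fetched.
    inflight: HashSet<TxHash>,
    buffered: HashBuffer,
}

impl Fetcher {
    /// Handles an announcement: filters known/in-flight hashes, packs one request up to the
    /// size limit, buffers the overflow and tops the request up from earlier buffered hashes.
    fn on_announcement(&mut self, peer: PeerId, announced: Vec<TxHash>) -> Vec<TxHash> {
        let mut request = Vec::new();
        for hash in announced {
            // 1) tx already known, or 2) hash already seen and in flight: nothing to do
            if self.known_txs.contains(&hash) || self.inflight.contains(&hash) {
                continue
            }
            if request.len() < MAX_HASHES_PER_REQUEST {
                self.inflight.insert(hash);
                request.push(hash);
            } else {
                // doesn't fit: buffer it and remember this peer as a fallback peer
                self.buffer(hash, peer);
            }
        }
        // space remains: fill the request with hashes this peer announced earlier
        while request.len() < MAX_HASHES_PER_REQUEST {
            let Some(hash) = self.take_buffered_for(peer) else { break };
            self.inflight.insert(hash);
            request.push(hash);
        }
        request
    }

    /// Re-buffers hashes from a failed (or partially successful) request, honoring the retry cap.
    fn on_unfetched(&mut self, peer: PeerId, unfetched: Vec<TxHash>) {
        for hash in unfetched {
            self.inflight.remove(&hash);
            let retries = self.buffered.entries.get(&hash).map(|(r, _)| *r).unwrap_or(0);
            if retries < MAX_RETRIES {
                self.buffer(hash, peer);
                self.buffered.entries.get_mut(&hash).expect("just buffered").0 = retries + 1;
            }
        }
    }

    fn buffer(&mut self, hash: TxHash, peer: PeerId) {
        if !self.buffered.entries.contains_key(&hash) {
            self.buffered.order.push_back(hash);
            self.buffered.entries.insert(hash, (0, HashSet::new()));
        }
        self.buffered.entries.get_mut(&hash).expect("just inserted").1.insert(peer);
    }

    /// Pops the first buffered hash for which `peer` is a fallback peer.
    fn take_buffered_for(&mut self, peer: PeerId) -> Option<TxHash> {
        let pos = self.buffered.order.iter().position(|hash| {
            self.buffered.entries.get(hash).is_some_and(|(_, peers)| peers.contains(&peer))
        })?;
        let hash = self.buffered.order.remove(pos)?;
        self.buffered.entries.remove(&hash);
        Some(hash)
    }
}
```

The design point mirrored here is that handling an announcement never blocks: whatever does not fit into the single outgoing request is parked together with its fallback peers and drained later, when an idle fallback peer or a new announcement provides the opportunity.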