From 02f3427dae29d66ba1eac49265f7b450ee613982 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 26 Nov 2024 23:25:42 +0100 Subject: [PATCH] feat: introduce networkprimitives in transition fetcher (#12889) --- .../net/network/src/transactions/fetcher.rs | 43 ++++++++++--------- crates/net/network/src/transactions/mod.rs | 12 +++--- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 0833f67740..180a619fff 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -45,6 +45,7 @@ use reth_eth_wire::{ DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, PartiallyValidData, RequestTxHashes, ValidAnnouncementData, }; +use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; @@ -68,7 +69,7 @@ use validation::FilterOutcome; /// new requests on announced hashes. #[derive(Debug)] #[pin_project] -pub struct TransactionFetcher { +pub struct TransactionFetcher { /// All peers with to which a [`GetPooledTransactions`] request is inflight. pub active_peers: LruMap, /// All currently active [`GetPooledTransactions`] requests. @@ -77,7 +78,7 @@ pub struct TransactionFetcher { /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to /// be fetched. #[pin] - pub inflight_requests: FuturesUnordered, + pub inflight_requests: FuturesUnordered>, /// Hashes that are awaiting an idle fallback peer so they can be fetched. /// /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for @@ -93,9 +94,7 @@ pub struct TransactionFetcher { metrics: TransactionFetcherMetrics, } -// === impl TransactionFetcher === - -impl TransactionFetcher { +impl TransactionFetcher { /// Removes the peer from the active set. pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) { self.active_peers.remove(peer_id); @@ -429,7 +428,7 @@ impl TransactionFetcher { /// the request by checking the transactions seen by the peer against the buffer. pub fn on_fetch_pending_hashes( &mut self, - peers: &HashMap, + peers: &HashMap>, has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, ) { let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info); @@ -632,7 +631,7 @@ impl TransactionFetcher { pub fn request_transactions_from_peer( &mut self, new_announced_hashes: RequestTxHashes, - peer: &PeerMetadata, + peer: &PeerMetadata, ) -> Option { let peer_id: PeerId = peer.request_tx.peer_id; let conn_eth_version = peer.version; @@ -896,7 +895,9 @@ impl TransactionFetcher { approx_capacity_get_pooled_transactions_req_eth66() } } +} +impl TransactionFetcher { /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a /// [`FetchEvent`], which will then be streamed by /// [`TransactionsManager`](super::TransactionsManager). @@ -1044,7 +1045,7 @@ impl Stream for TransactionFetcher { } } -impl Default for TransactionFetcher { +impl Default for TransactionFetcher { fn default() -> Self { Self { active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS), @@ -1091,13 +1092,13 @@ impl TxFetchMetadata { /// Represents possible events from fetching transactions. #[derive(Debug)] -pub enum FetchEvent { +pub enum FetchEvent { /// Triggered when transactions are successfully fetched. TransactionsFetched { /// The ID of the peer from which transactions were fetched. peer_id: PeerId, /// The transactions that were fetched, if available. - transactions: PooledTransactions, + transactions: PooledTransactions, }, /// Triggered when there is an error in fetching transactions. FetchError { @@ -1115,22 +1116,22 @@ pub enum FetchEvent { /// An inflight request for [`PooledTransactions`] from a peer. #[derive(Debug)] -pub struct GetPooledTxRequest { +pub struct GetPooledTxRequest { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes requested_hashes: RequestTxHashes, - response: oneshot::Receiver>, + response: oneshot::Receiver>>, } /// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a /// [`GetPooledTxResponse`]. #[derive(Debug)] -pub struct GetPooledTxResponse { +pub struct GetPooledTxResponse { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a /// subset of requested hashes. requested_hashes: RequestTxHashes, - result: Result, RecvError>, + result: Result>, RecvError>, } /// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's @@ -1138,24 +1139,24 @@ pub struct GetPooledTxResponse { #[must_use = "futures do nothing unless polled"] #[pin_project::pin_project] #[derive(Debug)] -pub struct GetPooledTxRequestFut { +pub struct GetPooledTxRequestFut { #[pin] - inner: Option, + inner: Option>, } -impl GetPooledTxRequestFut { +impl GetPooledTxRequestFut { #[inline] const fn new( peer_id: PeerId, requested_hashes: RequestTxHashes, - response: oneshot::Receiver>, + response: oneshot::Receiver>>, ) -> Self { Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } } } -impl Future for GetPooledTxRequestFut { - type Output = GetPooledTxResponse; +impl Future for GetPooledTxRequestFut { + type Output = GetPooledTxResponse; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut req = self.as_mut().project().inner.take().expect("polled after completion"); @@ -1372,7 +1373,7 @@ mod test { // RIG TEST - let tx_fetcher = &mut TransactionFetcher::default(); + let tx_fetcher = &mut TransactionFetcher::::default(); let eth68_hashes = [ B256::from_slice(&[1; 32]), diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index ff76a6d292..d533aee102 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -212,7 +212,7 @@ pub struct TransactionsManager>>, /// Transaction fetcher to handle inflight and missing transaction requests. - transaction_fetcher: TransactionFetcher, + transaction_fetcher: TransactionFetcher, /// All currently pending transactions grouped by peers. /// /// This way we can track incoming transactions and prevent multiple pool imports for the same @@ -235,7 +235,7 @@ pub struct TransactionsManager, /// All the connected peers. - peers: HashMap, + peers: HashMap>, /// Send half for the command channel. /// /// This is kept so that a new [`TransactionsHandle`] can be created at any time. @@ -1731,23 +1731,23 @@ impl TransactionSource { /// Tracks a single peer in the context of [`TransactionsManager`]. #[derive(Debug)] -pub struct PeerMetadata { +pub struct PeerMetadata { /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in /// the sense that transactions are preemptively marked as seen by peer when they are sent to /// the peer. seen_transactions: LruCache, /// A communication channel directly to the peer's session task. - request_tx: PeerRequestSender, + request_tx: PeerRequestSender>, /// negotiated version of the session. version: EthVersion, /// The peer's client version. client_version: Arc, } -impl PeerMetadata { +impl PeerMetadata { /// Returns a new instance of [`PeerMetadata`]. fn new( - request_tx: PeerRequestSender, + request_tx: PeerRequestSender>, version: EthVersion, client_version: Arc, max_transactions_seen_by_peer: u32,