mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
Introduce async budget in transaction manager (#6295)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
@@ -738,8 +738,8 @@ where
|
||||
|
||||
// fill the request with other buffered hashes that have been announced by the peer
|
||||
let Some(peer) = self.peers.get(&peer_id) else { return };
|
||||
|
||||
let Some(hash) = hashes.first() else { return };
|
||||
|
||||
let mut eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied();
|
||||
if let Some(ref mut size) = eth68_size {
|
||||
self.transaction_fetcher.fill_eth68_request_for_peer(&mut hashes, peer_id, size);
|
||||
@@ -1070,7 +1070,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// An endless future.
|
||||
/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
|
||||
/// [`crate::NetworkManager`] for more context on the design pattern.
|
||||
///
|
||||
/// This should be spawned or used as part of `tokio::select!`.
|
||||
impl<Pool> Future for TransactionsManager<Pool>
|
||||
@@ -1082,85 +1083,113 @@ where
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// drain network/peer related events
|
||||
while let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
|
||||
this.on_network_event(event);
|
||||
}
|
||||
// If the budget is exhausted we manually yield back control to tokio. See
|
||||
// `NetworkManager` for more context on the design pattern.
|
||||
let mut budget = 1024;
|
||||
|
||||
// drain commands
|
||||
while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
|
||||
this.on_command(cmd);
|
||||
}
|
||||
loop {
|
||||
let mut some_ready = false;
|
||||
|
||||
// drain incoming transaction events
|
||||
while let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
|
||||
this.on_network_tx_event(event);
|
||||
}
|
||||
|
||||
this.update_request_metrics();
|
||||
|
||||
// drain fetching transaction events
|
||||
while let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) {
|
||||
match fetch_event {
|
||||
FetchEvent::TransactionsFetched { peer_id, transactions } => {
|
||||
this.import_transactions(peer_id, transactions, TransactionSource::Response);
|
||||
}
|
||||
FetchEvent::FetchError { peer_id, error } => {
|
||||
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
|
||||
this.on_request_error(peer_id, error);
|
||||
}
|
||||
// drain network/peer related events
|
||||
if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
|
||||
this.on_network_event(event);
|
||||
some_ready = true;
|
||||
}
|
||||
}
|
||||
|
||||
// try drain buffered transactions
|
||||
this.request_buffered_hashes();
|
||||
// drain commands
|
||||
if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
|
||||
this.on_command(cmd);
|
||||
some_ready = true;
|
||||
}
|
||||
|
||||
this.update_request_metrics();
|
||||
// drain incoming transaction events
|
||||
if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
|
||||
this.on_network_tx_event(event);
|
||||
some_ready = true;
|
||||
}
|
||||
|
||||
// Advance all imports
|
||||
while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) {
|
||||
let import_res = match import_res {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
debug!(target: "net::tx", ?err, "bad pool transaction batch import");
|
||||
continue
|
||||
this.update_request_metrics();
|
||||
|
||||
// drain fetching transaction events
|
||||
if let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) {
|
||||
match fetch_event {
|
||||
FetchEvent::TransactionsFetched { peer_id, transactions } => {
|
||||
this.import_transactions(
|
||||
peer_id,
|
||||
transactions,
|
||||
TransactionSource::Response,
|
||||
);
|
||||
}
|
||||
FetchEvent::FetchError { peer_id, error } => {
|
||||
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
|
||||
this.on_request_error(peer_id, error);
|
||||
}
|
||||
}
|
||||
};
|
||||
some_ready = true;
|
||||
}
|
||||
|
||||
for res in import_res {
|
||||
match res {
|
||||
Ok(hash) => {
|
||||
this.on_good_import(hash);
|
||||
// try drain buffered transactions
|
||||
this.request_buffered_hashes();
|
||||
this.update_request_metrics();
|
||||
|
||||
// Advance all imports
|
||||
if let Poll::Ready(Some(batch_import_res)) = this.pool_imports.poll_next_unpin(cx) {
|
||||
match batch_import_res {
|
||||
Ok(single_import_results) => {
|
||||
for res in single_import_results {
|
||||
match res {
|
||||
Ok(hash) => {
|
||||
this.on_good_import(hash);
|
||||
}
|
||||
Err(err) => {
|
||||
// if we're _currently_ syncing and the transaction is bad we
|
||||
// ignore it, otherwise we penalize the peer that sent the bad
|
||||
// transaction with the assumption that the peer should have
|
||||
// known that this transaction is bad. (e.g. consensus
|
||||
// rules)
|
||||
if err.is_bad_transaction() && !this.network.is_syncing() {
|
||||
debug!(target: "net::tx", ?err, "bad pool transaction import");
|
||||
this.on_bad_import(err.hash);
|
||||
continue
|
||||
}
|
||||
this.on_good_import(err.hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// if we're _currently_ syncing and the transaction is bad we ignore it,
|
||||
// otherwise we penalize the peer that sent the bad
|
||||
// transaction with the assumption that the peer should have
|
||||
// known that this transaction is bad. (e.g. consensus
|
||||
// rules)
|
||||
if err.is_bad_transaction() && !this.network.is_syncing() {
|
||||
debug!(target: "net::tx", ?err, "bad pool transaction import");
|
||||
this.on_bad_import(err.hash);
|
||||
continue
|
||||
}
|
||||
this.on_good_import(err.hash);
|
||||
debug!(target: "net::tx", ?err, "bad pool transaction batch import");
|
||||
}
|
||||
}
|
||||
|
||||
some_ready = true;
|
||||
}
|
||||
|
||||
// handle and propagate new transactions.
|
||||
//
|
||||
// higher priority! stream is drained
|
||||
//
|
||||
let mut new_txs = Vec::new();
|
||||
while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {
|
||||
new_txs.push(hash);
|
||||
some_ready = true;
|
||||
}
|
||||
if !new_txs.is_empty() {
|
||||
this.on_new_transactions(new_txs);
|
||||
}
|
||||
|
||||
// all channels are fully drained and import futures pending
|
||||
if !some_ready {
|
||||
return Poll::Pending
|
||||
}
|
||||
|
||||
budget -= 1;
|
||||
if budget <= 0 {
|
||||
// Make sure we're woken up again
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
// handle and propagate new transactions
|
||||
let mut new_txs = Vec::new();
|
||||
while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {
|
||||
new_txs.push(hash);
|
||||
}
|
||||
if !new_txs.is_empty() {
|
||||
this.on_new_transactions(new_txs);
|
||||
}
|
||||
|
||||
// all channels are fully drained and import futures pending
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user