diff --git a/Cargo.lock b/Cargo.lock index aa6f45280d..be475917ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,9 +88,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff" +checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f" dependencies = [ "cfg-if", "getrandom 0.2.12", @@ -260,7 +260,7 @@ checksum = "1a047897373be4bbb0224c1afdabca92648dc57a9c9ef6e7b0be3aff7a859c83" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -321,7 +321,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "syn-solidity", "tiny-keccak", ] @@ -390,9 +390,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" dependencies = [ "anstyle", "anstyle-parse", @@ -438,9 +438,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" [[package]] name = "aquamarine" @@ -453,7 +453,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -670,7 +670,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -681,7 +681,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -724,7 +724,7 @@ checksum = "823b8bb275161044e2ac7a25879cb3e2480cb403e3943022c7c769c599b756aa" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -735,11 +735,11 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backon" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +checksum = "c79c8ef183b8b663e8cb19cf92fb7d98c56739977bd47eae2de2717bd5de2c2c" dependencies = [ - "fastrand 1.9.0", + "fastrand 2.0.1", "futures-core", "pin-project", "tokio", @@ -849,7 +849,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -869,7 +869,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -1073,7 +1073,7 @@ checksum = "005fa0c5bd20805466dda55eb34cd709bb31a2592bb26927b47714eeed6914d8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "synstructure", ] @@ -1161,9 +1161,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.15.0" +version = "3.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32a994c2b3ca201d9b263612a374263f05e7adde37c4707f693dcd375076d1f" +checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130" [[package]] name = "byte-slice-cast" @@ -1232,7 +1232,7 @@ checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037" dependencies = [ "camino", "cargo-platform", - "semver 1.0.21", + "semver 1.0.22", "serde", "serde_json", "thiserror", @@ -1252,11 +1252,10 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730" dependencies = [ - "jobserver", "libc", ] @@ -1378,7 +1377,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -1412,7 +1411,7 @@ dependencies = [ "quote", "serde", "similar-asserts", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -1813,7 +1812,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -1920,7 +1919,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -1953,7 +1952,7 @@ checksum = "c5a91391accf613803c2a9bf9abccdbaa07c54b4244a5b64883f9c3c137c86be" dependencies = [ "darling_core 0.20.6", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2039,7 +2038,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2196,7 +2195,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2398,7 +2397,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2411,7 +2410,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2422,7 +2421,7 @@ checksum = "6fd000fd6988e73bbe993ea3db9b1aa64906ab88766d654973924340c8cddb42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2570,7 +2569,7 @@ dependencies = [ "regex", "serde", "serde_json", - "syn 2.0.49", + "syn 2.0.50", "toml", "walkdir", ] @@ -2588,7 +2587,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -2614,7 +2613,7 @@ dependencies = [ "serde", "serde_json", "strum 0.25.0", - "syn 2.0.49", + "syn 2.0.50", "tempfile", "thiserror", "tiny-keccak", @@ -2953,7 +2952,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -3836,15 +3835,6 @@ dependencies = [ "libc", ] -[[package]] -name = "jobserver" -version = "0.1.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" -dependencies = [ - "libc", -] - [[package]] name = "js-sys" version = "0.3.68" @@ -4314,7 +4304,7 @@ checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -4644,7 +4634,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -4656,7 +4646,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -4980,7 +4970,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -5009,7 +4999,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -5204,7 +5194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -6225,7 +6215,7 @@ dependencies = [ "quote", "regex", "serial_test", - "syn 2.0.49", + "syn 2.0.50", "trybuild", ] @@ -7278,7 +7268,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.21", + "semver 1.0.22", ] [[package]] @@ -7357,9 +7347,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "ryu-js" @@ -7543,9 +7533,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" dependencies = [ "serde", ] @@ -7573,9 +7563,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] @@ -7591,20 +7581,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "indexmap 2.2.3", "itoa", @@ -7671,7 +7661,7 @@ dependencies = [ "darling 0.20.6", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -7696,7 +7686,7 @@ checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8025,7 +8015,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8038,7 +8028,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8115,9 +8105,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.49" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915aea9e586f80826ee59f8453c1101f9d1c4b3964cd2460185ee8e299ada496" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", @@ -8133,7 +8123,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8150,7 +8140,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8242,7 +8232,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8281,14 +8271,14 @@ checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", @@ -8408,7 +8398,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8525,7 +8515,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.1", + "winnow 0.6.2", ] [[package]] @@ -8635,7 +8625,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -8920,9 +8910,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ "tinyvec", ] @@ -9115,7 +9105,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "wasm-bindgen-shared", ] @@ -9149,7 +9139,7 @@ checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9388,9 +9378,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d90f4e0f530c4c69f62b80d839e9ef3855edc9cba471a160c4d692deed62b401" +checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178" dependencies = [ "memchr", ] @@ -9489,7 +9479,7 @@ checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "synstructure", ] @@ -9510,7 +9500,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -9530,7 +9520,7 @@ checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", "synstructure", ] @@ -9551,7 +9541,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] @@ -9574,7 +9564,7 @@ checksum = "7a4a1638a1934450809c2266a70362bfc96cd90550c073f5b8a55014d1010157" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.50", ] [[package]] diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index 880a302de3..fe3936d894 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -109,6 +109,60 @@ pub struct TransactionsManagerMetrics { pub(crate) egress_peer_channel_full: Counter, /// Total number of hashes pending fetch. pub(crate) hashes_pending_fetch: Gauge, + + /* ================ POLL DURATION ================ */ + + /* -- Total poll duration of `TransactionsManager` future -- */ + /// Duration in seconds of call to + /// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function. + /// + /// Updating metrics could take time, so the true duration of this call could + /// be longer than the sum of the accumulated durations of polling nested streams. + pub(crate) duration_poll_tx_manager: Gauge, + + /* -- Poll duration of items nested in `TransactionsManager` future -- */ + /// Accumulated time spent streaming session updates and updating peers accordingly, in + /// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager) + /// future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_network_events: Gauge, + /// Accumulated time spent flushing the queue of batched pending pool imports into pool, in + /// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager) + /// future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_pending_pool_imports: Gauge, + /// Accumulated time spent streaming transaction and announcement broadcast, queueing for + /// pool import or requesting respectively, in one call to poll the + /// [`TransactionsManager`](crate::transactions::TransactionsManager) future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_transaction_events: Gauge, + /// Accumulated time spent streaming fetch events, queueing for pool import on successful + /// fetch, in one call to poll the + /// [`TransactionsManager`](crate::transactions::TransactionsManager) future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_fetch_events: Gauge, + /// Accumulated time spent streaming and propagating transactions that were successfully + /// imported into the pool, in one call to poll the + /// [`TransactionsManager`](crate::transactions::TransactionsManager) future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_imported_transactions: Gauge, + /// Accumulated time spent assembling and sending requests for hashes fetching pending, in + /// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager) + /// future. + /// + /// Duration in seconds. + pub(crate) acc_duration_fetch_pending_hashes: Gauge, + /// Accumulated time spent streaming commands and propagating, fetching and serving + /// transactions accordingly, in one call to poll the + /// [`TransactionsManager`](crate::transactions::TransactionsManager) future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_commands: Gauge, } /// Metrics for Disconnection types diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 489f53ce8c..65727c2357 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -63,6 +63,7 @@ use std::{ Arc, }, task::{Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; @@ -316,6 +317,32 @@ where self.metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch); } + #[inline] + fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) { + let metrics = &self.metrics; + + let TxManagerPollDurations { + acc_network_events, + acc_pending_imports, + acc_tx_events, + acc_imported_txns, + acc_fetch_events, + acc_pending_fetch, + acc_cmds, + } = poll_durations; + + // update metrics for whole poll function + metrics.duration_poll_tx_manager.set(start.elapsed()); + // update poll metrics for nested streams + metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64()); + metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64()); + metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64()); + metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64()); + metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64()); + metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64()); + metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64()); + } + /// Request handler for an incoming request for transactions fn on_get_pooled_transactions( &mut self, @@ -1083,6 +1110,29 @@ where } } +/// Measures the duration of executing the given code block. The duration is added to the given +/// accumulator value passed as a mutable reference. +macro_rules! duration_metered_exec { + ($code:block, $acc:ident) => { + let start = Instant::now(); + + $code; + + *$acc += start.elapsed(); + }; +} + +#[derive(Debug, Default)] +struct TxManagerPollDurations { + acc_network_events: Duration, + acc_pending_imports: Duration, + acc_tx_events: Duration, + acc_imported_txns: Duration, + acc_fetch_events: Duration, + acc_pending_fetch: Duration, + acc_cmds: Duration, +} + /// An endless future. Preemption ensure that future is non-blocking, nonetheless. See /// [`crate::NetworkManager`] for more context on the design pattern. /// @@ -1094,6 +1144,9 @@ where type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let start = Instant::now(); + let mut poll_durations = TxManagerPollDurations::default(); + let this = self.get_mut(); // If the budget is exhausted we manually yield back control to tokio. See @@ -1103,113 +1156,166 @@ where loop { let mut some_ready = false; - // drain network/peer related events - if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) { - this.on_network_event(event); - some_ready = true; - } + let acc = &mut poll_durations.acc_network_events; + duration_metered_exec!( + { + // advance network/peer related events + if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) { + this.on_network_event(event); + some_ready = true; + } + }, + acc + ); - if this.has_capacity_for_fetching_pending_hashes() { - // try drain buffered transactions. - let info = &this.pending_pool_imports_info; - let max_pending_pool_imports = info.max_pending_pool_imports; - let has_capacity_wrt_pending_pool_imports = - |divisor| info.has_capacity(max_pending_pool_imports / divisor); + let acc = &mut poll_durations.acc_pending_fetch; + duration_metered_exec!( + { + if this.has_capacity_for_fetching_pending_hashes() { + // try drain transaction hashes pending fetch + let info = &this.pending_pool_imports_info; + let max_pending_pool_imports = info.max_pending_pool_imports; + let has_capacity_wrt_pending_pool_imports = + |divisor| info.has_capacity(max_pending_pool_imports / divisor); - let metrics = &this.metrics; - let metrics_increment_egress_peer_channel_full = - || metrics.egress_peer_channel_full.increment(1); + let metrics = &this.metrics; + let metrics_increment_egress_peer_channel_full = + || metrics.egress_peer_channel_full.increment(1); - this.transaction_fetcher.on_fetch_pending_hashes( - &this.peers, - has_capacity_wrt_pending_pool_imports, - metrics_increment_egress_peer_channel_full, - ); - } - // drain commands - if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { - this.on_command(cmd); - some_ready = true; - } - - // drain incoming transaction events - if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) { - this.on_network_tx_event(event); - some_ready = true; - } - - this.update_fetch_metrics(); - - // drain fetching transaction events - if let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) { - match fetch_event { - FetchEvent::TransactionsFetched { peer_id, transactions } => { - this.import_transactions( - peer_id, - transactions, - TransactionSource::Response, + this.transaction_fetcher.on_fetch_pending_hashes( + &this.peers, + has_capacity_wrt_pending_pool_imports, + metrics_increment_egress_peer_channel_full, ); } - FetchEvent::FetchError { peer_id, error } => { - trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed"); - this.on_request_error(peer_id, error); + }, + acc + ); + + let acc = &mut poll_durations.acc_cmds; + duration_metered_exec!( + { + // advance commands + if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { + this.on_command(cmd); + some_ready = true; } - } - some_ready = true; - } + }, + acc + ); - this.update_fetch_metrics(); + let acc = &mut poll_durations.acc_tx_events; + duration_metered_exec!( + { + // advance incoming transaction events + if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) { + this.on_network_tx_event(event); + some_ready = true; + } + }, + acc + ); - // Advance all imports - if let Poll::Ready(Some(batch_import_res)) = this.pool_imports.poll_next_unpin(cx) { - for res in batch_import_res { - match res { - Ok(hash) => { - this.on_good_import(hash); - } - Err(err) => { - // if we're _currently_ syncing and the transaction is bad we - // ignore it, otherwise we penalize the peer that sent the bad - // transaction with the assumption that the peer should have - // known that this transaction is bad. (e.g. consensus - // rules) - if err.is_bad_transaction() && !this.network.is_syncing() { - debug!(target: "net::tx", ?err, "bad pool transaction import"); - this.on_bad_import(err.hash); - continue + let acc = &mut poll_durations.acc_fetch_events; + duration_metered_exec!( + { + this.update_fetch_metrics(); + + // advance fetching transaction events + if let Poll::Ready(Some(fetch_event)) = + this.transaction_fetcher.poll_next_unpin(cx) + { + match fetch_event { + FetchEvent::TransactionsFetched { peer_id, transactions } => { + this.import_transactions( + peer_id, + transactions, + TransactionSource::Response, + ); + } + FetchEvent::FetchError { peer_id, error } => { + trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed"); + this.on_request_error(peer_id, error); } - this.on_good_import(err.hash); } + some_ready = true; } - } - some_ready = true; - } + this.update_fetch_metrics(); + }, + acc + ); - // handle and propagate new transactions. - // - // higher priority! stream is drained - // - let mut new_txs = Vec::new(); - while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) { - new_txs.push(hash); - } - if !new_txs.is_empty() { - this.on_new_transactions(new_txs); - } + let acc = &mut poll_durations.acc_pending_imports; + duration_metered_exec!( + { + // Advance all imports + if let Poll::Ready(Some(batch_import_res)) = + this.pool_imports.poll_next_unpin(cx) + { + for res in batch_import_res { + match res { + Ok(hash) => { + this.on_good_import(hash); + } + Err(err) => { + // if we're _currently_ syncing and the transaction is bad we + // ignore it, otherwise we penalize the peer that sent the bad + // transaction with the assumption that the peer should have + // known that this transaction is bad. (e.g. consensus + // rules) + if err.is_bad_transaction() && !this.network.is_syncing() { + debug!(target: "net::tx", ?err, "bad pool transaction import"); + this.on_bad_import(err.hash); + continue + } + this.on_good_import(err.hash); + } + } + } + + some_ready = true; + } + }, + acc + ); + + let acc = &mut poll_durations.acc_imported_txns; + duration_metered_exec!( + { + // drain successful pool insertions, handle and propagate transactions. + // + // higher priority! stream is drained + // + let mut new_txs = Vec::new(); + while let Poll::Ready(Some(hash)) = + this.pending_transactions.poll_next_unpin(cx) + { + new_txs.push(hash); + } + if !new_txs.is_empty() { + this.on_new_transactions(new_txs); + } + }, + acc + ); // all channels are fully drained and import futures pending if !some_ready { - return Poll::Pending + break } budget -= 1; if budget <= 0 { // Make sure we're woken up again cx.waker().wake_by_ref(); - return Poll::Pending + break } } + + this.update_poll_metrics(start, poll_durations); + + Poll::Pending } }