Meter tx manager poll duration (#6688)

Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Emilia Hane
2024-02-23 00:19:51 +01:00
committed by GitHub
parent e03ab418b0
commit 9f91c6ad94
3 changed files with 325 additions and 175 deletions

174
Cargo.lock generated
View File

@@ -88,9 +88,9 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.8.8"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff"
checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f"
dependencies = [
"cfg-if",
"getrandom 0.2.12",
@@ -260,7 +260,7 @@ checksum = "1a047897373be4bbb0224c1afdabca92648dc57a9c9ef6e7b0be3aff7a859c83"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -321,7 +321,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"syn-solidity",
"tiny-keccak",
]
@@ -390,9 +390,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "0.6.11"
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -438,9 +438,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.79"
version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1"
[[package]]
name = "aquamarine"
@@ -453,7 +453,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -670,7 +670,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -681,7 +681,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -724,7 +724,7 @@ checksum = "823b8bb275161044e2ac7a25879cb3e2480cb403e3943022c7c769c599b756aa"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -735,11 +735,11 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backon"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
checksum = "c79c8ef183b8b663e8cb19cf92fb7d98c56739977bd47eae2de2717bd5de2c2c"
dependencies = [
"fastrand 1.9.0",
"fastrand 2.0.1",
"futures-core",
"pin-project",
"tokio",
@@ -849,7 +849,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -869,7 +869,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -1073,7 +1073,7 @@ checksum = "005fa0c5bd20805466dda55eb34cd709bb31a2592bb26927b47714eeed6914d8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"synstructure",
]
@@ -1161,9 +1161,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.15.0"
version = "3.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d32a994c2b3ca201d9b263612a374263f05e7adde37c4707f693dcd375076d1f"
checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130"
[[package]]
name = "byte-slice-cast"
@@ -1232,7 +1232,7 @@ checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037"
dependencies = [
"camino",
"cargo-platform",
"semver 1.0.21",
"semver 1.0.22",
"serde",
"serde_json",
"thiserror",
@@ -1252,11 +1252,10 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.83"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730"
dependencies = [
"jobserver",
"libc",
]
@@ -1378,7 +1377,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -1412,7 +1411,7 @@ dependencies = [
"quote",
"serde",
"similar-asserts",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -1813,7 +1812,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -1920,7 +1919,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -1953,7 +1952,7 @@ checksum = "c5a91391accf613803c2a9bf9abccdbaa07c54b4244a5b64883f9c3c137c86be"
dependencies = [
"darling_core 0.20.6",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2039,7 +2038,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2196,7 +2195,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2398,7 +2397,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2411,7 +2410,7 @@ dependencies = [
"num-traits",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2422,7 +2421,7 @@ checksum = "6fd000fd6988e73bbe993ea3db9b1aa64906ab88766d654973924340c8cddb42"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2570,7 +2569,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"syn 2.0.49",
"syn 2.0.50",
"toml",
"walkdir",
]
@@ -2588,7 +2587,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -2614,7 +2613,7 @@ dependencies = [
"serde",
"serde_json",
"strum 0.25.0",
"syn 2.0.49",
"syn 2.0.50",
"tempfile",
"thiserror",
"tiny-keccak",
@@ -2953,7 +2952,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -3836,15 +3835,6 @@ dependencies = [
"libc",
]
[[package]]
name = "jobserver"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.68"
@@ -4314,7 +4304,7 @@ checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -4644,7 +4634,7 @@ dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -4656,7 +4646,7 @@ dependencies = [
"proc-macro-crate 3.1.0",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -4980,7 +4970,7 @@ dependencies = [
"phf_shared",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -5009,7 +4999,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -5204,7 +5194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5"
dependencies = [
"proc-macro2",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -6225,7 +6215,7 @@ dependencies = [
"quote",
"regex",
"serial_test",
"syn 2.0.49",
"syn 2.0.50",
"trybuild",
]
@@ -7278,7 +7268,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver 1.0.21",
"semver 1.0.22",
]
[[package]]
@@ -7357,9 +7347,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "ryu-js"
@@ -7543,9 +7533,9 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.21"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca"
dependencies = [
"serde",
]
@@ -7573,9 +7563,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
[[package]]
name = "serde"
version = "1.0.196"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2"
dependencies = [
"serde_derive",
]
@@ -7591,20 +7581,20 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.196"
version = "1.0.197"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
name = "serde_json"
version = "1.0.113"
version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79"
checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0"
dependencies = [
"indexmap 2.2.3",
"itoa",
@@ -7671,7 +7661,7 @@ dependencies = [
"darling 0.20.6",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -7696,7 +7686,7 @@ checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8025,7 +8015,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8038,7 +8028,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8115,9 +8105,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.49"
version = "2.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "915aea9e586f80826ee59f8453c1101f9d1c4b3964cd2460185ee8e299ada496"
checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb"
dependencies = [
"proc-macro2",
"quote",
@@ -8133,7 +8123,7 @@ dependencies = [
"paste",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8150,7 +8140,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8242,7 +8232,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8281,14 +8271,14 @@ checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
name = "thread_local"
version = "1.1.7"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
@@ -8408,7 +8398,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8525,7 +8515,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.1",
"winnow 0.6.2",
]
[[package]]
@@ -8635,7 +8625,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -8920,9 +8910,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
dependencies = [
"tinyvec",
]
@@ -9115,7 +9105,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"wasm-bindgen-shared",
]
@@ -9149,7 +9139,7 @@ checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -9388,9 +9378,9 @@ dependencies = [
[[package]]
name = "winnow"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d90f4e0f530c4c69f62b80d839e9ef3855edc9cba471a160c4d692deed62b401"
checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178"
dependencies = [
"memchr",
]
@@ -9489,7 +9479,7 @@ checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"synstructure",
]
@@ -9510,7 +9500,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -9530,7 +9520,7 @@ checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
"synstructure",
]
@@ -9551,7 +9541,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]
@@ -9574,7 +9564,7 @@ checksum = "7a4a1638a1934450809c2266a70362bfc96cd90550c073f5b8a55014d1010157"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.49",
"syn 2.0.50",
]
[[package]]

View File

@@ -109,6 +109,60 @@ pub struct TransactionsManagerMetrics {
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,
/* ================ POLL DURATION ================ */
/* -- Total poll duration of `TransactionsManager` future -- */
/// Duration in seconds of call to
/// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function.
///
/// Updating metrics could take time, so the true duration of this call could
/// be longer than the sum of the accumulated durations of polling nested streams.
pub(crate) duration_poll_tx_manager: Gauge,
/* -- Poll duration of items nested in `TransactionsManager` future -- */
/// Accumulated time spent streaming session updates and updating peers accordingly, in
/// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager)
/// future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_network_events: Gauge,
/// Accumulated time spent flushing the queue of batched pending pool imports into pool, in
/// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager)
/// future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_pending_pool_imports: Gauge,
/// Accumulated time spent streaming transaction and announcement broadcast, queueing for
/// pool import or requesting respectively, in one call to poll the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_transaction_events: Gauge,
/// Accumulated time spent streaming fetch events, queueing for pool import on successful
/// fetch, in one call to poll the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_fetch_events: Gauge,
/// Accumulated time spent streaming and propagating transactions that were successfully
/// imported into the pool, in one call to poll the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_imported_transactions: Gauge,
/// Accumulated time spent assembling and sending requests for hashes fetching pending, in
/// one call to poll the [`TransactionsManager`](crate::transactions::TransactionsManager)
/// future.
///
/// Duration in seconds.
pub(crate) acc_duration_fetch_pending_hashes: Gauge,
/// Accumulated time spent streaming commands and propagating, fetching and serving
/// transactions accordingly, in one call to poll the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) future.
///
/// Duration in seconds.
pub(crate) acc_duration_poll_commands: Gauge,
}
/// Metrics for Disconnection types

View File

@@ -63,6 +63,7 @@ use std::{
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
@@ -316,6 +317,32 @@ where
self.metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
}
#[inline]
fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
let metrics = &self.metrics;
let TxManagerPollDurations {
acc_network_events,
acc_pending_imports,
acc_tx_events,
acc_imported_txns,
acc_fetch_events,
acc_pending_fetch,
acc_cmds,
} = poll_durations;
// update metrics for whole poll function
metrics.duration_poll_tx_manager.set(start.elapsed());
// update poll metrics for nested streams
metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
}
/// Request handler for an incoming request for transactions
fn on_get_pooled_transactions(
&mut self,
@@ -1083,6 +1110,29 @@ where
}
}
/// Measures the duration of executing the given code block. The duration is added to the given
/// accumulator value passed as a mutable reference.
macro_rules! duration_metered_exec {
($code:block, $acc:ident) => {
let start = Instant::now();
$code;
*$acc += start.elapsed();
};
}
#[derive(Debug, Default)]
struct TxManagerPollDurations {
acc_network_events: Duration,
acc_pending_imports: Duration,
acc_tx_events: Duration,
acc_imported_txns: Duration,
acc_fetch_events: Duration,
acc_pending_fetch: Duration,
acc_cmds: Duration,
}
/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
@@ -1094,6 +1144,9 @@ where
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let start = Instant::now();
let mut poll_durations = TxManagerPollDurations::default();
let this = self.get_mut();
// If the budget is exhausted we manually yield back control to tokio. See
@@ -1103,113 +1156,166 @@ where
loop {
let mut some_ready = false;
// drain network/peer related events
if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
this.on_network_event(event);
some_ready = true;
}
let acc = &mut poll_durations.acc_network_events;
duration_metered_exec!(
{
// advance network/peer related events
if let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
this.on_network_event(event);
some_ready = true;
}
},
acc
);
if this.has_capacity_for_fetching_pending_hashes() {
// try drain buffered transactions.
let info = &this.pending_pool_imports_info;
let max_pending_pool_imports = info.max_pending_pool_imports;
let has_capacity_wrt_pending_pool_imports =
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
let acc = &mut poll_durations.acc_pending_fetch;
duration_metered_exec!(
{
if this.has_capacity_for_fetching_pending_hashes() {
// try drain transaction hashes pending fetch
let info = &this.pending_pool_imports_info;
let max_pending_pool_imports = info.max_pending_pool_imports;
let has_capacity_wrt_pending_pool_imports =
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
let metrics = &this.metrics;
let metrics_increment_egress_peer_channel_full =
|| metrics.egress_peer_channel_full.increment(1);
let metrics = &this.metrics;
let metrics_increment_egress_peer_channel_full =
|| metrics.egress_peer_channel_full.increment(1);
this.transaction_fetcher.on_fetch_pending_hashes(
&this.peers,
has_capacity_wrt_pending_pool_imports,
metrics_increment_egress_peer_channel_full,
);
}
// drain commands
if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
this.on_command(cmd);
some_ready = true;
}
// drain incoming transaction events
if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
this.on_network_tx_event(event);
some_ready = true;
}
this.update_fetch_metrics();
// drain fetching transaction events
if let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
this.import_transactions(
peer_id,
transactions,
TransactionSource::Response,
this.transaction_fetcher.on_fetch_pending_hashes(
&this.peers,
has_capacity_wrt_pending_pool_imports,
metrics_increment_egress_peer_channel_full,
);
}
FetchEvent::FetchError { peer_id, error } => {
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
this.on_request_error(peer_id, error);
},
acc
);
let acc = &mut poll_durations.acc_cmds;
duration_metered_exec!(
{
// advance commands
if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
this.on_command(cmd);
some_ready = true;
}
}
some_ready = true;
}
},
acc
);
this.update_fetch_metrics();
let acc = &mut poll_durations.acc_tx_events;
duration_metered_exec!(
{
// advance incoming transaction events
if let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
this.on_network_tx_event(event);
some_ready = true;
}
},
acc
);
// Advance all imports
if let Poll::Ready(Some(batch_import_res)) = this.pool_imports.poll_next_unpin(cx) {
for res in batch_import_res {
match res {
Ok(hash) => {
this.on_good_import(hash);
}
Err(err) => {
// if we're _currently_ syncing and the transaction is bad we
// ignore it, otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
debug!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
let acc = &mut poll_durations.acc_fetch_events;
duration_metered_exec!(
{
this.update_fetch_metrics();
// advance fetching transaction events
if let Poll::Ready(Some(fetch_event)) =
this.transaction_fetcher.poll_next_unpin(cx)
{
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
this.import_transactions(
peer_id,
transactions,
TransactionSource::Response,
);
}
FetchEvent::FetchError { peer_id, error } => {
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
this.on_request_error(peer_id, error);
}
this.on_good_import(err.hash);
}
some_ready = true;
}
}
some_ready = true;
}
this.update_fetch_metrics();
},
acc
);
// handle and propagate new transactions.
//
// higher priority! stream is drained
//
let mut new_txs = Vec::new();
while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {
new_txs.push(hash);
}
if !new_txs.is_empty() {
this.on_new_transactions(new_txs);
}
let acc = &mut poll_durations.acc_pending_imports;
duration_metered_exec!(
{
// Advance all imports
if let Poll::Ready(Some(batch_import_res)) =
this.pool_imports.poll_next_unpin(cx)
{
for res in batch_import_res {
match res {
Ok(hash) => {
this.on_good_import(hash);
}
Err(err) => {
// if we're _currently_ syncing and the transaction is bad we
// ignore it, otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
debug!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
}
this.on_good_import(err.hash);
}
}
}
some_ready = true;
}
},
acc
);
let acc = &mut poll_durations.acc_imported_txns;
duration_metered_exec!(
{
// drain successful pool insertions, handle and propagate transactions.
//
// higher priority! stream is drained
//
let mut new_txs = Vec::new();
while let Poll::Ready(Some(hash)) =
this.pending_transactions.poll_next_unpin(cx)
{
new_txs.push(hash);
}
if !new_txs.is_empty() {
this.on_new_transactions(new_txs);
}
},
acc
);
// all channels are fully drained and import futures pending
if !some_ready {
return Poll::Pending
break
}
budget -= 1;
if budget <= 0 {
// Make sure we're woken up again
cx.waker().wake_by_ref();
return Poll::Pending
break
}
}
this.update_poll_metrics(start, poll_durations);
Poll::Pending
}
}