diff --git a/Cargo.lock b/Cargo.lock index d191766a5b..8614146260 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,7 +568,7 @@ checksum = "ce8849c74c9ca0f5a03da1c865e3eb6f768df816e67dd3721a398a8a7e398011" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -818,7 +818,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -835,7 +835,7 @@ dependencies = [ "proc-macro2", "quote", "sha3", - "syn 2.0.115", + "syn 2.0.116", "syn-solidity", ] @@ -851,7 +851,7 @@ dependencies = [ "macro-string", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "syn-solidity", ] @@ -983,7 +983,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1077,7 +1077,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1219,7 +1219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62945a2f7e6de02a31fe400aa489f0e0f5b2502e69f95f853adb82a96c7a6b60" dependencies = [ "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1257,7 +1257,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1346,7 +1346,7 @@ checksum = "213888f660fddcca0d257e88e54ac05bca01885f258ccdf695bafd77031bb69d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1462,7 +1462,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1473,7 +1473,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1511,7 +1511,7 @@ checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1647,7 +1647,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1854,7 +1854,7 @@ dependencies = [ "cow-utils", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "synstructure", ] @@ -1910,7 +1910,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -1993,7 +1993,7 @@ checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2240,7 +2240,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2787,7 +2787,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2844,7 +2844,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2859,7 +2859,7 @@ dependencies = [ "quote", "serde", "strsim", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2872,7 +2872,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2883,7 +2883,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core 0.20.11", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2894,7 +2894,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core 0.21.3", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2905,7 +2905,7 @@ checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" dependencies = [ "darling_core 0.23.0", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -2947,7 +2947,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3006,7 +3006,7 @@ checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3017,7 +3017,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3038,7 +3038,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3048,7 +3048,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3070,7 +3070,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.1", - "syn 2.0.115", + "syn 2.0.116", "unicode-xid", ] @@ -3196,7 +3196,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3243,7 +3243,7 @@ checksum = "1ec431cd708430d5029356535259c5d645d60edd3d39c54e5eea9782d46caa7d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3295,7 +3295,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3402,7 +3402,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3422,7 +3422,7 @@ checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3442,7 +3442,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3524,7 +3524,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -3536,7 +3536,7 @@ dependencies = [ "darling 0.23.0", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -4031,7 +4031,7 @@ checksum = "6dc7a9cb3326bafb80642c5ce99b39a2c0702d4bfa8ee8a3e773791a6cbe2407" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -4104,9 +4104,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", @@ -4119,9 +4119,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", "futures-sink", @@ -4142,15 +4142,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] name = "futures-executor" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" dependencies = [ "futures-core", "futures-task", @@ -4159,9 +4159,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" [[package]] name = "futures-lite" @@ -4193,26 +4193,26 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] name = "futures-sink" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" [[package]] name = "futures-task" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-timer" @@ -4226,9 +4226,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-channel", "futures-core", @@ -4238,7 +4238,6 @@ dependencies = [ "futures-task", "memchr", "pin-project-lite", - "pin-utils", "slab", ] @@ -4954,7 +4953,7 @@ checksum = "a0eb5a3343abf848c0984fe4604b2b105da9539376e24fc0a3b0007411ae4fd9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5073,7 +5072,7 @@ dependencies = [ "indoc", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5350,7 +5349,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5757,7 +5756,7 @@ checksum = "1b27834086c65ec3f9387b096d66e99f221cf081c2b738042aa252bcd41204e3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5781,7 +5780,7 @@ checksum = "757aee279b8bdbb9f9e676796fd459e4207a1f986e87886700abf589f5abf771" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5801,9 +5800,9 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memmap2" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" dependencies = [ "libc", ] @@ -5835,7 +5834,7 @@ checksum = "161ab904c2c62e7bda0f7562bf22f96440ca35ff79e66c800cbac298f2f4f5ec" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -5968,7 +5967,7 @@ checksum = "59b43b4fd69e3437618106f7754f34021b831a514f9e1a98ae863cabcd8d8dad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6224,7 +6223,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6606,7 +6605,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6721,7 +6720,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6750,7 +6749,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6891,7 +6890,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -6942,7 +6941,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -7015,7 +7014,7 @@ checksum = "fb6dc647500e84a25a85b100e76c85b8ace114c209432dc174f20aac11d4ed6c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -7038,7 +7037,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -7433,7 +7432,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -7889,7 +7888,7 @@ dependencies = [ "proc-macro2", "quote", "similar-asserts", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -10351,9 +10350,7 @@ dependencies = [ name = "reth-tasks" version = "1.11.0" dependencies = [ - "auto_impl", "crossbeam-utils", - "dyn-clone", "futures-util", "metrics", "pin-project", @@ -10991,7 +10988,7 @@ dependencies = [ "regex", "relative-path", "rustc_version 0.4.1", - "syn 2.0.115", + "syn 2.0.116", "unicode-ident", ] @@ -11431,7 +11428,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -11508,7 +11505,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -11821,7 +11818,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -11843,9 +11840,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.115" +version = "2.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e614ed320ac28113fa64972c4262d5dbc89deacdfd00c34a3e4cea073243c12" +checksum = "3df424c70518695237746f84cede799c9c58fcb37450d7b23716568cc8bc69cb" dependencies = [ "proc-macro2", "quote", @@ -11861,7 +11858,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -11881,14 +11878,14 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] name = "sysinfo" -version = "0.38.1" +version = "0.38.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5792d209c2eac902426c0c4a166c9f72147db453af548cf9bf3242644c4d4fe3" +checksum = "1efc19935b4b66baa6f654ac7924c192f55b175c00a7ab72410fc24284dacda8" dependencies = [ "libc", "memchr", @@ -11958,7 +11955,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -11969,7 +11966,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "test-case-core", ] @@ -12009,7 +12006,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12057,7 +12054,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12068,7 +12065,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12215,7 +12212,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12466,7 +12463,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12660,7 +12657,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -12953,7 +12950,7 @@ checksum = "d674d135b4a8c1d7e813e2f8d1c9a58308aee4a680323066025e53132218bd91" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -13066,7 +13063,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "wasm-bindgen-shared", ] @@ -13286,7 +13283,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -13297,7 +13294,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -13689,7 +13686,7 @@ dependencies = [ "heck", "indexmap 2.13.0", "prettyplease", - "syn 2.0.115", + "syn 2.0.116", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -13705,7 +13702,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -13828,7 +13825,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "synstructure", ] @@ -13849,7 +13846,7 @@ checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -13869,7 +13866,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", "synstructure", ] @@ -13890,7 +13887,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] @@ -13924,7 +13921,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.116", ] [[package]] diff --git a/crates/cli/commands/src/import_core.rs b/crates/cli/commands/src/import_core.rs index 37e0cf0868..2e9ce23774 100644 --- a/crates/cli/commands/src/import_core.rs +++ b/crates/cli/commands/src/import_core.rs @@ -139,7 +139,7 @@ where total_decoded_blocks += file_client.headers_len(); total_decoded_txns += file_client.total_transactions(); - let (mut pipeline, events) = build_import_pipeline_impl( + let (mut pipeline, events, _runtime) = build_import_pipeline_impl( config, provider_factory.clone(), &consensus, @@ -265,7 +265,11 @@ pub fn build_import_pipeline_impl( static_file_producer: StaticFileProducer>, disable_exec: bool, evm_config: E, -) -> eyre::Result<(Pipeline, impl futures::Stream> + use)> +) -> eyre::Result<( + Pipeline, + impl futures::Stream> + use, + reth_tasks::Runtime, +)> where N: ProviderNodeTypes, C: FullConsensus + 'static, @@ -281,9 +285,12 @@ where .sealed_header(last_block_number)? .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?; + let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current()) + .expect("failed to create runtime"); + let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) .build(file_client.clone(), consensus.clone()) - .into_task(); + .into_task_with(&runtime); // TODO: The pipeline should correctly configure the downloader on its own. // Find the possibility to remove unnecessary pre-configuration. header_downloader.update_local_head(local_head); @@ -291,7 +298,7 @@ where let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) .build(file_client.clone(), consensus.clone(), provider_factory.clone()) - .into_task(); + .into_task_with(&runtime); // TODO: The pipeline should correctly configure the downloader on its own. // Find the possibility to remove unnecessary pre-configuration. body_downloader @@ -326,5 +333,5 @@ where let events = pipeline.events().map(Into::into); - Ok((pipeline, events)) + Ok((pipeline, events, runtime)) } diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs index 53a5ac4f31..61513827d3 100644 --- a/crates/engine/tree/src/backfill.rs +++ b/crates/engine/tree/src/backfill.rs @@ -10,7 +10,7 @@ use futures::FutureExt; use reth_provider::providers::ProviderNodeTypes; use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use std::task::{ready, Context, Poll}; use tokio::sync::oneshot; use tracing::trace; @@ -80,7 +80,7 @@ pub enum BackfillEvent { #[derive(Debug)] pub struct PipelineSync { /// The type that can spawn the pipeline task. - pipeline_task_spawner: Box, + pipeline_task_spawner: Runtime, /// The current state of the pipeline. /// The pipeline is used for large ranges. pipeline_state: PipelineState, @@ -90,7 +90,7 @@ pub struct PipelineSync { impl PipelineSync { /// Create a new instance. - pub fn new(pipeline: Pipeline, pipeline_task_spawner: Box) -> Self { + pub fn new(pipeline: Pipeline, pipeline_task_spawner: Runtime) -> Self { Self { pipeline_task_spawner, pipeline_state: PipelineState::Idle(Some(Box::new(pipeline))), @@ -140,10 +140,10 @@ impl PipelineSync { let pipeline = pipeline.take().expect("exists"); self.pipeline_task_spawner.spawn_critical_blocking_task( "pipeline task", - Box::pin(async move { + async move { let result = pipeline.run_as_fut(Some(target)).await; let _ = tx.send(result); - }), + }, ); self.pipeline_state = PipelineState::Running(rx); @@ -241,7 +241,7 @@ mod tests { use reth_provider::test_utils::MockNodeTypesWithDB; use reth_stages::ExecOutput; use reth_stages_api::StageCheckpoint; - use reth_tasks::TokioTaskExecutor; + use reth_tasks::Runtime; use std::{collections::VecDeque, future::poll_fn, sync::Arc}; struct TestHarness { @@ -267,7 +267,7 @@ mod tests { })])) .build(chain_spec); - let pipeline_sync = PipelineSync::new(pipeline, Box::::default()); + let pipeline_sync = PipelineSync::new(pipeline, Runtime::test()); let client = TestFullBlockClient::default(); let header = Header { base_fee_per_gas: Some(7), diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index 3bfc458d9a..5d1523e017 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -25,7 +25,7 @@ use reth_provider::{ }; use reth_prune::PrunerWithFactory; use reth_stages_api::{MetricEventsSender, Pipeline}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use reth_trie_db::ChangesetCache; use std::sync::Arc; @@ -54,7 +54,7 @@ pub fn build_engine_orchestrator( client: Client, incoming_requests: S, pipeline: Pipeline, - pipeline_task_spawner: Box, + pipeline_task_spawner: Runtime, provider: ProviderFactory, blockchain_db: BlockchainProvider, pruner: PrunerWithFactory>, diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index 04dd28f8b3..b18060e045 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -285,7 +285,7 @@ where Arc::new(ctx.node.consensus().clone()), ctx.node.evm_config().clone(), ctx.config.rpc.flashbots_config(), - Box::new(ctx.node.task_executor().clone()), + ctx.node.task_executor().clone(), Arc::new(EthereumEngineValidator::new(ctx.config.chain.clone())), ); diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 1592d351f5..297af8da46 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -16,7 +16,7 @@ use reth_network_p2p::{ }; use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader}; use reth_storage_api::HeaderProvider; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use std::{ cmp::Ordering, collections::BinaryHeap, @@ -285,17 +285,9 @@ where C: BodiesClient + 'static, Provider: HeaderProvider
+ Unpin + 'static, { - /// Spawns the downloader task via [`tokio::task::spawn`] - pub fn into_task(self) -> TaskDownloader { - self.into_task_with(&TokioTaskExecutor::default()) - } - - /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner. - pub fn into_task_with(self, spawner: &S) -> TaskDownloader - where - S: TaskSpawner, - { - TaskDownloader::spawn_with(self, spawner) + /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given [`Runtime`]. + pub fn into_task_with(self, runtime: &Runtime) -> TaskDownloader { + TaskDownloader::spawn_with(self, runtime) } } diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 763fac1812..48a3cd8d02 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -1,13 +1,13 @@ use alloy_primitives::BlockNumber; use futures::Stream; -use futures_util::{FutureExt, StreamExt}; +use futures_util::StreamExt; use pin_project::pin_project; use reth_network_p2p::{ bodies::downloader::{BodyDownloader, BodyDownloaderResult}, error::DownloadResult, }; use reth_primitives_traits::Block; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use std::{ fmt::Debug, future::Future, @@ -32,50 +32,11 @@ pub struct TaskDownloader { } impl TaskDownloader { - /// Spawns the given `downloader` via [`tokio::task::spawn`] returns a [`TaskDownloader`] that's - /// connected to that task. - /// - /// # Panics - /// - /// This method panics if called outside of a Tokio runtime - /// - /// # Example - /// - /// ``` - /// use reth_consensus::Consensus; - /// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader}; - /// use reth_network_p2p::bodies::client::BodiesClient; - /// use reth_primitives_traits::{Block, InMemorySize}; - /// use reth_storage_api::HeaderProvider; - /// use std::{fmt::Debug, sync::Arc}; - /// - /// fn t< - /// B: Block + 'static, - /// C: BodiesClient + 'static, - /// Provider: HeaderProvider
+ Unpin + 'static, - /// >( - /// client: Arc, - /// consensus: Arc>, - /// provider: Provider, - /// ) { - /// let downloader = - /// BodiesDownloaderBuilder::default().build::(client, consensus, provider); - /// let downloader = TaskDownloader::spawn(downloader); - /// } - /// ``` - pub fn spawn(downloader: T) -> Self - where - T: BodyDownloader + 'static, - { - Self::spawn_with(downloader, &TokioTaskExecutor::default()) - } - - /// Spawns the given `downloader` via the given [`TaskSpawner`] returns a [`TaskDownloader`] + /// Spawns the given `downloader` via the given [`Runtime`] and returns a [`TaskDownloader`] /// that's connected to that task. - pub fn spawn_with(downloader: T, spawner: &S) -> Self + pub fn spawn_with(downloader: T, runtime: &Runtime) -> Self where T: BodyDownloader + 'static, - S: TaskSpawner, { let (bodies_tx, bodies_rx) = mpsc::channel(BODIES_TASK_BUFFER_SIZE); let (to_downloader, updates_rx) = mpsc::unbounded_channel(); @@ -86,7 +47,7 @@ impl TaskDownloader { downloader, }; - spawner.spawn_task(downloader.boxed()); + runtime.spawn_task(downloader); Self { from_downloader: ReceiverStream::new(bodies_rx), to_downloader } } @@ -201,7 +162,8 @@ mod tests { Arc::new(TestConsensus::default()), factory, ); - let mut downloader = TaskDownloader::spawn(downloader); + let runtime = Runtime::test(); + let mut downloader = TaskDownloader::spawn_with(downloader, &runtime); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -224,7 +186,8 @@ mod tests { Arc::new(TestConsensus::default()), factory, ); - let mut downloader = TaskDownloader::spawn(downloader); + let runtime = Runtime::test(); + let mut downloader = TaskDownloader::spawn_with(downloader, &runtime); downloader.set_download_range(1..=0).expect("failed to set download range"); assert_matches!(downloader.next().await, Some(Err(DownloadError::InvalidBodyRange { .. }))); diff --git a/crates/net/downloaders/src/headers/reverse_headers.rs b/crates/net/downloaders/src/headers/reverse_headers.rs index 059b76834c..6a866f0ad9 100644 --- a/crates/net/downloaders/src/headers/reverse_headers.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -21,7 +21,7 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_primitives_traits::{GotExpected, SealedHeader}; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use std::{ cmp::{Ordering, Reverse}, collections::{binary_heap::PeekMut, BinaryHeap}, @@ -660,20 +660,12 @@ where H: HeadersClient, Self: HeaderDownloader + 'static, { - /// Spawns the downloader task via [`tokio::task::spawn`] - pub fn into_task(self) -> TaskDownloader<::Header> { - self.into_task_with(&TokioTaskExecutor::default()) - } - - /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`. - pub fn into_task_with( + /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given [`Runtime`]. + pub fn into_task_with( self, - spawner: &S, - ) -> TaskDownloader<::Header> - where - S: TaskSpawner, - { - TaskDownloader::spawn_with(self, spawner) + runtime: &Runtime, + ) -> TaskDownloader<::Header> { + TaskDownloader::spawn_with(self, runtime) } } diff --git a/crates/net/downloaders/src/headers/task.rs b/crates/net/downloaders/src/headers/task.rs index 83a2dc76b5..61d17d2fb6 100644 --- a/crates/net/downloaders/src/headers/task.rs +++ b/crates/net/downloaders/src/headers/task.rs @@ -1,5 +1,5 @@ use alloy_primitives::Sealable; -use futures::{FutureExt, Stream}; +use futures::Stream; use futures_util::StreamExt; use pin_project::pin_project; use reth_network_p2p::headers::{ @@ -7,7 +7,7 @@ use reth_network_p2p::headers::{ error::HeadersDownloaderResult, }; use reth_primitives_traits::SealedHeader; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use std::{ fmt::Debug, future::Future, @@ -33,42 +33,11 @@ pub struct TaskDownloader { // === impl TaskDownloader === impl TaskDownloader { - /// Spawns the given `downloader` via [`tokio::task::spawn`] and returns a [`TaskDownloader`] + /// Spawns the given `downloader` via the given [`Runtime`] and returns a [`TaskDownloader`] /// that's connected to that task. - /// - /// # Panics - /// - /// This method panics if called outside of a Tokio runtime - /// - /// # Example - /// - /// ``` - /// # use std::sync::Arc; - /// # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloader; - /// # use reth_downloaders::headers::task::TaskDownloader; - /// # use reth_consensus::HeaderValidator; - /// # use reth_network_p2p::headers::client::HeadersClient; - /// # use reth_primitives_traits::BlockHeader; - /// # fn t + 'static>(consensus:Arc>, client: Arc) { - /// let downloader = ReverseHeadersDownloader::::builder().build( - /// client, - /// consensus - /// ); - /// let downloader = TaskDownloader::spawn(downloader); - /// # } - pub fn spawn(downloader: T) -> Self + pub fn spawn_with(downloader: T, runtime: &Runtime) -> Self where T: HeaderDownloader
+ 'static, - { - Self::spawn_with(downloader, &TokioTaskExecutor::default()) - } - - /// Spawns the given `downloader` via the given [`TaskSpawner`] returns a [`TaskDownloader`] - /// that's connected to that task. - pub fn spawn_with(downloader: T, spawner: &S) -> Self - where - T: HeaderDownloader
+ 'static, - S: TaskSpawner, { let (headers_tx, headers_rx) = mpsc::channel(HEADERS_TASK_BUFFER_SIZE); let (to_downloader, updates_rx) = mpsc::unbounded_channel(); @@ -78,7 +47,7 @@ impl TaskDownloader { updates: UnboundedReceiverStream::new(updates_rx), downloader, }; - spawner.spawn_task(downloader.boxed()); + runtime.spawn_task(downloader); Self { from_downloader: ReceiverStream::new(headers_rx), to_downloader } } @@ -209,7 +178,8 @@ mod tests { .request_limit(1) .build(Arc::clone(&client), Arc::new(TestConsensus::default())); - let mut downloader = TaskDownloader::spawn(downloader); + let runtime = Runtime::test(); + let mut downloader = TaskDownloader::spawn_with(downloader, &runtime); downloader.update_local_head(p3.clone()); downloader.update_sync_target(SyncTarget::Tip(p0.hash())); diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 93223fabcd..c86861bf42 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -20,7 +20,7 @@ use reth_ethereum_forks::{ForkFilter, Head}; use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer}; use reth_network_types::{PeersConfig, SessionsConfig}; use reth_storage_api::{noop::NoopProvider, BlockNumReader, BlockReader, HeaderProvider}; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use secp256k1::SECP256K1; use std::{collections::HashSet, net::SocketAddr, sync::Arc}; @@ -76,7 +76,7 @@ pub struct NetworkConfig { /// The default mode of the network. pub network_mode: NetworkMode, /// The executor to use for spawning tasks. - pub executor: Box, + pub executor: Runtime, /// The `Status` message to send to peers at the beginning. pub status: UnifiedStatus, /// Sets the hello message for the p2p handshake in `RLPx` @@ -206,7 +206,7 @@ pub struct NetworkConfigBuilder { /// The default mode of the network. network_mode: NetworkMode, /// The executor to use for spawning tasks. - executor: Option>, + executor: Option, /// Sets the hello message for the p2p handshake in `RLPx` hello_message: Option, /// The executor to use for spawning tasks. @@ -342,7 +342,7 @@ impl NetworkConfigBuilder { /// Sets the executor to use for spawning tasks. /// /// If `None`, then [`tokio::spawn`] is used for spawning tasks. - pub fn with_task_executor(mut self, executor: Box) -> Self { + pub fn with_task_executor(mut self, executor: Runtime) -> Self { self.executor = Some(executor); self } @@ -691,7 +691,11 @@ impl NetworkConfigBuilder { chain_id, block_import: block_import.unwrap_or_else(|| Box::::default()), network_mode, - executor: executor.unwrap_or_else(|| Box::::default()), + executor: executor.unwrap_or_else(|| match tokio::runtime::Handle::try_current() { + Ok(handle) => Runtime::with_existing_handle(handle) + .expect("failed to create runtime with existing handle"), + Err(_) => Runtime::test(), + }), status, hello_message, extra_protocols, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 2ce3c29e3f..7bc8fd8714 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -28,7 +28,7 @@ use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_api::{PeerRequest, PeerRequestSender}; use reth_network_peers::PeerId; use reth_network_types::SessionsConfig; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use rustc_hash::FxHashMap; use secp256k1::SecretKey; use std::{ @@ -87,7 +87,7 @@ pub struct SessionManager { /// Size of the command buffer per session. session_command_buffer: usize, /// The executor for spawned tasks. - executor: Box, + executor: Runtime, /// All pending session that are currently handshaking, exchanging `Hello`s. /// /// Events produced during the authentication phase are reported to this manager. Once the @@ -130,7 +130,7 @@ impl SessionManager { pub fn new( secret_key: SecretKey, config: SessionsConfig, - executor: Box, + executor: Runtime, status: UnifiedStatus, hello_message: HelloMessageWithProtocols, fork_filter: ForkFilter, @@ -229,7 +229,7 @@ impl SessionManager { where F: Future + Send + 'static, { - self.executor.spawn_task(f.boxed()); + self.executor.spawn_task(f); } /// Invoked on a received status update. diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 9671077345..570e03ddfa 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -29,7 +29,7 @@ use reth_network_peers::PeerId; use reth_storage_api::{ noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory, }; -use reth_tasks::TokioTaskExecutor; +use reth_tasks::Runtime; use reth_tokio_util::EventStream; use reth_transaction_pool::{ blobstore::InMemoryBlobStore, @@ -198,7 +198,7 @@ where peer.client.clone(), EthEvmConfig::mainnet(), blob_store.clone(), - TokioTaskExecutor::default(), + Runtime::test(), ); peer.map_transactions_manager(EthTransactionPool::eth_pool( pool, @@ -228,7 +228,7 @@ where peer.client.clone(), EthEvmConfig::mainnet(), blob_store.clone(), - TokioTaskExecutor::default(), + Runtime::test(), ); peer.map_transactions_manager_with( diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index d4e6aa4030..8cab662325 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -98,7 +98,7 @@ pub trait FullNodeComponents: FullNodeTypes + Clone + 'static { /// Returns an executor handle to spawn tasks. /// /// This can be used to spawn critical, blocking tasks or register tasks that should be - /// terminated gracefully. See also [`TaskSpawner`](reth_tasks::TaskSpawner). + /// terminated gracefully. fn task_executor(&self) -> &TaskExecutor; } diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index 09f08ad82b..c98fb37f0d 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -986,7 +986,7 @@ impl>> BuilderContext secret_key, default_peers_path, ) - .with_task_executor(Box::new(self.executor.clone())) + .with_task_executor(self.executor.clone()) .set_head(self.head); Ok(builder) diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index c93ffbb4b9..ae80fa876a 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -231,7 +231,7 @@ impl EngineNodeLauncher { network_client.clone(), Box::pin(consensus_engine_stream), pipeline, - Box::new(ctx.task_executor().clone()), + ctx.task_executor().clone(), ctx.provider_factory().clone(), ctx.blockchain_db().clone(), pruner, diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index f7a9e0fc88..ce84852e7c 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -1016,7 +1016,7 @@ where .with_provider(node.provider().clone()) .with_pool(node.pool().clone()) .with_network(node.network().clone()) - .with_executor(Box::new(node.task_executor().clone())) + .with_executor(node.task_executor().clone()) .with_evm_config(node.evm_config().clone()) .with_consensus(node.consensus().clone()) .build_with_auth_server(module_config, engine_api, eth_api, engine_events.clone()); @@ -1403,7 +1403,7 @@ where ctx.beacon_engine_handle.clone(), PayloadStore::new(ctx.node.payload_builder_handle().clone()), ctx.node.pool().clone(), - Box::new(ctx.node.task_executor().clone()), + ctx.node.task_executor().clone(), client, EngineCapabilities::default(), engine_validator, diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 83b644ea39..0877526008 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -20,7 +20,7 @@ use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKin use reth_primitives_traits::{HeaderTy, NodePrimitives, SealedHeader}; use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop}; use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use std::{ fmt, future::Future, @@ -48,11 +48,11 @@ pub type HeaderForPayload

= <

::Primitives as NodePrimitive /// The [`PayloadJobGenerator`] that creates [`BasicPayloadJob`]s. #[derive(Debug)] -pub struct BasicPayloadJobGenerator { +pub struct BasicPayloadJobGenerator { /// The client that can interact with the chain. client: Client, /// The task executor to spawn payload building tasks on. - executor: Tasks, + executor: Runtime, /// The configuration for the job generator. config: BasicPayloadJobGeneratorConfig, /// Restricts how many generator tasks can be executed at once. @@ -67,12 +67,12 @@ pub struct BasicPayloadJobGenerator { // === impl BasicPayloadJobGenerator === -impl BasicPayloadJobGenerator { +impl BasicPayloadJobGenerator { /// Creates a new [`BasicPayloadJobGenerator`] with the given config and custom /// [`PayloadBuilder`] pub fn with_builder( client: Client, - executor: Tasks, + executor: Runtime, config: BasicPayloadJobGeneratorConfig, builder: Builder, ) -> Self { @@ -112,7 +112,7 @@ impl BasicPayloadJobGenerator { } /// Returns a reference to the tasks type - pub const fn tasks(&self) -> &Tasks { + pub const fn tasks(&self) -> &Runtime { &self.executor } @@ -125,20 +125,18 @@ impl BasicPayloadJobGenerator { // === impl BasicPayloadJobGenerator === -impl PayloadJobGenerator - for BasicPayloadJobGenerator +impl PayloadJobGenerator for BasicPayloadJobGenerator where Client: StateProviderFactory + BlockReaderIdExt

> + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + Unpin + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, { - type Job = BasicPayloadJob; + type Job = BasicPayloadJob; fn new_payload_job( &self, @@ -299,14 +297,14 @@ impl Default for BasicPayloadJobGeneratorConfig { /// built and this future will wait to be resolved: [`PayloadJob::resolve`] or terminated if the /// deadline is reached. #[derive(Debug)] -pub struct BasicPayloadJob +pub struct BasicPayloadJob where Builder: PayloadBuilder, { /// The configuration for how the payload will be created. config: PayloadConfig>, /// How to spawn building tasks - executor: Tasks, + executor: Runtime, /// The deadline when this job should resolve. deadline: Pin>, /// The interval at which the job should build a new payload after the last. @@ -330,9 +328,8 @@ where builder: Builder, } -impl BasicPayloadJob +impl BasicPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -349,22 +346,21 @@ where self.metrics.inc_initiated_payload_builds(); let cached_reads = self.cached_reads.take().unwrap_or_default(); let builder = self.builder.clone(); - self.executor.spawn_blocking_task(Box::pin(async move { + self.executor.spawn_blocking_task(async move { // acquire the permit for executing the task let _permit = guard.acquire().await; let args = BuildArguments { cached_reads, config: payload_config, cancel, best_payload }; let result = builder.try_build(args); let _ = tx.send(result); - })); + }); self.pending_block = Some(PendingPayload { _cancel, payload: rx }); } } -impl Future for BasicPayloadJob +impl Future for BasicPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -425,9 +421,8 @@ where } } -impl PayloadJob for BasicPayloadJob +impl PayloadJob for BasicPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -495,10 +490,10 @@ where let (tx, rx) = oneshot::channel(); let config = self.config.clone(); let builder = self.builder.clone(); - self.executor.spawn_blocking_task(Box::pin(async move { + self.executor.spawn_blocking_task(async move { let res = builder.build_empty_payload(config); let _ = tx.send(res); - })); + }); empty_payload = Some(rx); } @@ -506,9 +501,9 @@ where debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload"); // race the in progress job with this job let (tx, rx) = oneshot::channel(); - self.executor.spawn_blocking_task(Box::pin(async move { + self.executor.spawn_blocking_task(async move { let _ = tx.send(job()); - })); + }); empty_payload = Some(rx); } }; diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 93bfd8acbd..8489749237 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -66,6 +66,7 @@ reth-rpc-engine-api.workspace = true reth-tracing.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } reth-node-ethereum.workspace = true +reth-tasks = { workspace = true, features = ["test-utils"] } alloy-primitives.workspace = true alloy-rpc-types-eth.workspace = true diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 1ef0fd33be..d702b0bea5 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -1,7 +1,7 @@ use reth_rpc::{EthFilter, EthPubSub}; use reth_rpc_eth_api::EthApiTypes; use reth_rpc_eth_types::EthConfig; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; /// Handlers for core, filter and pubsub `eth` namespace APIs. #[derive(Debug, Clone)] @@ -21,11 +21,7 @@ where /// Returns a new instance with the additional handlers for the `eth` namespace. /// /// This will spawn all necessary tasks for the additional handlers. - pub fn bootstrap( - config: EthConfig, - executor: Box, - eth_api: EthApi, - ) -> Self { + pub fn bootstrap(config: EthConfig, executor: Runtime, eth_api: EthApi) -> Self { let filter = EthFilter::new(eth_api.clone(), config.filter_config(), executor.clone()); let pubsub = EthPubSub::with_spawner(eth_api.clone(), executor); diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 8cc029e3f9..1df8bf8fd0 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -57,7 +57,7 @@ use reth_storage_api::{ AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider, StateProviderFactory, }; -use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor}; +use reth_tasks::{pool::BlockingTaskGuard, Runtime}; use reth_tokio_util::EventSender; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; use serde::{Deserialize, Serialize}; @@ -123,7 +123,7 @@ pub struct RpcModuleBuilder { /// The Network type to when creating all rpc handlers network: Network, /// How additional tasks are spawned, for example in the eth pubsub namespace - executor: Box, + executor: Option, /// Defines how the EVM should be configured before execution. evm_config: EvmConfig, /// The consensus implementation. @@ -142,11 +142,19 @@ impl provider: Provider, pool: Pool, network: Network, - executor: Box, + executor: Runtime, evm_config: EvmConfig, consensus: Consensus, ) -> Self { - Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData } + Self { + provider, + pool, + network, + executor: Some(executor), + evm_config, + consensus, + _primitives: PhantomData, + } } /// Configure the provider instance. @@ -217,22 +225,13 @@ impl } /// Configure the task executor to use for additional tasks. - pub fn with_executor(self, executor: Box) -> Self { - let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self; - Self { provider, network, pool, executor, evm_config, consensus, _primitives } - } - - /// Configure [`TokioTaskExecutor`] as the task executor to use for additional tasks. - /// - /// This will spawn additional tasks directly via `tokio::task::spawn`, See - /// [`TokioTaskExecutor`]. - pub fn with_tokio_executor(self) -> Self { + pub fn with_executor(self, executor: Runtime) -> Self { let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self; Self { provider, network, pool, - executor: Box::new(TokioTaskExecutor::default()), + executor: Some(executor), evm_config, consensus, _primitives, @@ -365,6 +364,8 @@ where EthApi: FullEthApiServer, { let Self { provider, pool, network, executor, consensus, evm_config, .. } = self; + let executor = + executor.expect("RpcModuleBuilder requires a Runtime to be set via `with_executor`"); RpcRegistryInner::new( provider, pool, @@ -408,7 +409,15 @@ where impl Default for RpcModuleBuilder { fn default() -> Self { - Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ()) + Self { + provider: (), + pool: (), + network: (), + executor: None, + evm_config: (), + consensus: (), + _primitives: PhantomData, + } } } @@ -486,7 +495,7 @@ pub struct RpcRegistryInner, + executor: Runtime, evm_config: EvmConfig, consensus: Consensus, /// Holds all `eth_` namespace handlers @@ -525,7 +534,7 @@ where provider: Provider, pool: Pool, network: Network, - executor: Box, + executor: Runtime, consensus: Consensus, config: RpcModuleConfig, evm_config: EvmConfig, @@ -578,8 +587,8 @@ where } /// Returns a reference to the tasks type - pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) { - &*self.executor + pub const fn tasks(&self) -> &Runtime { + &self.executor } /// Returns a reference to the provider @@ -944,7 +953,7 @@ where RethRpcModule::Debug => DebugApi::new( eth_api.clone(), self.blocking_pool_guard.clone(), - &*self.executor, + &self.executor, self.engine_events.new_listener(), ) .into_rpc() diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index b41951b722..5bbca0a591 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -19,7 +19,7 @@ use reth_rpc_builder::{ use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi}; use reth_rpc_layer::JwtSecret; use reth_rpc_server_types::RpcModuleSelection; -use reth_tasks::TokioTaskExecutor; +use reth_tasks::Runtime; use reth_transaction_pool::{ noop::NoopTransactionPool, test_utils::{TestPool, TestPoolBuilder}, @@ -49,7 +49,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { beacon_engine_handle, spawn_test_payload_service().into(), NoopTransactionPool::default(), - Box::::default(), + Runtime::test(), client, EngineCapabilities::default(), EthereumEngineValidator::new(MAINNET.clone()), @@ -134,7 +134,7 @@ pub fn test_rpc_builder( .with_provider(NoopProvider::default()) .with_pool(TestPoolBuilder::default().into()) .with_network(NoopNetwork::default()) - .with_executor(Box::new(TokioTaskExecutor::default())) + .with_executor(Runtime::test()) .with_evm_config(EthEvmConfig::mainnet()) .with_consensus(NoopConsensus::default()) } diff --git a/crates/rpc/rpc-builder/tests/it/ws.rs b/crates/rpc/rpc-builder/tests/it/ws.rs index 29f9818730..6c6af7270a 100644 --- a/crates/rpc/rpc-builder/tests/it/ws.rs +++ b/crates/rpc/rpc-builder/tests/it/ws.rs @@ -145,7 +145,7 @@ async fn test_eth_subscribe_pending_transactions_receives_tx() { use reth_network_api::noop::NoopNetwork; use reth_provider::test_utils::NoopProvider; use reth_rpc_builder::RpcModuleBuilder; - use reth_tasks::TokioTaskExecutor; + use reth_tasks::Runtime; use reth_transaction_pool::{ test_utils::{TestPool, TestPoolBuilder}, PoolTransaction, TransactionOrigin, TransactionPool, @@ -160,7 +160,7 @@ async fn test_eth_subscribe_pending_transactions_receives_tx() { .with_provider(NoopProvider::default()) .with_pool(pool) .with_network(NoopNetwork::default()) - .with_executor(Box::new(TokioTaskExecutor::default())) + .with_executor(Runtime::test()) .with_evm_config(EthEvmConfig::mainnet()) .with_consensus(NoopConsensus::default()); diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 2702a40419..ba5d85977b 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -53,5 +53,6 @@ reth-payload-builder = { workspace = true, features = ["test-utils"] } reth-testing-utils.workspace = true alloy-rlp.workspace = true reth-node-ethereum.workspace = true +reth-tasks = { workspace = true, features = ["test-utils"] } assert_matches.workspace = true diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 0aec613660..eea88fba41 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -28,7 +28,7 @@ use reth_payload_primitives::{ use reth_primitives_traits::{Block, BlockBody}; use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule}; use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use reth_transaction_pool::TransactionPool; use std::{ sync::Arc, @@ -91,7 +91,7 @@ where beacon_consensus: ConsensusEngineHandle, payload_store: PayloadStore, tx_pool: Pool, - task_spawner: Box, + task_spawner: Runtime, client: ClientVersionV1, capabilities: EngineCapabilities, validator: Validator, @@ -532,7 +532,7 @@ where let (tx, rx) = oneshot::channel(); let inner = self.inner.clone(); - self.inner.task_spawner.spawn_blocking_task(Box::pin(async move { + self.inner.task_spawner.spawn_blocking_task(async move { if count > MAX_PAYLOAD_BODIES_LIMIT { tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok(); return; @@ -574,7 +574,7 @@ where }; } tx.send(Ok(result)).ok(); - })); + }); rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))? } @@ -659,7 +659,7 @@ where let (tx, rx) = oneshot::channel(); let inner = self.inner.clone(); - self.inner.task_spawner.spawn_blocking_task(Box::pin(async move { + self.inner.task_spawner.spawn_blocking_task(async move { let mut result = Vec::with_capacity(hashes.len()); for hash in hashes { let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash)); @@ -674,7 +674,7 @@ where } } tx.send(Ok(result)).ok(); - })); + }); rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))? } @@ -1325,7 +1325,7 @@ struct EngineApiInner, /// For spawning and executing async tasks - task_spawner: Box, + task_spawner: Runtime, /// The latency and response type metrics for engine api calls metrics: EngineApiMetrics, /// Identification of the execution client used by the consensus client @@ -1356,7 +1356,7 @@ mod tests { use reth_node_ethereum::EthereumEngineValidator; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_provider::test_utils::MockEthProvider; - use reth_tasks::TokioTaskExecutor; + use reth_tasks::Runtime; use reth_transaction_pool::noop::NoopTransactionPool; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -1381,7 +1381,7 @@ mod tests { let provider = Arc::new(MockEthProvider::default()); let payload_store = spawn_test_payload_service(); let (to_engine, engine_rx) = unbounded_channel(); - let task_executor = Box::::default(); + let task_executor = Runtime::test(); let api = EngineApi::new( provider.clone(), chain_spec.clone(), @@ -1488,7 +1488,7 @@ mod tests { ConsensusEngineHandle::new(to_engine), payload_store.into(), NoopTransactionPool::default(), - Box::::default(), + Runtime::test(), ClientVersionV1 { code: ClientCode::RH, name: "Reth".to_string(), diff --git a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs index 3908d407c7..de29c93460 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs @@ -5,7 +5,7 @@ use futures::Future; use reth_rpc_eth_types::EthApiError; use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, - TaskSpawner, + Runtime, }; use std::sync::Arc; use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore}; @@ -27,7 +27,7 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static { /// Returns a handle for spawning IO heavy blocking tasks. /// /// Runtime access in default trait method implementations. - fn io_task_spawner(&self) -> impl TaskSpawner; + fn io_task_spawner(&self) -> &Runtime; /// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests. /// @@ -163,10 +163,10 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static { { let (tx, rx) = oneshot::channel(); let this = self.clone(); - self.io_task_spawner().spawn_blocking_task(Box::pin(async move { + self.io_task_spawner().spawn_blocking_task(async move { let res = f(this); let _ = tx.send(res); - })); + }); async move { rx.await.map_err(|_| EthApiError::InternalEthError)? } } @@ -186,10 +186,10 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static { { let (tx, rx) = oneshot::channel(); let this = self.clone(); - self.io_task_spawner().spawn_blocking_task(Box::pin(async move { + self.io_task_spawner().spawn_blocking_task(async move { let res = f(this).await; let _ = tx.send(res); - })); + }); async move { rx.await.map_err(|_| EthApiError::InternalEthError)? } } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index d070d1e01e..20c9430f8e 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -11,7 +11,7 @@ use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::Chain; use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock}; use reth_storage_api::{BlockReader, TransactionVariant}; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use schnellru::{ByLength, Limiter, LruMap}; use std::{ future::Future, @@ -76,15 +76,15 @@ impl Clone for EthStateCache { impl EthStateCache { /// Creates and returns both [`EthStateCache`] frontend and the memory bound service. - fn create( + fn create( provider: Provider, - action_task_spawner: Tasks, + action_task_spawner: Runtime, max_blocks: u32, max_receipts: u32, max_headers: u32, max_concurrent_db_operations: usize, max_cached_tx_hashes: u32, - ) -> (Self, EthStateCacheService) + ) -> (Self, EthStateCacheService) where Provider: BlockReader, { @@ -113,21 +113,25 @@ impl EthStateCache { where Provider: BlockReader + Clone + Unpin + 'static, { - Self::spawn_with(provider, config, TokioTaskExecutor::default()) + Self::spawn_with( + provider, + config, + Runtime::with_existing_handle(tokio::runtime::Handle::current()) + .expect("failed to create Runtime"), + ) } /// Creates a new async LRU backed cache service task and spawns it to a new task via the given /// spawner. /// /// The cache is memory limited by the given max bytes values. - pub fn spawn_with( + pub fn spawn_with( provider: Provider, config: EthStateCacheConfig, - executor: Tasks, + executor: Runtime, ) -> Self where Provider: BlockReader + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + 'static, { let EthStateCacheConfig { max_blocks, @@ -145,7 +149,7 @@ impl EthStateCache { max_concurrent_db_requests, max_cached_tx_hashes, ); - executor.spawn_critical_task("eth state cache", Box::pin(service)); + executor.spawn_critical_task("eth state cache", service); this } @@ -343,10 +347,9 @@ pub(crate) struct EthStateCacheService< tx_hash_index: LruMap, } -impl EthStateCacheService +impl EthStateCacheService where Provider: BlockReader + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + 'static, { /// Indexes all transactions in a block by transaction hash. fn index_block_transactions(&mut self, block: &RecoveredBlock) { @@ -449,10 +452,9 @@ where } } -impl Future for EthStateCacheService +impl Future for EthStateCacheService where Provider: BlockReader + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + 'static, { type Output = (); @@ -494,21 +496,19 @@ where let rate_limiter = this.rate_limiter.clone(); let mut action_sender = ActionSender::new(CacheKind::Block, block_hash, action_tx); - this.action_task_spawner.spawn_blocking_task(Box::pin( - async move { - // Acquire permit - let _permit = rate_limiter.acquire().await; - // Only look in the database to prevent situations where we - // looking up the tree is blocking - let block_sender = provider - .sealed_block_with_senders( - BlockHashOrNumber::Hash(block_hash), - TransactionVariant::WithHash, - ) - .map(|maybe_block| maybe_block.map(Arc::new)); - action_sender.send_block(block_sender); - }, - )); + this.action_task_spawner.spawn_blocking_task(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; + // Only look in the database to prevent situations where we + // looking up the tree is blocking + let block_sender = provider + .sealed_block_with_senders( + BlockHashOrNumber::Hash(block_hash), + TransactionVariant::WithHash, + ) + .map(|maybe_block| maybe_block.map(Arc::new)); + action_sender.send_block(block_sender); + }); } } CacheAction::GetReceipts { block_hash, response_tx } => { @@ -525,17 +525,15 @@ where let rate_limiter = this.rate_limiter.clone(); let mut action_sender = ActionSender::new(CacheKind::Receipt, block_hash, action_tx); - this.action_task_spawner.spawn_blocking_task(Box::pin( - async move { - // Acquire permit - let _permit = rate_limiter.acquire().await; - let res = provider - .receipts_by_block(block_hash.into()) - .map(|maybe_receipts| maybe_receipts.map(Arc::new)); + this.action_task_spawner.spawn_blocking_task(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; + let res = provider + .receipts_by_block(block_hash.into()) + .map(|maybe_receipts| maybe_receipts.map(Arc::new)); - action_sender.send_receipts(res); - }, - )); + action_sender.send_receipts(res); + }); } } CacheAction::GetHeader { block_hash, response_tx } => { @@ -559,19 +557,16 @@ where let rate_limiter = this.rate_limiter.clone(); let mut action_sender = ActionSender::new(CacheKind::Header, block_hash, action_tx); - this.action_task_spawner.spawn_blocking_task(Box::pin( - async move { - // Acquire permit - let _permit = rate_limiter.acquire().await; - let header = - provider.header(block_hash).and_then(|header| { - header.ok_or_else(|| { - ProviderError::HeaderNotFound(block_hash.into()) - }) - }); - action_sender.send_header(header); - }, - )); + this.action_task_spawner.spawn_blocking_task(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; + let header = provider.header(block_hash).and_then(|header| { + header.ok_or_else(|| { + ProviderError::HeaderNotFound(block_hash.into()) + }) + }); + action_sender.send_header(header); + }); } } CacheAction::ReceiptsResult { block_hash, res } => { diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index fe0dbba420..c9355c14eb 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -35,7 +35,7 @@ use reth_storage_api::{ BlockIdReader, BlockReaderIdExt, HeaderProvider, ProviderBlock, ReceiptProviderIdExt, StateProofProvider, StateProviderFactory, StateRootProvider, TransactionVariant, }; -use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner}; +use reth_tasks::{pool::BlockingTaskGuard, Runtime}; use reth_trie_common::{updates::TrieUpdates, HashedPostState}; use revm::DatabaseCommit; use revm_inspectors::tracing::{DebugInspector, TransactionContext}; @@ -59,7 +59,7 @@ where pub fn new( eth_api: Eth, blocking_task_guard: BlockingTaskGuard, - executor: impl TaskSpawner, + executor: &Runtime, mut stream: impl Stream> + Send + Unpin + 'static, ) -> Self { let bad_block_store = BadBlockStore::default(); @@ -70,7 +70,7 @@ where }); // Spawn a task caching bad blocks - executor.spawn_task(Box::pin(async move { + executor.spawn_task(async move { while let Some(event) = stream.next().await { if let ConsensusEngineEvent::InvalidBlock(block) = event && let Ok(recovered) = @@ -79,7 +79,7 @@ where bad_block_store.insert(recovered); } } - })); + }); Self { inner } } diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index 92ddce510e..5864cbfdce 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -18,7 +18,7 @@ use reth_rpc_server_types::constants::{ DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS, }; -use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; +use reth_tasks::{pool::BlockingTaskPool, Runtime}; use std::{sync::Arc, time::Duration}; /// A helper to build the `EthApi` handler instance. @@ -39,7 +39,7 @@ pub struct EthApiBuilder { gas_oracle_config: GasPriceOracleConfig, gas_oracle: Option>, blocking_task_pool: Option, - task_spawner: Box, + task_spawner: Runtime, next_env: NextEnv, max_batch_size: usize, max_blocking_io_requests: usize, @@ -147,7 +147,8 @@ where blocking_task_pool: None, fee_history_cache_config: FeeHistoryCacheConfig::default(), proof_permits: DEFAULT_PROOF_PERMITS, - task_spawner: TokioTaskExecutor::default().boxed(), + task_spawner: Runtime::with_existing_handle(tokio::runtime::Handle::current()) + .expect("called outside tokio runtime"), gas_oracle_config: Default::default(), eth_state_cache_config: Default::default(), next_env: Default::default(), @@ -167,8 +168,8 @@ where N: RpcNodeCore, { /// Configures the task spawner used to spawn additional tasks. - pub fn task_spawner(mut self, spawner: impl TaskSpawner + 'static) -> Self { - self.task_spawner = Box::new(spawner); + pub fn task_spawner(mut self, spawner: Runtime) -> Self { + self.task_spawner = spawner; self } @@ -527,9 +528,9 @@ where let cache = eth_cache.clone(); task_spawner.spawn_critical_task( "cache canonical blocks for fee history task", - Box::pin(async move { + async move { fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider, cache).await; - }), + }, ); EthApiInner::new( diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 640d1aa3f5..ac7cfa1ddf 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -27,7 +27,7 @@ use reth_rpc_eth_types::{ use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, ProviderHeader}; use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, - TaskSpawner, TokioTaskExecutor, + Runtime, }; use reth_transaction_pool::{ blobstore::BlobSidecarConverter, noop::NoopTransactionPool, AddedTransactionOutcome, @@ -168,7 +168,8 @@ where eth_proof_window, blocking_task_pool, fee_history_cache, - TokioTaskExecutor::default().boxed(), + Runtime::with_existing_handle(tokio::runtime::Handle::current()) + .expect("called outside tokio runtime"), proof_permits, rpc_converter, (), @@ -254,7 +255,7 @@ where Rpc: RpcConvert, { #[inline] - fn io_task_spawner(&self) -> impl TaskSpawner { + fn io_task_spawner(&self) -> &Runtime { self.inner.task_spawner() } @@ -294,7 +295,7 @@ pub struct EthApiInner { /// The block number at which the node started starting_block: U256, /// The type that can spawn tasks which would otherwise block. - task_spawner: Box, + task_spawner: Runtime, /// Cached pending block if any pending_block: Mutex>>, /// A pool dedicated to CPU heavy blocking tasks. @@ -356,7 +357,7 @@ where eth_proof_window: u64, blocking_task_pool: BlockingTaskPool, fee_history_cache: FeeHistoryCache>, - task_spawner: Box, + task_spawner: Runtime, proof_permits: usize, converter: Rpc, next_env: impl PendingEnvBuilder, @@ -385,7 +386,7 @@ where // Create tx pool insertion batcher let (processor, tx_batch_sender) = BatchTxProcessor::new(components.pool().clone(), max_batch_size); - task_spawner.spawn_critical_task("tx-batcher", Box::pin(processor)); + task_spawner.spawn_critical_task("tx-batcher", processor); Self { components, @@ -454,8 +455,8 @@ where /// Returns a handle to the task spawner. #[inline] - pub const fn task_spawner(&self) -> &dyn TaskSpawner { - &*self.task_spawner + pub const fn task_spawner(&self) -> &Runtime { + &self.task_spawner } /// Returns a handle to the blocking thread pool. diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index dcfa605ca3..a98b4be51e 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -31,7 +31,7 @@ use reth_storage_api::{ BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock, ProviderReceipt, ReceiptProvider, }; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool}; use std::{ collections::{HashMap, VecDeque}, @@ -125,7 +125,7 @@ where /// use reth_network_api::noop::NoopNetwork; /// use reth_provider::noop::NoopProvider; /// use reth_rpc::{EthApi, EthFilter}; - /// use reth_tasks::TokioTaskExecutor; + /// use reth_tasks::Runtime; /// use reth_transaction_pool::noop::NoopTransactionPool; /// let eth_api = EthApi::builder( /// NoopProvider::default(), @@ -134,9 +134,9 @@ where /// EthEvmConfig::mainnet(), /// ) /// .build(); - /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed()); + /// let filter = EthFilter::new(eth_api, Default::default(), Runtime::test()); /// ``` - pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box) -> Self { + pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self { let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = config; let inner = EthFilterInner { @@ -154,9 +154,9 @@ where let this = eth_filter.clone(); eth_filter.inner.task_spawner.spawn_critical_task( "eth-filters_stale-filters-clean", - Box::pin(async move { + async move { this.watch_and_clear_stale_filters().await; - }), + }, ); eth_filter @@ -436,7 +436,7 @@ struct EthFilterInner { /// maximum number of headers to read at once for range filter max_headers_range: u64, /// The type that can spawn tasks. - task_spawner: Box, + task_spawner: Runtime, /// Duration since the last filter poll, after which the filter is considered stale stale_filter_ttl: Duration, } @@ -1320,7 +1320,7 @@ mod tests { use reth_rpc_convert::RpcConverter; use reth_rpc_eth_api::node::RpcNodeCoreAdapter; use reth_rpc_eth_types::receipt::EthReceiptConverter; - use reth_tasks::TokioTaskExecutor; + use reth_tasks::Runtime; use reth_testing_utils::generators; use reth_transaction_pool::test_utils::{testing_pool, TestPool}; use std::{collections::VecDeque, sync::Arc}; @@ -1369,11 +1369,8 @@ mod tests { let provider = MockEthProvider::default(); let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![]; @@ -1397,11 +1394,8 @@ mod tests { let provider = MockEthProvider::default(); let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![ @@ -1515,11 +1509,8 @@ mod tests { let provider = MockEthProvider::default(); let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![SealedHeader::new( @@ -1585,11 +1576,8 @@ mod tests { let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![ @@ -1679,11 +1667,8 @@ mod tests { let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![ @@ -1750,11 +1735,8 @@ mod tests { provider.add_receipts(test_block_number, vec![mock_receipt.clone()]); let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers = vec![test_header.clone()]; @@ -1785,11 +1767,8 @@ mod tests { let provider = MockEthProvider::default(); let eth_api = build_test_eth_api(provider); - let eth_filter = super::EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = + super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); let filter_inner = eth_filter.inner; let headers: Vec> = vec![]; @@ -1882,11 +1861,7 @@ mod tests { .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 }); let eth_api = build_test_eth_api(provider); - let eth_filter = EthFilter::new( - eth_api, - EthFilterConfig::default(), - Box::new(TokioTaskExecutor::default()), - ); + let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test()); // Use default filter which will match any non-empty bloom let filter = Filter::default(); diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 1f7374fa5c..234aace0b1 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -20,7 +20,7 @@ use reth_rpc_eth_api::{ use reth_rpc_eth_types::logs_utils; use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err}; use reth_storage_api::BlockNumReader; -use reth_tasks::{TaskSpawner, TokioTaskExecutor}; +use reth_tasks::Runtime; use reth_transaction_pool::{NewTransactionEvent, TransactionPool}; use serde::Serialize; use tokio_stream::{ @@ -45,11 +45,15 @@ impl EthPubSub { /// /// Subscription tasks are spawned via [`tokio::task::spawn`] pub fn new(eth_api: Eth) -> Self { - Self::with_spawner(eth_api, Box::::default()) + Self::with_spawner( + eth_api, + Runtime::with_existing_handle(tokio::runtime::Handle::current()) + .expect("called outside tokio runtime"), + ) } /// Creates a new, shareable instance. - pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box) -> Self { + pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Runtime) -> Self { let inner = EthPubSubInner { eth_api, subscription_task_spawner }; Self { inner: Arc::new(inner) } } @@ -214,9 +218,9 @@ where ) -> jsonrpsee::core::SubscriptionResult { let sink = pending.accept().await?; let pubsub = self.clone(); - self.inner.subscription_task_spawner.spawn_task(Box::pin(async move { + self.inner.subscription_task_spawner.spawn_task(async move { let _ = pubsub.handle_accepted(sink, kind, params).await; - })); + }); Ok(()) } @@ -288,7 +292,7 @@ struct EthPubSubInner { /// The `eth` API. eth_api: EthApi, /// The type that's used to spawn subscription tasks. - subscription_task_spawner: Box, + subscription_task_spawner: Runtime, } // == impl EthPubSubInner === diff --git a/crates/rpc/rpc/src/reth.rs b/crates/rpc/rpc/src/reth.rs index 4963c48689..94a480261f 100644 --- a/crates/rpc/rpc/src/reth.rs +++ b/crates/rpc/rpc/src/reth.rs @@ -15,7 +15,7 @@ use reth_primitives_traits::{NodePrimitives, SealedHeader}; use reth_rpc_api::RethApiServer; use reth_rpc_eth_types::{EthApiError, EthResult}; use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use serde::Serialize; use tokio::sync::oneshot; @@ -35,7 +35,7 @@ impl RethApi { } /// Create a new instance of the [`RethApi`] - pub fn new(provider: Provider, task_spawner: Box) -> Self { + pub fn new(provider: Provider, task_spawner: Runtime) -> Self { let inner = Arc::new(RethApiInner { provider, task_spawner }); Self { inner } } @@ -55,10 +55,10 @@ where let (tx, rx) = oneshot::channel(); let this = self.clone(); let f = c(this); - self.inner.task_spawner.spawn_blocking_task(Box::pin(async move { + self.inner.task_spawner.spawn_blocking_task(async move { let res = f.await; let _ = tx.send(res); - })); + }); rx.await.map_err(|_| EthApiError::InternalEthError)? } @@ -116,7 +116,7 @@ where ) -> jsonrpsee::core::SubscriptionResult { let sink = pending.accept().await?; let stream = self.provider().canonical_state_stream(); - self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream))); + self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream)); Ok(()) } @@ -128,7 +128,7 @@ where ) -> jsonrpsee::core::SubscriptionResult { let sink = pending.accept().await?; let stream = self.provider().persisted_block_stream(); - self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream))); + self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream)); Ok(()) } @@ -141,11 +141,11 @@ where let sink = pending.accept().await?; let canon_stream = self.provider().canonical_state_stream(); let finalized_stream = self.provider().finalized_block_stream(); - self.inner.task_spawner.spawn_task(Box::pin(finalized_chain_notifications( + self.inner.task_spawner.spawn_task(finalized_chain_notifications( sink, canon_stream, finalized_stream, - ))); + )); Ok(()) } @@ -262,5 +262,5 @@ struct RethApiInner { /// The provider that can interact with the chain. provider: Provider, /// The type that can spawn tasks which would otherwise block. - task_spawner: Box, + task_spawner: Runtime, } diff --git a/crates/rpc/rpc/src/validation.rs b/crates/rpc/rpc/src/validation.rs index 71c1a730ac..ab880b1450 100644 --- a/crates/rpc/rpc/src/validation.rs +++ b/crates/rpc/rpc/src/validation.rs @@ -37,7 +37,7 @@ use reth_revm::{cached::CachedReads, database::StateProviderDatabase}; use reth_rpc_api::BlockSubmissionValidationApiServer; use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err}; use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use revm_primitives::{Address, B256, U256}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -63,7 +63,7 @@ where consensus: Arc>, evm_config: E, config: ValidationApiConfig, - task_spawner: Box, + task_spawner: Runtime, payload_validator: Arc< dyn PayloadValidator::Block>, >, @@ -511,12 +511,12 @@ where let this = self.clone(); let (tx, rx) = oneshot::channel(); - self.task_spawner.spawn_blocking_task(Box::pin(async move { + self.task_spawner.spawn_blocking_task(async move { let result = Self::validate_builder_submission_v3(&this, request) .await .map_err(ErrorObject::from); let _ = tx.send(result); - })); + }); rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))? } @@ -529,12 +529,12 @@ where let this = self.clone(); let (tx, rx) = oneshot::channel(); - self.task_spawner.spawn_blocking_task(Box::pin(async move { + self.task_spawner.spawn_blocking_task(async move { let result = Self::validate_builder_submission_v4(&this, request) .await .map_err(ErrorObject::from); let _ = tx.send(result); - })); + }); rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))? } @@ -547,12 +547,12 @@ where let this = self.clone(); let (tx, rx) = oneshot::channel(); - self.task_spawner.spawn_blocking_task(Box::pin(async move { + self.task_spawner.spawn_blocking_task(async move { let result = Self::validate_builder_submission_v5(&this, request) .await .map_err(ErrorObject::from); let _ = tx.send(result); - })); + }); rx.await.map_err(|_| internal_rpc_err("Internal blocking task error"))? } @@ -578,7 +578,7 @@ pub struct ValidationApiInner { /// requests. cached_state: RwLock<(B256, CachedReads)>, /// Task spawner for blocking operations - task_spawner: Box, + task_spawner: Runtime, /// Validation metrics metrics: ValidationMetrics, } diff --git a/crates/stages/stages/tests/pipeline.rs b/crates/stages/stages/tests/pipeline.rs index 22f1448ee0..602ad7768b 100644 --- a/crates/stages/stages/tests/pipeline.rs +++ b/crates/stages/stages/tests/pipeline.rs @@ -153,23 +153,26 @@ fn build_downloaders_from_file_client( provider_factory: reth_provider::ProviderFactory< reth_provider::test_utils::MockNodeTypesWithDB, >, -) -> (impl HeaderDownloader
, impl BodyDownloader) { +) -> (impl HeaderDownloader
, impl BodyDownloader, reth_tasks::Runtime) +{ let tip = file_client.tip().expect("file client should have tip"); let min_block = file_client.min_block().expect("file client should have min block"); let max_block = file_client.max_block().expect("file client should have max block"); + let runtime = reth_tasks::Runtime::test(); + let mut header_downloader = ReverseHeadersDownloaderBuilder::new(stages_config.headers) .build(file_client.clone(), consensus.clone()) - .into_task(); + .into_task_with(&runtime); header_downloader.update_local_head(genesis); header_downloader.update_sync_target(SyncTarget::Tip(tip)); let mut body_downloader = BodiesDownloaderBuilder::new(stages_config.bodies) .build(file_client, consensus, provider_factory) - .into_task(); + .into_task_with(&runtime); body_downloader.set_download_range(min_block..=max_block).expect("set download range"); - (header_downloader, body_downloader) + (header_downloader, body_downloader, runtime) } /// Builds a pipeline with `DefaultStages`. @@ -415,7 +418,7 @@ async fn run_pipeline_forward_and_unwind( let tip = file_client.tip().expect("tip"); let stages_config = StageConfig::default(); - let (header_downloader, body_downloader) = build_downloaders_from_file_client( + let (header_downloader, body_downloader, _runtime) = build_downloaders_from_file_client( file_client, pipeline_genesis, stages_config, @@ -523,16 +526,18 @@ async fn run_pipeline_forward_and_unwind( .sealed_header(unwind_target)? .expect("unwind target header should exist"); + let resync_runtime = reth_tasks::Runtime::test(); + let mut resync_header_downloader = ReverseHeadersDownloaderBuilder::new(resync_stages_config.headers) .build(resync_file_client.clone(), resync_consensus.clone()) - .into_task(); + .into_task_with(&resync_runtime); resync_header_downloader.update_local_head(unwind_head); resync_header_downloader.update_sync_target(SyncTarget::Tip(tip)); let mut resync_body_downloader = BodiesDownloaderBuilder::new(resync_stages_config.bodies) .build(resync_file_client, resync_consensus, pipeline_provider_factory.clone()) - .into_task(); + .into_task_with(&resync_runtime); resync_body_downloader .set_download_range(unwind_target + 1..=max_block) .expect("set download range"); diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index f2423e3697..31a1be9091 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -22,11 +22,9 @@ reth-metrics.workspace = true metrics.workspace = true # misc -auto_impl.workspace = true quanta.workspace = true tracing.workspace = true thiserror.workspace = true -dyn-clone.workspace = true # feature `rayon` rayon = { workspace = true, optional = true } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 1a0e38c953..cd2f4c5525 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -12,9 +12,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg))] -use crate::shutdown::{signal, GracefulShutdown, Shutdown, Signal}; -use dyn_clone::DynClone; -use futures_util::future::BoxFuture; +use crate::shutdown::{signal, Shutdown, Signal}; use std::{ any::Any, fmt::{Display, Formatter}, @@ -29,7 +27,6 @@ use std::{ use tokio::{ runtime::Handle, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - task::JoinHandle, }; use tracing::debug; @@ -98,108 +95,6 @@ where .unwrap_or_else(|e| panic!("failed to spawn scoped thread {name:?}: {e}")) } -/// A type that can spawn tasks. -/// -/// The main purpose of this type is to abstract over [`Runtime`] so it's more convenient to -/// provide default impls for testing. -/// -/// -/// # Examples -/// -/// Use the [`TokioTaskExecutor`] that spawns with [`tokio::task::spawn`] -/// -/// ``` -/// # async fn t() { -/// use reth_tasks::{TaskSpawner, TokioTaskExecutor}; -/// let executor = TokioTaskExecutor::default(); -/// -/// let task = executor.spawn_task(Box::pin(async { -/// // -- snip -- -/// })); -/// task.await.unwrap(); -/// # } -/// ``` -/// -/// Use the [`Runtime`] that spawns task directly onto the tokio runtime via the [Handle]. -/// -/// ``` -/// # use reth_tasks::Runtime; -/// fn t() { -/// use reth_tasks::TaskSpawner; -/// let rt = tokio::runtime::Runtime::new().unwrap(); -/// let runtime = Runtime::with_existing_handle(rt.handle().clone()).unwrap(); -/// let task = TaskSpawner::spawn_task(&runtime, Box::pin(async { -/// // -- snip -- -/// })); -/// rt.block_on(task).unwrap(); -/// # } -/// ``` -/// -/// The [`TaskSpawner`] trait is [`DynClone`] so `Box` are also `Clone`. -#[auto_impl::auto_impl(&, Arc)] -pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone { - /// Spawns the task onto the runtime. - /// See also [`Handle::spawn`]. - fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; - - /// This spawns a critical task onto the runtime. - fn spawn_critical_task( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()>; - - /// Spawns a blocking task onto the runtime. - fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; - - /// This spawns a critical blocking task onto the runtime. - fn spawn_critical_blocking_task( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()>; -} - -dyn_clone::clone_trait_object!(TaskSpawner); - -/// An [`TaskSpawner`] that uses [`tokio::task::spawn`] to execute tasks -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct TokioTaskExecutor; - -impl TokioTaskExecutor { - /// Converts the instance to a boxed [`TaskSpawner`]. - pub fn boxed(self) -> Box { - Box::new(self) - } -} - -impl TaskSpawner for TokioTaskExecutor { - fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - tokio::task::spawn(fut) - } - - fn spawn_critical_task( - &self, - _name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - tokio::task::spawn(fut) - } - - fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) - } - - fn spawn_critical_blocking_task( - &self, - _name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) - } -} - /// Monitors critical tasks for panics and manages graceful shutdown. /// /// The main purpose of this type is to be able to monitor if a critical task panicked, for @@ -218,7 +113,8 @@ pub struct TaskManager { /// /// This is fired when dropped. signal: Option, - /// How many [`GracefulShutdown`] tasks are currently active. + /// How many [`GracefulShutdown`](crate::shutdown::GracefulShutdown) tasks are currently + /// active. graceful_tasks: Arc, } @@ -328,32 +224,6 @@ pub(crate) enum TaskEvent { GracefulShutdown, } -/// `TaskSpawner` with extended behaviour -#[auto_impl::auto_impl(&, Arc)] -pub trait TaskSpawnerExt: Send + Sync + Unpin + std::fmt::Debug + DynClone { - /// This spawns a critical task onto the runtime. - /// - /// If this task panics, the [`TaskManager`] is notified. - /// The [`TaskManager`] will wait until the given future has completed before shutting down. - fn spawn_critical_with_graceful_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: std::future::Future + Send + 'static; - - /// This spawns a regular task onto the runtime. - /// - /// The [`TaskManager`] will wait until the given future has completed before shutting down. - fn spawn_with_graceful_shutdown_signal( - &self, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: std::future::Future + Send + 'static; -} - #[cfg(test)] mod tests { use super::*; @@ -362,20 +232,6 @@ mod tests { time::Duration, }; - #[test] - fn test_cloneable() { - #[derive(Clone)] - struct ExecutorWrapper { - _e: Box, - } - - let executor: Box = Box::::default(); - let _e = dyn_clone::clone_box(&*executor); - - let e = ExecutorWrapper { _e }; - let _e2 = e; - } - #[test] fn test_critical() { let runtime = tokio::runtime::Runtime::new().unwrap(); diff --git a/crates/tasks/src/runtime.rs b/crates/tasks/src/runtime.rs index f5f8acdb91..e0887a5b68 100644 --- a/crates/tasks/src/runtime.rs +++ b/crates/tasks/src/runtime.rs @@ -13,10 +13,7 @@ use crate::{ shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown}, PanickedTaskError, TaskEvent, TaskManager, }; -use futures_util::{ - future::{select, BoxFuture}, - Future, FutureExt, TryFutureExt, -}; +use futures_util::{future::select, Future, FutureExt, TryFutureExt}; #[cfg(feature = "rayon")] use std::{num::NonZeroUsize, thread::available_parallelism}; use std::{ @@ -300,17 +297,6 @@ impl std::fmt::Debug for Runtime { } } -#[cfg(any(test, feature = "test-utils"))] -impl Default for Runtime { - fn default() -> Self { - let config = match Handle::try_current() { - Ok(handle) => RuntimeConfig::with_existing_handle(handle), - Err(_) => RuntimeConfig::default(), - }; - RuntimeBuilder::new(config).build().expect("failed to build default Runtime") - } -} - // ── Constructors ────────────────────────────────────────────────────── impl Runtime { @@ -458,6 +444,10 @@ impl Runtime { where F: Future + Send + 'static, { + match task_kind { + TaskKind::Default => self.0.metrics.inc_regular_tasks(), + TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(), + } let on_shutdown = self.0.on_shutdown.clone(); let finished_counter = match task_kind { @@ -533,6 +523,7 @@ impl Runtime { where F: Future + Send + 'static, { + self.0.metrics.inc_critical_tasks(); let panicked_tasks_tx = self.0.task_events_tx.clone(); let on_shutdown = self.0.on_shutdown.clone(); @@ -737,63 +728,6 @@ impl Runtime { } } -// ── TaskSpawner impl ────────────────────────────────────────────────── - -impl crate::TaskSpawner for Runtime { - fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - self.0.metrics.inc_regular_tasks(); - Self::spawn_task(self, fut) - } - - fn spawn_critical_task( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - self.0.metrics.inc_critical_tasks(); - Self::spawn_critical_task(self, name, fut) - } - - fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - self.0.metrics.inc_regular_blocking_tasks(); - Self::spawn_blocking_task(self, fut) - } - - fn spawn_critical_blocking_task( - &self, - name: &'static str, - fut: BoxFuture<'static, ()>, - ) -> JoinHandle<()> { - self.0.metrics.inc_critical_tasks(); - Self::spawn_critical_blocking_task(self, name, fut) - } -} - -// ── TaskSpawnerExt impl ────────────────────────────────────────────── - -impl crate::TaskSpawnerExt for Runtime { - fn spawn_critical_with_graceful_shutdown_signal( - &self, - name: &'static str, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - Self::spawn_critical_with_graceful_shutdown_signal(self, name, f) - } - - fn spawn_with_graceful_shutdown_signal( - &self, - f: impl FnOnce(GracefulShutdown) -> F, - ) -> JoinHandle<()> - where - F: Future + Send + 'static, - { - Self::spawn_with_graceful_shutdown_signal(self, f) - } -} - // ── RuntimeBuilder ──────────────────────────────────────────────────── /// Builder for constructing a [`Runtime`]. diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 2ba3b6140f..16a76aca43 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -198,7 +198,7 @@ //! ``` //! use reth_chainspec::MAINNET; //! use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; -//! use reth_tasks::TokioTaskExecutor; +//! use reth_tasks::Runtime; //! use reth_chainspec::ChainSpecProvider; //! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool}; //! use reth_transaction_pool::blobstore::InMemoryBlobStore; @@ -211,8 +211,10 @@ //! Evm: ConfigureEvm> + 'static, //! { //! let blob_store = InMemoryBlobStore::default(); +//! let rt = tokio::runtime::Runtime::new().unwrap(); +//! let runtime = Runtime::with_existing_handle(rt.handle().clone()).unwrap(); //! let pool = Pool::eth_pool( -//! TransactionValidationTaskExecutor::eth(client, evm_config, blob_store.clone(), TokioTaskExecutor::default()), +//! TransactionValidationTaskExecutor::eth(client, evm_config, blob_store.clone(), runtime), //! blob_store, //! Default::default(), //! ); @@ -235,8 +237,6 @@ //! use reth_chain_state::CanonStateNotification; //! use reth_chainspec::{MAINNET, ChainSpecProvider, ChainSpec}; //! use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; -//! use reth_tasks::TokioTaskExecutor; -//! use reth_tasks::TaskSpawner; //! use reth_tasks::Runtime; //! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool}; //! use reth_transaction_pool::blobstore::InMemoryBlobStore; @@ -427,7 +427,7 @@ where /// ``` /// use reth_chainspec::MAINNET; /// use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; - /// use reth_tasks::TokioTaskExecutor; + /// use reth_tasks::Runtime; /// use reth_chainspec::ChainSpecProvider; /// use reth_transaction_pool::{ /// blobstore::InMemoryBlobStore, Pool, TransactionValidationTaskExecutor, @@ -435,7 +435,7 @@ where /// use reth_chainspec::EthereumHardforks; /// use reth_evm::ConfigureEvm; /// use alloy_consensus::Header; - /// # fn t(client: C, evm_config: Evm) + /// # fn t(client: C, evm_config: Evm, runtime: Runtime) /// # where /// # C: ChainSpecProvider + StateProviderFactory + BlockReaderIdExt
+ Clone + 'static, /// # Evm: ConfigureEvm> + 'static, @@ -446,7 +446,7 @@ where /// client, /// evm_config, /// blob_store.clone(), - /// TokioTaskExecutor::default(), + /// runtime, /// ), /// blob_store, /// Default::default(), diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 4a708e8755..70502d1694 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -27,7 +27,7 @@ use reth_primitives_traits::{ transaction::signed::SignedTransaction, NodePrimitives, SealedHeader, }; use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use serde::{Deserialize, Serialize}; use std::{ borrow::Borrow, @@ -95,11 +95,11 @@ impl LocalTransactionBackupConfig { } /// Returns a spawnable future for maintaining the state of the transaction pool. -pub fn maintain_transaction_pool_future( +pub fn maintain_transaction_pool_future( client: Client, pool: P, events: St, - task_spawner: Tasks, + task_spawner: Runtime, config: MaintainPoolConfig, ) -> BoxFuture<'static, ()> where @@ -112,7 +112,6 @@ where P: TransactionPoolExt, Block = N::Block> + 'static, St: Stream> + Send + Unpin + 'static, - Tasks: TaskSpawner + Clone + 'static, { async move { maintain_transaction_pool(client, pool, events, task_spawner, config).await; @@ -123,11 +122,11 @@ where /// Maintains the state of the transaction pool by handling new blocks and reorgs. /// /// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly -pub async fn maintain_transaction_pool( +pub async fn maintain_transaction_pool( client: Client, pool: P, mut events: St, - task_spawner: Tasks, + task_spawner: Runtime, config: MaintainPoolConfig, ) where N: NodePrimitives, @@ -139,7 +138,6 @@ pub async fn maintain_transaction_pool( P: TransactionPoolExt, Block = N::Block> + 'static, St: Stream> + Send + Unpin + 'static, - Tasks: TaskSpawner + Clone + 'static, { let metrics = MaintainPoolMetrics::default(); let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config; @@ -243,10 +241,10 @@ pub async fn maintain_transaction_pool( pool.delete_blobs(blobs); // and also do periodic cleanup let pool = pool.clone(); - task_spawner.spawn_blocking_task(Box::pin(async move { + task_spawner.spawn_blocking_task(async move { debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store"); pool.cleanup_blobs(); - })); + }); } // outcomes of the futures we are waiting on @@ -517,7 +515,7 @@ pub async fn maintain_transaction_pool( let pool = pool.clone(); let spawner = task_spawner.clone(); let client = client.clone(); - task_spawner.spawn_task(Box::pin(async move { + task_spawner.spawn_task(async move { // Start converting not eaerlier than 4 seconds into current slot to ensure // that our pool only contains valid transactions for the next block (as // it's not Osaka yet). @@ -565,7 +563,7 @@ pub async fn maintain_transaction_pool( let converter = BlobSidecarConverter::new(); let pool = pool.clone(); - spawner.spawn_task(Box::pin(async move { + spawner.spawn_task(async move { // Convert sidecar to EIP-7594 format let Some(sidecar) = converter.convert(sidecar).await else { return; @@ -580,7 +578,7 @@ pub async fn maintain_transaction_pool( return; }; let _ = pool.add_transaction(origin, tx).await; - })); + }); } if last_iteration { @@ -589,7 +587,7 @@ pub async fn maintain_transaction_pool( interval.tick().await; } - })); + }); } } } diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index fa63902ab9..e470562f5f 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -32,7 +32,7 @@ use reth_primitives_traits::{ SealedBlock, }; use reth_storage_api::{AccountInfoReader, BlockReaderIdExt, BytecodeReader, StateProviderFactory}; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use revm::context_interface::Cfg; use revm_primitives::U256; use std::{ @@ -1252,13 +1252,12 @@ impl EthTransactionValidatorBuilder { /// The validator will spawn `additional_tasks` additional tasks for validation. /// /// By default this will spawn 1 additional task. - pub fn build_with_tasks( + pub fn build_with_tasks( self, - tasks: T, + tasks: Runtime, blob_store: S, ) -> TransactionValidationTaskExecutor> where - T: TaskSpawner, S: BlobStore, { let additional_tasks = self.additional_tasks; @@ -1269,19 +1268,16 @@ impl EthTransactionValidatorBuilder { // Spawn validation tasks, they are blocking because they perform db lookups for _ in 0..additional_tasks { let task = task.clone(); - tasks.spawn_blocking_task(Box::pin(async move { + tasks.spawn_blocking_task(async move { task.run().await; - })); + }); } // we spawn them on critical tasks because validation, especially for EIP-4844 can be quite // heavy - tasks.spawn_critical_blocking_task( - "transaction-validation-service", - Box::pin(async move { - task.run().await; - }), - ); + tasks.spawn_critical_blocking_task("transaction-validation-service", async move { + task.run().await; + }); let to_validation_task = Arc::new(Mutex::new(tx)); diff --git a/crates/transaction-pool/src/validate/task.rs b/crates/transaction-pool/src/validate/task.rs index bf1798c5c0..9520f0bbf0 100644 --- a/crates/transaction-pool/src/validate/task.rs +++ b/crates/transaction-pool/src/validate/task.rs @@ -12,7 +12,7 @@ use reth_chainspec::{ChainSpecProvider, EthereumHardforks}; use reth_evm::ConfigureEvm; use reth_primitives_traits::{HeaderTy, SealedBlock}; use reth_storage_api::BlockReaderIdExt; -use reth_tasks::TaskSpawner; +use reth_tasks::Runtime; use std::{future::Future, pin::Pin, sync::Arc}; use tokio::{ sync, @@ -155,9 +155,8 @@ impl TransactionValidationTaskExecutor(client: Client, evm_config: Evm, blob_store: S, tasks: T) -> Self + pub fn eth(client: Client, evm_config: Evm, blob_store: S, tasks: Runtime) -> Self where - T: TaskSpawner, Client: ChainSpecProvider + BlockReaderIdExt
>, Evm: ConfigureEvm, @@ -174,15 +173,14 @@ impl TransactionValidationTaskExecutor( + pub fn eth_with_additional_tasks( client: Client, evm_config: Evm, blob_store: S, - tasks: T, + tasks: Runtime, num_additional_tasks: usize, ) -> Self where - T: TaskSpawner, Client: ChainSpecProvider + BlockReaderIdExt
>, Evm: ConfigureEvm, diff --git a/examples/custom-payload-builder/src/generator.rs b/examples/custom-payload-builder/src/generator.rs index 324d685b1a..6e446fc643 100644 --- a/examples/custom-payload-builder/src/generator.rs +++ b/examples/custom-payload-builder/src/generator.rs @@ -7,18 +7,18 @@ use reth_ethereum::{ node::api::{Block, PayloadBuilderAttributes}, primitives::SealedHeader, provider::{BlockReaderIdExt, BlockSource, StateProviderFactory}, - tasks::TaskSpawner, + tasks::Runtime, }; use reth_payload_builder::{PayloadBuilderError, PayloadJobGenerator}; use std::sync::Arc; /// The generator type that creates new jobs that builds empty blocks. #[derive(Debug)] -pub struct EmptyBlockPayloadJobGenerator { +pub struct EmptyBlockPayloadJobGenerator { /// The client that can interact with the chain. client: Client, /// How to spawn building tasks - executor: Tasks, + executor: Runtime, /// The configuration for the job generator. _config: BasicPayloadJobGeneratorConfig, /// The type responsible for building payloads. @@ -29,12 +29,12 @@ pub struct EmptyBlockPayloadJobGenerator { // === impl EmptyBlockPayloadJobGenerator === -impl EmptyBlockPayloadJobGenerator { +impl EmptyBlockPayloadJobGenerator { /// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom /// [PayloadBuilder] pub fn with_builder( client: Client, - executor: Tasks, + executor: Runtime, config: BasicPayloadJobGeneratorConfig, builder: Builder, ) -> Self { @@ -42,20 +42,18 @@ impl EmptyBlockPayloadJobGenerator PayloadJobGenerator - for EmptyBlockPayloadJobGenerator +impl PayloadJobGenerator for EmptyBlockPayloadJobGenerator where Client: StateProviderFactory + BlockReaderIdExt
> + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + Unpin + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, { - type Job = EmptyBlockPayloadJob; + type Job = EmptyBlockPayloadJob; /// This is invoked when the node receives payload attributes from the beacon node via /// `engine_forkchoiceUpdatedV1` diff --git a/examples/custom-payload-builder/src/job.rs b/examples/custom-payload-builder/src/job.rs index abb6e89668..966dd854c8 100644 --- a/examples/custom-payload-builder/src/job.rs +++ b/examples/custom-payload-builder/src/job.rs @@ -2,7 +2,7 @@ use futures_util::Future; use reth_basic_payload_builder::{HeaderForPayload, PayloadBuilder, PayloadConfig}; use reth_ethereum::{ node::api::{PayloadBuilderAttributes, PayloadKind}, - tasks::TaskSpawner, + tasks::Runtime, }; use reth_payload_builder::{KeepPayloadJobAlive, PayloadBuilderError, PayloadJob}; @@ -12,23 +12,22 @@ use std::{ }; /// A [PayloadJob] that builds empty blocks. -pub struct EmptyBlockPayloadJob +pub struct EmptyBlockPayloadJob where Builder: PayloadBuilder, { /// The configuration for how the payload will be created. pub(crate) config: PayloadConfig>, /// How to spawn building tasks - pub(crate) _executor: Tasks, + pub(crate) _executor: Runtime, /// The type responsible for building payloads. /// /// See [PayloadBuilder] pub(crate) builder: Builder, } -impl PayloadJob for EmptyBlockPayloadJob +impl PayloadJob for EmptyBlockPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -61,9 +60,8 @@ where } /// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService` -impl Future for EmptyBlockPayloadJob +impl Future for EmptyBlockPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 5b391c380a..844dc51491 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -31,7 +31,7 @@ use reth_ethereum::{ builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, TransportRpcModuleConfig}, EthApiBuilder, }, - tasks::{Runtime, TokioTaskExecutor}, + tasks::Runtime, }; // Configuring the network parts, ideally also wouldn't need to think about this. use myrpc_ext::{MyRpcExt, MyRpcExtApiServer}; @@ -55,7 +55,7 @@ async fn main() -> eyre::Result<()> { spec.clone(), StaticFileProvider::read_only(db_path.join("static_files"), true)?, RocksDBProvider::builder(db_path.join("rocksdb")).build().unwrap(), - runtime, + runtime.clone(), )?; // 2. Set up the blockchain provider using only the database provider and a noop for the tree to @@ -68,7 +68,7 @@ async fn main() -> eyre::Result<()> { // Rest is just noops that do nothing .with_noop_pool() .with_noop_network() - .with_executor(Box::new(TokioTaskExecutor::default())) + .with_executor(runtime) .with_evm_config(EthEvmConfig::new(spec.clone())) .with_consensus(EthBeaconConsensus::new(spec.clone()));