Compare commits

..

1 Commits

Author SHA1 Message Date
Matthias Seitz
5c04d1abe1 fix: allow smaller header size 2025-12-16 17:08:53 +01:00
107 changed files with 1241 additions and 3377 deletions

281
Cargo.lock generated
View File

@@ -177,9 +177,9 @@ dependencies = [
[[package]]
name = "alloy-dyn-abi"
version = "1.5.1"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d48a9101f4a67c22fae57489f1ddf3057b8ab4a368d8eac3be088b6e9d9c9d9"
checksum = "3fdff496dd4e98a81f4861e66f7eaf5f2488971848bb42d9c892f871730245c8"
dependencies = [
"alloy-json-abi",
"alloy-primitives",
@@ -329,9 +329,9 @@ dependencies = [
[[package]]
name = "alloy-json-abi"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9914c147bb9b25f440eca68a31dc29f5c22298bfa7754aa802965695384122b0"
checksum = "6bfca3dbbcb7498f0f60e67aff2ad6aff57032e22eb2fd03189854be11a22c03"
dependencies = [
"alloy-primitives",
"alloy-sol-type-parser",
@@ -426,9 +426,9 @@ dependencies = [
[[package]]
name = "alloy-primitives"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7db950a29746be9e2f2c6288c8bd7a6202a81f999ce109a2933d2379970ec0fa"
checksum = "5c850e6ccbd34b8a463a1e934ffc8fc00e1efc5e5489f2ad82d7797949f3bd4e"
dependencies = [
"alloy-rlp",
"arbitrary",
@@ -436,7 +436,6 @@ dependencies = [
"cfg-if",
"const-hex",
"derive_more",
"fixed-cache",
"foldhash 0.2.0",
"getrandom 0.3.4",
"hashbrown 0.16.1",
@@ -783,9 +782,9 @@ dependencies = [
[[package]]
name = "alloy-sol-macro"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3b96d5f5890605ba9907ce1e2158e2701587631dc005bfa582cf92dd6f21147"
checksum = "b2218e3aeb3ee665d117fdf188db0d5acfdc3f7b7502c827421cb78f26a2aec0"
dependencies = [
"alloy-sol-macro-expander",
"alloy-sol-macro-input",
@@ -797,9 +796,9 @@ dependencies = [
[[package]]
name = "alloy-sol-macro-expander"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8247b7cca5cde556e93f8b3882b01dbd272f527836049083d240c57bf7b4c15"
checksum = "b231cb8cc48e66dd1c6e11a1402f3ac86c3667cbc13a6969a0ac030ba7bb8c88"
dependencies = [
"alloy-sol-macro-input",
"const-hex",
@@ -815,9 +814,9 @@ dependencies = [
[[package]]
name = "alloy-sol-macro-input"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cd54f38512ac7bae10bbc38480eefb1b9b398ca2ce25db9cc0c048c6411c4f1"
checksum = "49a522d79929c1bf0152b07567a38f7eaed3ab149e53e7528afa78ff11994668"
dependencies = [
"const-hex",
"dunce",
@@ -831,9 +830,9 @@ dependencies = [
[[package]]
name = "alloy-sol-type-parser"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444b09815b44899564566d4d56613d14fa9a274b1043a021f00468568752f449"
checksum = "0475c459859c8d9428af6ff3736614655a57efda8cc435a3b8b4796fa5ac1dd0"
dependencies = [
"serde",
"winnow",
@@ -841,9 +840,9 @@ dependencies = [
[[package]]
name = "alloy-sol-types"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1038284171df8bfd48befc0c7b78f667a7e2be162f45f07bd1c378078ebe58"
checksum = "35287d9d821d5f26011bcd8d9101340898f761c9933cf50fca689bb7ed62fdeb"
dependencies = [
"alloy-json-abi",
"alloy-primitives",
@@ -1940,9 +1939,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.19.1"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]]
name = "byte-slice-cast"
@@ -2019,9 +2018,9 @@ dependencies = [
[[package]]
name = "camino"
version = "1.2.2"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629a66d692cb9ff1a1c664e41771b3dcaf961985a9774c0eb0bd1b51cf60a48"
checksum = "276a59bf2b2c967788139340c9f0c5b12d7fd6630315c15c217e559de85d2609"
dependencies = [
"serde_core",
]
@@ -2132,7 +2131,7 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -3987,15 +3986,6 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "fixed-cache"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba59b6c98ba422a13f17ee1305c995cb5742bba7997f5b4d9af61b2ff0ffb213"
dependencies = [
"equivalent",
]
[[package]]
name = "fixed-hash"
version = "0.8.0"
@@ -4239,17 +4229,16 @@ checksum = "42012b0f064e01aa58b545fe3727f90f7dd4020f4a3ea735b50344965f5a57e9"
[[package]]
name = "generator"
version = "0.8.8"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9"
checksum = "605183a538e3e2a9c1038635cc5c2d194e2ee8fd0d1b66b8349fad7dbacce5a2"
dependencies = [
"cc",
"cfg-if",
"libc",
"log",
"rustversion",
"windows-link",
"windows-result 0.4.1",
"windows 0.61.3",
]
[[package]]
@@ -5028,14 +5017,13 @@ dependencies = [
[[package]]
name = "insta"
version = "1.45.0"
version = "1.44.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b76866be74d68b1595eb8060cb9191dca9c021db2316558e52ddc5d55d41b66c"
checksum = "b5c943d4415edd8153251b6f197de5eb1640e56d84e8d9159bea190421c73698"
dependencies = [
"console",
"once_cell",
"similar",
"tempfile",
]
[[package]]
@@ -5158,9 +5146,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.16"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jni"
@@ -5476,7 +5464,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55"
dependencies = [
"cfg-if",
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -5517,13 +5505,13 @@ dependencies = [
[[package]]
name = "libredox"
version = "0.1.11"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50"
checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb"
dependencies = [
"bitflags 2.10.0",
"libc",
"redox_syscall 0.6.0",
"redox_syscall",
]
[[package]]
@@ -6032,9 +6020,9 @@ checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d"
[[package]]
name = "ntapi"
version = "0.4.2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c70f219e21142367c70c0b30c6a9e3a14d55b4d12a204d897fbec83a0363f081"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
@@ -6557,9 +6545,9 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.5.18",
"redox_syscall",
"smallvec",
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -6756,9 +6744,9 @@ dependencies = [
[[package]]
name = "portable-atomic"
version = "1.12.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f59e70c4aef1e55797c2e8fd94a4f2a973fc972cfde0e0b05f683667b0cd39dd"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "potential_utf"
@@ -6830,7 +6818,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983"
dependencies = [
"toml_edit 0.23.10+spec-1.0.0",
"toml_edit 0.23.9",
]
[[package]]
@@ -7224,9 +7212,9 @@ dependencies = [
[[package]]
name = "rapidhash"
version = "4.2.0"
version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2988730ee014541157f48ce4dcc603940e00915edc3c7f9a8d78092256bb2493"
checksum = "d8e65c75143ce5d47c55b510297eeb1182f3c739b6043c537670e9fc18612dae"
dependencies = [
"rand 0.9.2",
"rustversion",
@@ -7297,15 +7285,6 @@ dependencies = [
"bitflags 2.10.0",
]
[[package]]
name = "redox_syscall"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5"
dependencies = [
"bitflags 2.10.0",
]
[[package]]
name = "redox_users"
version = "0.4.6"
@@ -7395,9 +7374,9 @@ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "reqwest"
version = "0.12.26"
version = "0.12.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b4c14b2d9afca6a60277086b0cc6a6ae0b568f6f7916c943a8cdc79f8be240f"
checksum = "b6eff9328d40131d43bd911d42d79eb6a47312002a4daefc9e37f17e74a7701a"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -8268,7 +8247,6 @@ dependencies = [
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
"rand 0.8.5",
@@ -8302,7 +8280,6 @@ dependencies = [
"reth-stages",
"reth-stages-api",
"reth-static-file",
"reth-storage-errors",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
@@ -9150,7 +9127,6 @@ dependencies = [
"fdlimit",
"futures",
"jsonrpsee",
"parking_lot",
"rayon",
"reth-basic-payload-builder",
"reth-chain-state",
@@ -9310,7 +9286,6 @@ dependencies = [
"reth-rpc-eth-api",
"reth-rpc-eth-types",
"reth-rpc-server-types",
"reth-stages-types",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
@@ -9634,7 +9609,6 @@ dependencies = [
"alloy-consensus",
"alloy-genesis",
"alloy-network",
"alloy-op-hardforks",
"alloy-primitives",
"alloy-rpc-types-engine",
"alloy-rpc-types-eth",
@@ -9675,7 +9649,6 @@ dependencies = [
"reth-rpc-engine-api",
"reth-rpc-eth-types",
"reth-rpc-server-types",
"reth-stages-types",
"reth-tasks",
"reth-tracing",
"reth-transaction-pool",
@@ -10419,7 +10392,6 @@ dependencies = [
"reth-ethereum-engine-primitives",
"reth-ethereum-primitives",
"reth-metrics",
"reth-network-api",
"reth-node-ethereum",
"reth-payload-builder",
"reth-payload-builder-primitives",
@@ -10606,7 +10578,6 @@ dependencies = [
"reth-stages-api",
"reth-static-file",
"reth-static-file-types",
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-trie",
@@ -10851,7 +10822,6 @@ dependencies = [
"tracing-appender",
"tracing-journald",
"tracing-logfmt",
"tracing-samply",
"tracing-subscriber 0.3.22",
]
@@ -11615,9 +11585,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
version = "1.13.2"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c"
dependencies = [
"web-time",
"zeroize",
@@ -11681,9 +11651,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.21"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "ryu-js"
@@ -12381,9 +12351,9 @@ dependencies = [
[[package]]
name = "syn-solidity"
version = "1.5.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6b1d2e2059056b66fec4a6bb2b79511d5e8d76196ef49c38996f4b48db7662f"
checksum = "60ceeb7c95a4536de0c0e1649bd98d1a72a4bb9590b1f3e45a8a0bfdb7c188c0"
dependencies = [
"paste",
"proc-macro2",
@@ -12841,9 +12811,9 @@ dependencies = [
[[package]]
name = "toml_datetime"
version = "0.7.5+spec-1.1.0"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
dependencies = [
"serde_core",
]
@@ -12864,21 +12834,21 @@ dependencies = [
[[package]]
name = "toml_edit"
version = "0.23.10+spec-1.0.0"
version = "0.23.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269"
checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832"
dependencies = [
"indexmap 2.12.1",
"toml_datetime 0.7.5+spec-1.1.0",
"toml_datetime 0.7.3",
"toml_parser",
"winnow",
]
[[package]]
name = "toml_parser"
version = "1.0.6+spec-1.1.0"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
dependencies = [
"winnow",
]
@@ -12991,9 +12961,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.44"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
dependencies = [
"log",
"pin-project-lite",
@@ -13026,9 +12996,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.36"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
dependencies = [
"once_cell",
"valuable",
@@ -13097,22 +13067,6 @@ dependencies = [
"web-time",
]
[[package]]
name = "tracing-samply"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c175f7ecc002b6ef04776a39f440503e4e788790ddbdbfac8259b7a069526334"
dependencies = [
"cfg-if",
"itoa",
"libc",
"mach2",
"memmap2",
"smallvec",
"tracing-core",
"tracing-subscriber 0.3.22",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
@@ -13724,16 +13678,38 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.61.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
dependencies = [
"windows-collections 0.2.0",
"windows-core 0.61.2",
"windows-future 0.2.1",
"windows-link 0.1.3",
"windows-numerics 0.2.0",
]
[[package]]
name = "windows"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580"
dependencies = [
"windows-collections",
"windows-collections 0.3.2",
"windows-core 0.62.2",
"windows-future",
"windows-numerics",
"windows-future 0.3.2",
"windows-numerics 0.3.1",
]
[[package]]
name = "windows-collections"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
dependencies = [
"windows-core 0.61.2",
]
[[package]]
@@ -13757,6 +13733,19 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement 0.60.2",
"windows-interface 0.59.3",
"windows-link 0.1.3",
"windows-result 0.3.4",
"windows-strings 0.4.2",
]
[[package]]
name = "windows-core"
version = "0.62.2"
@@ -13765,9 +13754,20 @@ checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement 0.60.2",
"windows-interface 0.59.3",
"windows-link",
"windows-link 0.2.1",
"windows-result 0.4.1",
"windows-strings",
"windows-strings 0.5.1",
]
[[package]]
name = "windows-future"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
dependencies = [
"windows-core 0.61.2",
"windows-link 0.1.3",
"windows-threading 0.1.0",
]
[[package]]
@@ -13777,8 +13777,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb"
dependencies = [
"windows-core 0.62.2",
"windows-link",
"windows-threading",
"windows-link 0.2.1",
"windows-threading 0.2.1",
]
[[package]]
@@ -13825,12 +13825,28 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "windows-link"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-numerics"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
dependencies = [
"windows-core 0.61.2",
"windows-link 0.1.3",
]
[[package]]
name = "windows-numerics"
version = "0.3.1"
@@ -13838,7 +13854,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26"
dependencies = [
"windows-core 0.62.2",
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -13850,13 +13866,31 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-result"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
"windows-link 0.2.1",
]
[[package]]
name = "windows-strings"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
@@ -13865,7 +13899,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -13919,7 +13953,7 @@ version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
"windows-link 0.2.1",
]
[[package]]
@@ -13974,7 +14008,7 @@ version = "0.53.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link",
"windows-link 0.2.1",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
@@ -13985,13 +14019,22 @@ dependencies = [
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows-threading"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
name = "windows-threading"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37"
dependencies = [
"windows-link",
"windows-link 0.2.1",
]
[[package]]

View File

@@ -587,7 +587,6 @@ url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
chrono = "0.4.41"
@@ -730,7 +729,6 @@ socket2 = { version = "0.5", default-features = false }
sysinfo = { version = "0.33", default-features = false }
tracing-journald = "0.3"
tracing-logfmt = "0.3.3"
tracing-samply = "0.1"
tracing-subscriber = { version = "0.3", default-features = false }
triehash = "0.8"
typenum = "1.15.0"

View File

@@ -18,7 +18,7 @@ FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Build profile, release by default
ARG BUILD_PROFILE=maxperf
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
# Extra Cargo flags

View File

@@ -14,7 +14,7 @@ RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
ARG BUILD_PROFILE=maxperf
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
ARG RUSTFLAGS=""

View File

@@ -329,7 +329,6 @@ pub(crate) async fn run_comparison(args: Args, _ctx: CliContext) -> Result<()> {
output_dir.clone(),
git_manager.clone(),
args.features.clone(),
args.profile,
)?;
// Initialize node manager
let mut node_manager = NodeManager::new(&args);

View File

@@ -14,7 +14,6 @@ pub(crate) struct CompilationManager {
output_dir: PathBuf,
git_manager: GitManager,
features: String,
enable_profiling: bool,
}
impl CompilationManager {
@@ -24,9 +23,8 @@ impl CompilationManager {
output_dir: PathBuf,
git_manager: GitManager,
features: String,
enable_profiling: bool,
) -> Result<Self> {
Ok(Self { repo_root, output_dir, git_manager, features, enable_profiling })
Ok(Self { repo_root, output_dir, git_manager, features })
}
/// Detect if the RPC endpoint is an Optimism chain
@@ -102,18 +100,9 @@ impl CompilationManager {
let mut cmd = Command::new("cargo");
cmd.arg("build").arg("--profile").arg("profiling");
// Append samply feature when profiling to enable tracing span markers.
// NOTE: The `samply` feature must exist in the branch being compiled. If comparing
// against an older branch that predates the samply integration, compilation will fail
// or markers won't appear. In that case, omit --profile or ensure both branches
// include the samply feature support.
let features = if self.enable_profiling && !self.features.contains("samply") {
format!("{},samply", self.features)
} else {
self.features.clone()
};
cmd.arg("--features").arg(&features);
info!("Using features: {}", features);
// Add features
cmd.arg("--features").arg(&self.features);
info!("Using features: {}", self.features);
// Add bin-specific arguments for optimism
if is_optimism {

View File

@@ -81,16 +81,12 @@ backon.workspace = true
tempfile.workspace = true
[features]
default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
default = ["jemalloc", "otlp", "reth-revm/portable", "js-tracer", "keccak-cache-global"]
otlp = [
"reth-ethereum-cli/otlp",
"reth-node-core/otlp",
]
samply = [
"reth-ethereum-cli/samply",
"reth-node-core/samply",
]
js-tracer = [
"reth-node-builder/js-tracer",
"reth-node-ethereum/js-tracer",
@@ -107,7 +103,6 @@ asm-keccak = [
"reth-node-ethereum/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
]
jemalloc = [

View File

@@ -80,8 +80,6 @@ pub fn make_genesis_header(genesis: &Genesis, hardforks: &ChainHardforks) -> Hea
.then_some(EMPTY_REQUESTS_HASH);
Header {
number: genesis.number.unwrap_or_default(),
parent_hash: genesis.parent_hash.unwrap_or_default(),
gas_limit: genesis.gas_limit,
difficulty: genesis.difficulty,
nonce: genesis.nonce.into(),

View File

@@ -23,10 +23,7 @@ use reth_node_core::{
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
providers::{
BlockchainProvider, NodeTypesForProvider, RocksDBProvider, StaticFileProvider,
StaticFileProviderBuilder,
},
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider, StaticFileProvider},
ProviderFactory, StaticFileProviderFactory,
};
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
@@ -103,23 +100,15 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
}
info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
let genesis_block_number = self.chain.genesis().number.unwrap_or_default();
let (db, sfp) = match access {
AccessRights::RW => (
Arc::new(init_db(db_path, self.db.database_args())?),
StaticFileProviderBuilder::read_write(sf_path)?
.with_genesis_block_number(genesis_block_number)
.build()?,
StaticFileProvider::read_write(sf_path)?,
),
AccessRights::RO | AccessRights::RoInconsistent => (
Arc::new(open_db_read_only(&db_path, self.db.database_args())?),
StaticFileProvider::read_only(sf_path, false)?,
),
AccessRights::RO | AccessRights::RoInconsistent => {
(Arc::new(open_db_read_only(&db_path, self.db.database_args())?), {
let provider = StaticFileProviderBuilder::read_only(sf_path)?
.with_genesis_block_number(genesis_block_number)
.build()?;
provider.watch_directory();
provider
})
}
};
// TransactionDB only support read-write mode
let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb())

View File

@@ -1,9 +1,8 @@
//! Command that initializes the node from a genesis file.
use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use alloy_consensus::BlockHeader;
use clap::Parser;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_provider::BlockHashReader;
use std::sync::Arc;
@@ -23,9 +22,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitComman
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let genesis_block_number = provider_factory.chain_spec().genesis_header().number();
let hash = provider_factory
.block_hash(genesis_block_number)?
.block_hash(0)?
.ok_or_else(|| eyre::eyre!("Genesis hash not found."))?;
info!(target: "reth::cli", hash = ?hash, "Genesis block written");

View File

@@ -72,7 +72,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
.split();
if result.len() != 1 {
eyre::bail!(
"Invalid number of bodies received. Expected: 1. Received: {}",
"Invalid number of headers received. Expected: 1. Received: {}",
result.len()
)
}

View File

@@ -11,7 +11,6 @@ use reth_node_builder::{
PayloadTypes,
};
use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
use reth_primitives_traits::AlloyBlockHeader;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_server_types::RpcModuleSelection;
use reth_tasks::TaskManager;
@@ -158,8 +157,8 @@ where
.await?;
let node = NodeTestContext::new(node, self.attributes_generator).await?;
let genesis_number = self.chain_spec.genesis_header().number();
let genesis = node.block_hash(genesis_number);
let genesis = node.block_hash(0);
node.update_forkchoice(genesis, genesis).await?;
eyre::Ok(node)

View File

@@ -86,7 +86,7 @@ where
evm_config: C,
) -> Self
where
V: EngineValidator<N::Payload, Provider = BlockchainProvider<N>>,
V: EngineValidator<N::Payload>,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let engine_kind =

View File

@@ -29,7 +29,6 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-storage-errors.workspace = true
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
@@ -53,7 +52,6 @@ futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics

View File

@@ -128,12 +128,12 @@ we send them along with the state updates to the [Sparse Trie Task](#sparse-trie
### Finishing the calculation
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishedStateUpdates` message
Once all transactions are executed, the [Engine](#engine) sends a `StateRootMessage::FinishStateUpdates` message
to the State Root Task, marking the end of receiving state updates.
Every time we receive a new proof from the [MultiProof Manager](#multiproof-manager), we also check
the following conditions:
1. Are all updates received? (`StateRootMessage::FinishedStateUpdates` was sent)
1. Are all updates received? (`StateRootMessage::FinishStateUpdates` was sent)
2. Is `ProofSequencer` empty? (no proofs are pending for sequencing)
3. Are all proofs that were sent to the [`MultiProofManager::spawn_or_queue`](#multiproof-manager) finished
calculating and were sent to the [Sparse Trie Task](#sparse-trie-task)?

View File

@@ -219,19 +219,10 @@ pub enum HandlerEvent<T> {
}
/// Internal events issued by the [`ChainOrchestrator`].
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum FromOrchestrator {
/// Invoked when backfill sync finished
BackfillSyncFinished(ControlFlow),
/// Invoked when backfill sync started
BackfillSyncStarted,
/// Gracefully terminate the engine service.
///
/// When this variant is received, the engine will persist all remaining in-memory blocks
/// to disk before shutting down. Once persistence is complete, a signal is sent through
/// the oneshot channel to notify the caller.
Terminate {
/// Channel to signal termination completion.
tx: tokio::sync::oneshot::Sender<()>,
},
}

View File

@@ -1,4 +1,5 @@
use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
@@ -141,23 +142,27 @@ where
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saving range of blocks");
let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
hash: block.recovered_block().hash(),
number: block.recovered_block().header().number(),
});
if last_block.is_some() {
if last_block_hash_num.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.commit()?;
}
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block)
Ok(last_block_hash_num)
}
}

View File

@@ -14,7 +14,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
/// * [`BlockBuffer::remove_old_blocks`] to remove old blocks that precede the finalized number.
///
/// Note: Buffer is limited by number of blocks that it can contain and eviction of the block
/// is done in FIFO order (oldest inserted block is evicted first).
/// is done by last recently used block.
#[derive(Debug)]
pub struct BlockBuffer<B: Block> {
/// All blocks in the buffer stored by their block hash.

View File

@@ -31,9 +31,6 @@ pub(crate) struct CachedStateProvider<S> {
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
/// If prewarm enabled we populate every cache miss
prewarm: bool,
}
impl<S> CachedStateProvider<S>
@@ -42,32 +39,12 @@ where
{
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
/// [`CachedStateMetrics`].
pub(crate) const fn new(
pub(crate) const fn new_with_caches(
state_provider: S,
caches: ExecutionCache,
metrics: CachedStateMetrics,
) -> Self {
Self { state_provider, caches, metrics, prewarm: false }
}
}
impl<S> CachedStateProvider<S> {
/// Enables pre-warm mode so that every cache miss is populated.
///
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
/// the cache with data for regular block execution. During regular block execution the
/// cache doesn't need to be populated because the actual EVM database
/// [`State`](revm::database::State) also caches internally during block execution and the cache
/// is then updated after the block with the entire [`BundleState`] output of that block which
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
pub(crate) const fn prewarm(mut self) -> Self {
self.prewarm = true;
self
}
/// Returns whether this provider should pre-warm cache misses.
const fn is_prewarm(&self) -> bool {
self.prewarm
Self { state_provider, caches, metrics }
}
}
@@ -146,10 +123,7 @@ impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
self.metrics.account_cache_misses.increment(1);
let res = self.state_provider.basic_account(address)?;
if self.is_prewarm() {
self.caches.account_cache.insert(*address, res);
}
self.caches.account_cache.insert(*address, res);
Ok(res)
}
}
@@ -174,19 +148,15 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
match self.caches.get_storage(&account, &storage_key) {
(SlotStatus::NotCached, maybe_cache) => {
let final_res = self.state_provider.storage(account, storage_key)?;
if self.is_prewarm() {
let account_cache = maybe_cache.unwrap_or_default();
account_cache.insert_storage(storage_key, final_res);
// we always need to insert the value to update the weights.
// Note: there exists a race when the storage cache did not exist yet and two
// consumers looking up the a storage value for this account for the first time,
// however we can assume that this will only happen for the very first
// (mostlikely the same) value, and don't expect that this
// will accidentally replace an account storage cache with
// additional values.
self.caches.insert_storage_cache(account, account_cache);
}
let account_cache = maybe_cache.unwrap_or_default();
account_cache.insert_storage(storage_key, final_res);
// we always need to insert the value to update the weights.
// Note: there exists a race when the storage cache did not exist yet and two
// consumers looking up the a storage value for this account for the first time,
// however we can assume that this will only happen for the very first (mostlikely
// the same) value, and don't expect that this will accidentally
// replace an account storage cache with additional values.
self.caches.insert_storage_cache(account, account_cache);
self.metrics.storage_cache_misses.increment(1);
Ok(final_res)
@@ -213,11 +183,7 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
self.metrics.code_cache_misses.increment(1);
let final_res = self.state_provider.bytecode_by_hash(code_hash)?;
if self.is_prewarm() {
self.caches.code_cache.insert(*code_hash, final_res.clone());
}
self.caches.code_cache.insert(*code_hash, final_res.clone());
Ok(final_res)
}
}
@@ -819,7 +785,7 @@ mod tests {
let caches = ExecutionCacheBuilder::default().build_caches(1000);
let state_provider =
CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed());
CachedStateProvider::new_with_caches(provider, caches, CachedStateMetrics::zeroed());
// check that the storage is empty
let res = state_provider.storage(address, storage_key);
@@ -842,7 +808,7 @@ mod tests {
let caches = ExecutionCacheBuilder::default().build_caches(1000);
let state_provider =
CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed());
CachedStateProvider::new_with_caches(provider, caches, CachedStateMetrics::zeroed());
// check that the storage returns the expected value
let res = state_provider.storage(address, storage_key);

View File

@@ -83,7 +83,7 @@ where
{
/// Creates a new [`InstrumentedStateProvider`] from a state provider with the provided label
/// for metrics.
pub fn new(state_provider: S, source: &'static str) -> Self {
pub fn from_state_provider(state_provider: S, source: &'static str) -> Self {
Self {
state_provider,
metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),

View File

@@ -39,7 +39,6 @@ use revm::state::EvmState;
use state::TreeState;
use std::{
fmt::Debug,
ops,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
@@ -116,10 +115,10 @@ impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
P: BlockReader + StateProviderFactory + StateReader + Clone,
{
/// Consumes the builder and creates a new state provider.
pub fn build(self) -> ProviderResult<StateProviderBox> {
/// Creates a new state provider from this builder.
pub fn build(&self) -> ProviderResult<StateProviderBox> {
let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
if let Some(overlay) = self.overlay {
if let Some(overlay) = self.overlay.clone() {
provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
}
Ok(provider)
@@ -316,14 +315,12 @@ where
+ HashedPostStateProvider
+ TrieReader
+ Clone
+ Send
+ Sync
+ 'static,
<P as DatabaseProviderFactory>::Provider:
BlockReader<Block = N::Block, Header = N::BlockHeader>,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T, Provider = P>,
V: EngineValidator<T>,
{
/// Creates a new [`EngineApiTreeHandler`].
#[expect(clippy::too_many_arguments)]
@@ -429,13 +426,9 @@ where
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
match self.on_engine_message(msg) {
Ok(ops::ControlFlow::Break(())) => return,
Ok(ops::ControlFlow::Continue(())) => {}
Err(fatal) => {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
}
Ok(None) => {
@@ -933,6 +926,48 @@ where
Ok(())
}
/// Determines if the given block is part of a fork by checking that these
/// conditions are true:
/// * walking back from the target hash to verify that the target hash is not part of an
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
///
/// The header is required as an arg, because we might be checking that the header is a fork
/// block before it's in the tree state and before it's in the database.
fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
let target_hash = target.block.hash;
// verify that the given hash is not part of an extension of the canon chain.
let canonical_head = self.state.tree_state.canonical_head();
let mut current_hash;
let mut current_block = target;
loop {
if current_block.block.hash == canonical_head.hash {
return Ok(false)
}
// We already passed the canonical head
if current_block.block.number <= canonical_head.number {
break
}
current_hash = current_block.parent;
let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
current_block = next_block.block_with_parent();
}
// verify that the given hash is not already part of canonical chain stored in memory
if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
return Ok(false)
}
// verify that the given hash is not already part of persisted canonical chain
if self.provider.block_number(target_hash)?.is_some() {
return Ok(false)
}
Ok(true)
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
@@ -1267,7 +1302,22 @@ where
// Check if persistence has complete
match rx.try_recv() {
Ok(last_persisted_hash_num) => {
self.on_persistence_complete(last_persisted_hash_num, start_time)?;
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let Some(BlockNumHash {
hash: last_persisted_block_hash,
number: last_persisted_block_number,
}) = last_persisted_hash_num
else {
// if this happened, then we persisted no blocks because we sent an
// empty vec of blocks
warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
return Ok(())
};
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state
.finish(last_persisted_block_hash, last_persisted_block_number);
self.on_new_persisted_block()?;
}
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
Err(TryRecvError::Empty) => {
@@ -1280,8 +1330,7 @@ where
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
} else if self.should_persist() {
let blocks_to_persist =
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
let blocks_to_persist = self.get_canonical_blocks_to_persist()?;
self.persist_blocks(blocks_to_persist);
}
}
@@ -1289,72 +1338,11 @@ where
Ok(())
}
/// Finishes termination by persisting all remaining blocks and signaling completion.
///
/// This blocks until all persistence is complete. Always signals completion,
/// even if an error occurs.
fn finish_termination(
&mut self,
pending_termination: oneshot::Sender<()>,
) -> Result<(), AdvancePersistenceError> {
trace!(target: "engine::tree", "finishing termination, persisting remaining blocks");
let result = self.persist_until_complete();
let _ = pending_termination.send(());
result
}
/// Persists all remaining blocks until none are left.
fn persist_until_complete(&mut self) -> Result<(), AdvancePersistenceError> {
loop {
// Wait for any in-progress persistence to complete (blocking)
if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
self.on_persistence_complete(result, start_time)?;
}
let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
if blocks_to_persist.is_empty() {
debug!(target: "engine::tree", "persistence complete, signaling termination");
return Ok(())
}
debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(blocks_to_persist);
}
}
/// Handles a completed persistence task.
fn on_persistence_complete(
&mut self,
last_persisted_hash_num: Option<BlockNumHash>,
start_time: Instant,
) -> Result<(), AdvancePersistenceError> {
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let Some(BlockNumHash {
hash: last_persisted_block_hash,
number: last_persisted_block_number,
}) = last_persisted_hash_num
else {
// if this happened, then we persisted no blocks because we sent an empty vec of blocks
warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
return Ok(())
};
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
self.on_new_persisted_block()?;
Ok(())
}
/// Handles a message from the engine.
///
/// Returns `ControlFlow::Break(())` if the engine should terminate.
fn on_engine_message(
&mut self,
msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
) -> Result<(), InsertBlockFatalError> {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncStarted => {
@@ -1364,13 +1352,6 @@ where
FromOrchestrator::BackfillSyncFinished(ctrl) => {
self.on_backfill_sync_finished(ctrl)?;
}
FromOrchestrator::Terminate { tx } => {
debug!(target: "engine::tree", "received terminate request");
if let Err(err) = self.finish_termination(tx) {
error!(target: "engine::tree", %err, "Termination failed");
}
return Ok(ops::ControlFlow::Break(()))
}
},
FromEngine::Request(request) => {
match request {
@@ -1378,7 +1359,7 @@ where
let block_num_hash = block.recovered_block().num_hash();
if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
// outdated block that can be skipped
return Ok(ops::ControlFlow::Continue(()))
return Ok(())
}
debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
@@ -1486,7 +1467,7 @@ where
}
}
}
Ok(ops::ControlFlow::Continue(()))
Ok(())
}
/// Invoked if the backfill sync has finished to target.
@@ -1720,10 +1701,10 @@ where
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
/// `(last_persisted_number .. canonical_head - threshold]`. The expected
/// order is oldest -> newest.
fn get_canonical_blocks_to_persist(
&self,
target: PersistTarget,
) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
// We will calculate the state root using the database, so we need to be sure there are no
// changes
@@ -1734,12 +1715,9 @@ where
let last_persisted_number = self.persistence_state.last_persisted_block.number;
let canonical_head_number = self.state.tree_state.canonical_block_number();
let target_number = match target {
PersistTarget::Head => canonical_head_number,
PersistTarget::Threshold => {
canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
}
};
// Persist only up to block buffer target
let target_number =
canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
debug!(
target: "engine::tree",
@@ -2477,7 +2455,7 @@ where
&mut self,
block_id: BlockWithParent,
input: Input,
execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N, P>) -> Result<ExecutedBlock<N>, Err>,
execute: impl FnOnce(&mut V, Input, TreeCtx<'_, N>) -> Result<ExecutedBlock<N>, Err>,
convert_to_block: impl FnOnce(&mut Self, Input) -> Result<SealedBlock<N::Block>, Err>,
) -> Result<InsertPayloadOk, Err>
where
@@ -2501,7 +2479,8 @@ where
_ => {}
};
let provider_builder = match self.state_provider_builder(block_id.parent) {
// Ensure that the parent state is available.
match self.state_provider_builder(block_id.parent) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
@@ -2525,31 +2504,19 @@ where
missing_ancestor,
}))
}
Ok(Some(builder)) => builder,
};
Ok(Some(_)) => {}
}
// Build the state provider. The builder is cloned because it's also needed for parallel
// tasks.
let state_provider = match provider_builder.clone().build() {
Ok(provider) => provider,
// determine whether we are on a fork chain
let is_fork = match self.is_fork(block_id) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
}
Ok(is_fork) => is_fork,
};
// determine whether we are on a fork chain by comparing the block number with the
// canonical head. This is a simple check that is sufficient for the event emission below.
// A block is considered a fork if its number is less than or equal to the canonical head,
// as this indicates there's already a canonical block at that height.
let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
let ctx = TreeCtx::with_precomputed(
&mut self.state,
&self.canonical_in_memory_state,
state_provider,
provider_builder,
);
let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);
let start = Instant::now();
@@ -2893,12 +2860,3 @@ pub enum InsertPayloadOk {
/// The payload was valid and inserted into the tree.
Inserted(BlockStatus),
}
/// Target for block persistence.
#[derive(Debug, Clone, Copy)]
enum PersistTarget {
/// Persist up to `canonical_head - memory_block_buffer_target`.
Threshold,
/// Persist all blocks up to and including the canonical head.
Head,
}

View File

@@ -23,12 +23,11 @@ use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
@@ -93,13 +92,6 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxTuple>::Tx>,
<I as ExecutableTxTuple>::Error,
<N as NodePrimitives>::Receipt,
>;
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
@@ -208,6 +200,7 @@ where
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
@@ -222,7 +215,7 @@ where
multiproof_provider_factory: F,
config: &TreeConfig,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
@@ -327,7 +320,7 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
) -> IteratorPayloadHandle<Evm, I, N>
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
@@ -407,7 +400,7 @@ where
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
) -> CacheTaskHandle<N::Receipt>
) -> CacheTaskHandle
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
@@ -588,15 +581,12 @@ where
}
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
pub struct PayloadHandle<Tx, Err> {
/// Channel for evm state updates
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
// must include the receiver of the state root wired to the sparse trie
prewarm_handle: CacheTaskHandle<R>,
prewarm_handle: CacheTaskHandle,
/// Stream of block transactions
transactions: mpsc::Receiver<Result<Tx, Err>>,
/// Receiver for the state root
@@ -605,7 +595,7 @@ pub struct PayloadHandle<Tx, Err, R> {
_span: Span,
}
impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
impl<Tx, Err> PayloadHandle<Tx, Err> {
/// Awaits the state root
///
/// # Panics
@@ -658,14 +648,9 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Terminates the entire caching task.
///
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
self.prewarm_handle.terminate_caching(execution_outcome)
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
self.prewarm_handle.terminate_caching(block_output)
}
/// Returns iterator yielding transactions from the stream.
@@ -677,20 +662,17 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
}
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub(crate) struct CacheTaskHandle<R> {
pub(crate) struct CacheTaskHandle {
/// The shared cache the task operates with.
cache: Option<StateExecutionCache>,
/// Metrics for the caches
cache_metrics: Option<CachedStateMetrics>,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}
impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
impl CacheTaskHandle {
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
@@ -702,25 +684,20 @@ impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
/// Terminates the entire pre-warming task.
///
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
if let Some(tx) = self.to_prewarm_task.take() {
let event = PrewarmTaskEvent::Terminate { execution_outcome };
// Only clone when we have an active task and a state to send
let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
let _ = tx.send(event);
}
}
}
impl<R> Drop for CacheTaskHandle<R> {
impl Drop for CacheTaskHandle {
fn drop(&mut self) {
// Ensure we always terminate on drop - send None without needing Send + Sync bounds
if let Some(tx) = self.to_prewarm_task.take() {
let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
}
// Ensure we always terminate on drop
self.terminate_caching(None);
}
}
@@ -773,8 +750,6 @@ impl ExecutionCache {
cache
.as_ref()
// Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
// a reference to this cache. We can only reuse it when we have exclusive access.
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
.cloned()
}

View File

@@ -3,7 +3,11 @@
use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
use alloy_eip7928::BlockAccessList;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{keccak256, map::HashSet, B256};
use alloy_primitives::{
keccak256,
map::{B256Set, HashSet},
B256,
};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use derive_more::derive::Deref;
@@ -19,6 +23,7 @@ use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
StorageProofInput,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
@@ -165,6 +170,11 @@ impl ProofSequencer {
while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
consecutive_proofs.push(pending);
current_sequence += 1;
// if we don't have the next number, stop collecting
if !self.pending_proofs.contains_key(&current_sequence) {
break;
}
}
self.next_to_deliver += consecutive_proofs.len() as u64;
@@ -231,6 +241,74 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
hashed_state
}
/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
#[derive(Debug)]
enum PendingMultiproofTask {
/// A storage multiproof task input.
Storage(StorageMultiproofInput),
/// A regular multiproof task input.
Regular(MultiproofInput),
}
impl PendingMultiproofTask {
/// Returns the proof sequence number of the task.
const fn proof_sequence_number(&self) -> u64 {
match self {
Self::Storage(input) => input.proof_sequence_number,
Self::Regular(input) => input.proof_sequence_number,
}
}
/// Returns whether or not the proof targets are empty.
fn proof_targets_is_empty(&self) -> bool {
match self {
Self::Storage(input) => input.proof_targets.is_empty(),
Self::Regular(input) => input.proof_targets.is_empty(),
}
}
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
match self {
Self::Storage(input) => input.send_empty_proof(),
Self::Regular(input) => input.send_empty_proof(),
}
}
}
impl From<StorageMultiproofInput> for PendingMultiproofTask {
fn from(input: StorageMultiproofInput) -> Self {
Self::Storage(input)
}
}
impl From<MultiproofInput> for PendingMultiproofTask {
fn from(input: MultiproofInput) -> Self {
Self::Regular(input)
}
}
/// Input parameters for dispatching a dedicated storage multiproof calculation.
#[derive(Debug)]
struct StorageMultiproofInput {
hashed_state_update: HashedPostState,
hashed_address: B256,
proof_targets: B256Set,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
}
impl StorageMultiproofInput {
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
sequence_number: self.proof_sequence_number,
state: self.hashed_state_update,
});
}
}
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
@@ -305,18 +383,91 @@ impl MultiproofManager {
}
/// Dispatches a new multiproof calculation to worker pools.
fn dispatch(&self, input: MultiproofInput) {
fn dispatch(&self, input: PendingMultiproofTask) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets.is_empty() {
if input.proof_targets_is_empty() {
trace!(
sequence_number = input.proof_sequence_number,
sequence_number = input.proof_sequence_number(),
"No proof targets, sending empty multiproof back immediately"
);
input.send_empty_proof();
return;
}
self.dispatch_multiproof(input);
match input {
PendingMultiproofTask::Storage(storage_input) => {
self.dispatch_storage_proof(storage_input);
}
PendingMultiproofTask::Regular(multiproof_input) => {
self.dispatch_multiproof(multiproof_input);
}
}
}
/// Dispatches a single storage proof calculation to worker pool.
fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
let StorageMultiproofInput {
hashed_state_update,
hashed_address,
proof_targets,
proof_sequence_number,
multi_added_removed_keys,
state_root_message_sender: _,
} = storage_multiproof_input;
let storage_targets = proof_targets.len();
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
storage_targets,
"Dispatching storage proof to workers"
);
let start = Instant::now();
// Create prefix set from targets
let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
proof_targets.iter().map(reth_trie::Nibbles::unpack),
);
let prefix_set = prefix_set.freeze();
// Build computation input (data only)
let input = StorageProofInput::new(
hashed_address,
prefix_set,
proof_targets,
true, // with_branch_node_masks
Some(multi_added_removed_keys),
);
// Dispatch to storage worker
if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
input,
ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
),
) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
return;
}
self.metrics
.active_storage_workers_histogram
.record(self.proof_worker_handle.active_storage_workers() as f64);
self.metrics
.active_account_workers_histogram
.record(self.proof_worker_handle.active_account_workers() as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
}
/// Signals that a multiproof calculation has finished.
@@ -663,14 +814,17 @@ impl MultiProofTask {
available_storage_workers,
MultiProofTargets::chunks,
|proof_targets| {
self.multiproof_manager.dispatch(MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
self.multiproof_manager.dispatch(
MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
},
);
self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
@@ -818,14 +972,17 @@ impl MultiProofTask {
);
spawned_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.dispatch(MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
self.multiproof_manager.dispatch(
MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
},
);
self.metrics
@@ -1301,9 +1458,6 @@ impl MultiProofTask {
/// Context for multiproof message batching loop.
///
/// Contains processing state that persists across loop iterations.
///
/// Used by `process_multiproof_message` to batch consecutive same-type messages received via
/// `try_recv` for efficient processing.
struct MultiproofBatchCtx {
/// Buffers a non-matching message type encountered during batching.
/// Processed first in next iteration to preserve ordering while allowing same-type

View File

@@ -27,11 +27,10 @@ use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_execution_types::ExecutionOutcome;
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
sync::{
@@ -87,7 +86,7 @@ where
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
actions_rx: Receiver<PrewarmTaskEvent>,
/// Parent span for tracing
parent_span: Span,
}
@@ -106,7 +105,7 @@ where
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
) -> (Self, Sender<PrewarmTaskEvent>) {
let (actions_tx, actions_rx) = channel();
trace!(
@@ -136,11 +135,8 @@ where
/// For Optimism chains, special handling is applied to the first transaction if it's a
/// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
/// subsequent transactions in the block.
fn spawn_all<Tx>(
&self,
pending: mpsc::Receiver<Tx>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) where
fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
let executor = self.executor.clone();
@@ -252,7 +248,7 @@ where
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn save_cache(self, execution_outcome: Arc<ExecutionOutcome<N::Receipt>>) {
fn save_cache(self, state: BundleState) {
let start = Instant::now();
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
@@ -269,8 +265,7 @@ where
let new_cache = SavedCache::new(hash, caches, cache_metrics);
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(execution_outcome.state()).is_err() {
if new_cache.cache().insert_state(&state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
@@ -305,12 +300,12 @@ where
pub(super) fn run(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
actions_tx: Sender<PrewarmTaskEvent>,
) {
// spawn execution tasks.
self.spawn_all(pending, actions_tx);
let mut final_execution_outcome = None;
let mut final_block_output = None;
let mut finished_execution = false;
while let Ok(event) = self.actions_rx.recv() {
match event {
@@ -323,9 +318,9 @@ where
// completed executing a set of transactions
self.send_multi_proof_targets(proof_targets);
}
PrewarmTaskEvent::Terminate { execution_outcome } => {
PrewarmTaskEvent::Terminate { block_output } => {
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
final_execution_outcome = Some(execution_outcome);
final_block_output = Some(block_output);
if finished_execution {
// all tasks are done, we can exit, which will save caches and exit
@@ -339,7 +334,7 @@ where
finished_execution = true;
if final_execution_outcome.is_some() {
if final_block_output.is_some() {
// all tasks are done, we can exit, which will save caches and exit
break
}
@@ -349,9 +344,9 @@ where
debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
// save caches and finish using the shared ExecutionOutcome
if let Some(Some(execution_outcome)) = final_execution_outcome {
self.save_cache(execution_outcome);
// save caches and finish
if let Some(Some(state)) = final_block_output {
self.save_cache(state);
}
}
}
@@ -393,10 +388,10 @@ where
metrics,
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
mut precompile_cache_map,
} = self;
let mut state_provider = match provider.build() {
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
@@ -409,15 +404,13 @@ where
};
// Use the caches to create a new provider with caching
if let Some(saved_cache) = saved_cache {
let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider = Box::new(
CachedStateProvider::new(state_provider, caches, cache_metrics)
// ensure we pre-warm the cache
.prewarm(),
);
}
Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
} else {
state_provider
};
let state_provider = StateProviderDatabase::new(state_provider);
@@ -459,7 +452,7 @@ where
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
sender: Sender<PrewarmTaskEvent>,
done_tx: Sender<()>,
) where
Tx: ExecutableTxFor<Evm>,
@@ -540,7 +533,7 @@ where
&self,
idx: usize,
executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
actions_tx: Sender<PrewarmTaskEvent>,
done_tx: Sender<()>,
) -> mpsc::Sender<IndexedTransaction<Tx>>
where
@@ -596,18 +589,14 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize)
}
/// The events the pre-warm task can handle.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
/// execution path without cloning the expensive `BundleState`.
pub(super) enum PrewarmTaskEvent<R> {
pub(super) enum PrewarmTaskEvent {
/// Forcefully terminate all remaining transaction execution.
TerminateTransactionExecution,
/// Forcefully terminate the task on demand and update the shared cache with the given output
/// before exiting.
Terminate {
/// The final execution outcome. Using `Arc` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
/// The final block state output.
block_output: Option<BundleState>,
},
/// The outcome of a pre-warm task
Outcome {

View File

@@ -166,7 +166,8 @@ where
// Update storage slots with new values and calculate storage roots.
let span = tracing::Span::current();
let results: Vec<_> = state
let (tx, rx) = mpsc::channel();
state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
@@ -216,7 +217,13 @@ where
SparseStateTrieResult::Ok((address, storage_trie))
})
.collect();
.for_each_init(
|| tx.clone(),
|tx, result| {
let _ = tx.send(result);
},
);
drop(tx);
// Defer leaf removals until after updates/additions, so that we don't delete an intermediate
// branch node during a removal and then re-add that branch back during a later leaf addition.
@@ -228,7 +235,7 @@ where
let _enter =
tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
.entered();
for result in results {
for result in rx {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);

View File

@@ -35,13 +35,12 @@ use reth_primitives_traits::{
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockReader,
DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider, StateProviderBox,
StateProviderFactory, StateReader, TrieReader,
DatabaseProviderFactory, ExecutionOutcome, HashedPostStateProvider, ProviderError,
PruneCheckpointReader, StageCheckpointReader, StateProvider, StateProviderFactory, StateReader,
StateRootProvider, TrieReader,
};
use reth_revm::db::State;
use reth_storage_errors::db::DatabaseError;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInputSorted};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -55,73 +54,30 @@ use tracing::{debug, debug_span, error, info, instrument, trace, warn};
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
/// internals.
///
/// The generic parameter `P` represents the provider type used for state lookups.
pub struct TreeCtx<'a, N: NodePrimitives, P> {
/// internals
pub struct TreeCtx<'a, N: NodePrimitives> {
/// The engine API tree state
state: &'a mut EngineApiTreeState<N>,
/// Reference to the canonical in-memory state
canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
/// Optional precomputed state provider and builder to avoid redundant lookups.
/// This is set by [`crate::tree::EngineApiTreeHandler`] after validating parent state exists.
pub precomputed: Option<StateProviderAndBuilder<N, P>>,
}
/// Precomputed state provider and builder for block validation.
///
/// Contains both the built state provider (for main execution) and the builder
/// (for spawning parallel tasks that need their own providers).
pub struct StateProviderAndBuilder<N: NodePrimitives, P> {
/// The built state provider for main execution.
pub provider: StateProviderBox,
/// The builder for spawning parallel tasks.
pub builder: StateProviderBuilder<N, P>,
}
impl<N: NodePrimitives, P> std::fmt::Debug for StateProviderAndBuilder<N, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StateProviderAndBuilder")
.field("provider", &"StateProviderBox")
.field("builder", &"StateProviderBuilder")
.finish()
}
}
impl<'a, N: NodePrimitives, P> std::fmt::Debug for TreeCtx<'a, N, P> {
impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TreeCtx")
.field("state", &"EngineApiTreeState")
.field("canonical_in_memory_state", &self.canonical_in_memory_state)
.field("precomputed", &self.precomputed.as_ref().map(|_| "Some(...)"))
.finish()
}
}
impl<'a, N: NodePrimitives, P> TreeCtx<'a, N, P> {
/// Creates a new tree context.
impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
/// Creates a new tree context
pub const fn new(
state: &'a mut EngineApiTreeState<N>,
canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
) -> Self {
Self { state, canonical_in_memory_state, precomputed: None }
}
/// Creates a new tree context with precomputed state provider and builder.
///
/// This is the preferred constructor when the provider and builder are available,
/// as it avoids redundant state lookups in the validator.
pub const fn with_precomputed(
state: &'a mut EngineApiTreeState<N>,
canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
provider: StateProviderBox,
builder: StateProviderBuilder<N, P>,
) -> Self {
Self {
state,
canonical_in_memory_state,
precomputed: Some(StateProviderAndBuilder { provider, builder }),
}
Self { state, canonical_in_memory_state }
}
/// Returns a reference to the engine tree state
@@ -365,7 +321,7 @@ where
pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&mut self,
input: BlockOrPayload<T>,
mut ctx: TreeCtx<'_, N, P>,
mut ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N, InsertPayloadError<N::Block>>
where
V: PayloadValidator<T, Block = N::Block>,
@@ -402,33 +358,23 @@ where
let parent_hash = input.parent_hash();
let block_num_hash = input.num_hash();
trace!(target: "engine::tree::payload_validator", "Building state provider");
trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
let _enter =
debug_span!(target: "engine::tree::payload_validator", "state provider").entered();
// Use precomputed state from TreeCtx (set by EngineApiTreeHandler) to avoid
// redundant state lookups. Fall back to computing if not available (legacy callers).
let (mut state_provider, provider_builder) = if let Some(precomputed) =
ctx.precomputed.take()
{
(precomputed.provider, precomputed.builder)
} else {
// Legacy path: compute both builder and provider
let Some(builder) = ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
else {
return Err(InsertBlockError::new(
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
};
let provider = ensure_ok!(builder.clone().build());
(provider, builder)
let Some(provider_builder) =
ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
else {
// this is pre-validated in the tree
return Err(InsertBlockError::new(
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
// Fetch parent block. This goes to memory most of the time unless the parent block is
// beyond the in-memory buffer.
// fetch parent block
let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
else {
return Err(InsertBlockError::new(
@@ -453,7 +399,7 @@ where
"Decided which state root algorithm to run"
);
// Get an iterator over the transactions in the payload
// use prewarming background task
let txs = self.tx_iterator_for(&input)?;
// Extract the BAL, if valid and available
@@ -478,17 +424,21 @@ where
// Use cached state provider before executing, used in execution after prewarming threads
// complete
if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
state_provider =
Box::new(CachedStateProvider::new(state_provider, caches, cache_metrics));
state_provider = Box::new(CachedStateProvider::new_with_caches(
state_provider,
caches,
cache_metrics,
));
};
if self.config.state_provider_metrics() {
state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine"));
}
// Execute the block and handle any execution errors
let (output, senders) = match self.execute_block(&state_provider, env, &input, &mut handle)
{
let (output, senders) = match if self.config.state_provider_metrics() {
let state_provider =
InstrumentedStateProvider::from_state_provider(&state_provider, "engine");
self.execute_block(&state_provider, env, &input, &mut handle)
} else {
self.execute_block(&state_provider, env, &input, &mut handle)
} {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
@@ -572,7 +522,7 @@ where
}
let (root, updates) = ensure_ok_post_block!(
self.compute_state_root_serial(block.parent_hash(), &hashed_state, ctx.state()),
state_provider.state_root_with_updates(hashed_state.clone()),
block
);
(root, updates, root_time.elapsed())
@@ -602,14 +552,17 @@ where
.into())
}
// Create ExecutionOutcome and wrap in Arc for sharing with both the caching task
// and the deferred trie task. This avoids cloning the expensive BundleState.
let execution_outcome = Arc::new(ExecutionOutcome::from((output, block_num_hash.number)));
// terminate prewarming task with good state output
handle.terminate_caching(Some(&output.state));
// Terminate prewarming task with the shared execution outcome
handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
Ok(self.spawn_deferred_trie_task(block, execution_outcome, &ctx, hashed_state, trie_output))
Ok(self.spawn_deferred_trie_task(
block,
output,
block_num_hash.number,
&ctx,
hashed_state,
trie_output,
))
}
/// Return sealed block header from database or in-memory state by hash.
@@ -652,7 +605,7 @@ where
state_provider: S,
env: ExecutionEnv<Evm>,
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err>,
) -> Result<(BlockExecutionOutput<N::Receipt>, Vec<Address>), InsertBlockErrorKind>
where
S: StateProvider,
@@ -664,7 +617,7 @@ where
debug!(target: "engine::tree::payload_validator", "Executing block");
let mut db = State::builder()
.with_database(StateProviderDatabase::new(state_provider))
.with_database(StateProviderDatabase::new(&state_provider))
.with_bundle_update()
.without_state_clear()
.build();
@@ -710,6 +663,8 @@ where
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
@@ -739,36 +694,6 @@ where
ParallelStateRoot::new(factory, prefix_sets).incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
fn compute_state_root_serial(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
state: &EngineApiTreeState<N>,
) -> ProviderResult<(B256, TrieUpdates)> {
let (mut input, block_hash) = self.compute_trie_input(parent_hash, state)?;
// Extend state overlay with current block's sorted state.
input.prefix_sets.extend(hashed_state.construct_prefix_sets());
let sorted_hashed_state = hashed_state.clone_into_sorted();
Arc::make_mut(&mut input.state).extend_ref(&sorted_hashed_state);
let TrieInputSorted { nodes, state, .. } = input;
let prefix_sets = hashed_state.construct_prefix_sets();
let factory = OverlayStateProviderFactory::new(self.provider.clone())
.with_block_hash(Some(block_hash))
.with_trie_overlay(Some(nodes))
.with_hashed_state_overlay(Some(state));
let provider = factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets.freeze())
.root_with_updates()
.map_err(Into::<DatabaseError>::into)?)
}
/// Validates the block after execution.
///
/// This performs:
@@ -781,7 +706,7 @@ where
block: &RecoveredBlock<N::Block>,
parent_block: &SealedHeader<N::BlockHeader>,
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N, P>,
ctx: &mut TreeCtx<'_, N>,
) -> Result<HashedPostState, InsertBlockErrorKind>
where
V: PayloadValidator<T, Block = N::Block>,
@@ -868,7 +793,6 @@ where
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
N::Receipt,
>,
InsertBlockErrorKind,
> {
@@ -1103,8 +1027,9 @@ where
fn spawn_deferred_trie_task(
&self,
block: RecoveredBlock<N::Block>,
execution_outcome: Arc<ExecutionOutcome<N::Receipt>>,
ctx: &TreeCtx<'_, N, P>,
output: BlockExecutionOutput<N::Receipt>,
block_number: u64,
ctx: &TreeCtx<'_, N>,
hashed_state: HashedPostState,
trie_output: TrieUpdates,
) -> ExecutedBlock<N> {
@@ -1153,7 +1078,7 @@ where
ExecutedBlock::with_deferred_trie_data(
Arc::new(block),
execution_outcome,
Arc::new(ExecutionOutcome::from((output, block_number))),
deferred_trie_data,
)
}
@@ -1181,9 +1106,6 @@ pub trait EngineValidator<
N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
/// The provider type used for state lookups.
type Provider;
/// Validates the payload attributes with respect to the header.
///
/// By default, this enforces that the payload attributes timestamp is greater than the
@@ -1216,14 +1138,14 @@ pub trait EngineValidator<
fn validate_payload(
&mut self,
payload: Types::ExecutionData,
ctx: TreeCtx<'_, N, Self::Provider>,
ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N>;
/// Validates a block downloaded from the network.
fn validate_block(
&mut self,
block: SealedBlock<N::Block>,
ctx: TreeCtx<'_, N, Self::Provider>,
ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N>;
/// Hook called after an executed block is inserted directly into the tree.
@@ -1242,16 +1164,12 @@ where
+ StateReader
+ HashedPostStateProvider
+ Clone
+ Send
+ Sync
+ 'static,
N: NodePrimitives,
V: PayloadValidator<Types, Block = N::Block>,
Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
{
type Provider = P;
fn validate_payload_attributes_against_header(
&self,
attr: &Types::PayloadAttributes,
@@ -1271,7 +1189,7 @@ where
fn validate_payload(
&mut self,
payload: Types::ExecutionData,
ctx: TreeCtx<'_, N, P>,
ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N> {
self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
}
@@ -1279,7 +1197,7 @@ where
fn validate_block(
&mut self,
block: SealedBlock<N::Block>,
ctx: TreeCtx<'_, N, P>,
ctx: TreeCtx<'_, N>,
) -> ValidationOutcome<N> {
self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
}

View File

@@ -1,58 +1,50 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use dashmap::DashMap;
use moka::policy::EvictionPolicy;
use parking_lot::Mutex;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
use revm_primitives::Address;
use std::{hash::Hash, sync::Arc};
use schnellru::LruMap;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
};
/// Default max cache size for [`PrecompileCache`]
const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
impl<S> PrecompileCacheMap<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
// Try just using `.get` first to avoid acquiring a write lock.
if let Some(cache) = self.0.get(&address) {
return cache.clone();
}
// Otherwise, fallback to `.entry` and initialize the cache.
//
// This should be very rare as caches for all precompiles will be initialized as soon as
// first EVM is created.
pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
self.0.entry(address).or_default().clone()
}
}
/// Cache for precompiles, for each input stores the result.
///
/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
/// so we use a [`Mutex`] instead of an `RwLock`.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
impl<S> Default for PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn default() -> Self {
Self(
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
.initial_capacity(MAX_CACHE_SIZE as usize)
.eviction_policy(EvictionPolicy::lru())
.build_with_hasher(Default::default()),
)
Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
}
}
@@ -60,31 +52,63 @@ impl<S> PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
self.0.get(input).filter(|e| e.spec == spec)
fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
self.0.lock().get(key).cloned()
}
/// Inserts the given key and value into the cache, returning the new cache size.
fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
self.0.insert(input, value);
self.0.entry_count() as usize
fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
let mut cache = self.0.lock();
cache.insert(key, value);
cache.len()
}
}
/// Cache key, spec id and precompile call input. spec id is included in the key to account for
/// precompile repricing across fork activations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheKey<S>((S, Bytes));
impl<S> CacheKey<S> {
const fn new(spec_id: S, input: Bytes) -> Self {
Self((spec_id, input))
}
}
/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
impl<'a, S> CacheKeyRef<'a, S> {
const fn new(spec_id: S, input: &'a [u8]) -> Self {
Self((spec_id, input))
}
}
impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
fn eq(&self, other: &CacheKey<S>) -> bool {
self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
}
}
impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0 .0.hash(state);
self.0 .1.hash(state);
}
}
/// Cache entry, precompile successful output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheEntry<S> {
output: PrecompileOutput,
spec: S,
}
pub struct CacheEntry(PrecompileOutput);
impl<S> CacheEntry<S> {
impl CacheEntry {
const fn gas_used(&self) -> u64 {
self.output.gas_used
self.0.gas_used
}
fn to_precompile_result(&self) -> PrecompileResult {
Ok(self.output.clone())
Ok(self.0.clone())
}
}
@@ -166,7 +190,9 @@ where
}
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
if let Some(entry) = &self.cache.get(&key) {
self.increment_by_one_precompile_cache_hits();
if input.gas >= entry.gas_used() {
return entry.to_precompile_result()
@@ -178,10 +204,8 @@ where
match &result {
Ok(output) => {
let size = self.cache.insert(
Bytes::copy_from_slice(calldata),
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
);
let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
let size = self.cache.insert(key, CacheEntry(output.clone()));
self.set_precompile_cache_size_metric(size as f64);
self.increment_by_one_precompile_cache_misses();
}
@@ -222,12 +246,31 @@ impl CachedPrecompileMetrics {
#[cfg(test)]
mod tests {
use std::hash::DefaultHasher;
use super::*;
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
use reth_revm::db::EmptyDB;
use revm::{context::TxEnv, precompile::PrecompileOutput};
use revm_primitives::hardfork::SpecId;
#[test]
fn test_cache_key_ref_hash() {
let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
assert!(PartialEq::eq(&key2, &key1));
let mut hasher = DefaultHasher::new();
key1.hash(&mut hasher);
let hash1 = hasher.finish();
let mut hasher = DefaultHasher::new();
key2.hash(&mut hasher);
let hash2 = hasher.finish();
assert_eq!(hash1, hash2);
}
#[test]
fn test_precompile_cache_basic() {
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
@@ -250,11 +293,12 @@ mod tests {
reverted: false,
};
let input = b"test_input";
let expected = CacheEntry { output, spec: SpecId::PRAGUE };
cache.cache.insert(input.into(), expected.clone());
let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let expected = CacheEntry(output);
cache.cache.insert(key, expected.clone());
let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
let actual = cache.cache.get(&key).unwrap();
assert_eq!(actual, expected);
}
@@ -268,7 +312,7 @@ mod tests {
let address1 = Address::repeat_byte(1);
let address2 = Address::repeat_byte(2);
let cache_map = PrecompileCacheMap::default();
let mut cache_map = PrecompileCacheMap::default();
// create the first precompile with a specific output
let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {

View File

@@ -4,7 +4,7 @@ use crate::{
tree::{
payload_validator::{BasicEngineValidator, TreeCtx, ValidationOutcome},
persistence_state::CurrentPersistenceAction,
PersistTarget, TreeConfig,
TreeConfig,
},
};
@@ -285,8 +285,7 @@ impl TestHarness {
let fcu_state = self.fcu_state(block_hash);
let (tx, rx) = oneshot::channel();
let _ = self
.tree
self.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: fcu_state,
@@ -499,7 +498,7 @@ fn test_tree_persist_block_batch() {
// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
let _ = test_harness.tree.on_engine_message(msg).unwrap();
test_harness.tree.on_engine_message(msg).unwrap();
// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
@@ -578,7 +577,7 @@ async fn test_engine_request_during_backfill() {
.with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
let _ = test_harness
test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
@@ -659,7 +658,7 @@ async fn test_holesky_payload() {
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
let (tx, rx) = oneshot::channel();
let _ = test_harness
test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
@@ -884,8 +883,7 @@ async fn test_get_canonical_blocks_to_persist() {
.with_persistence_threshold(persistence_threshold)
.with_memory_block_buffer_target(memory_block_buffer_target);
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
let expected_blocks_to_persist_length: usize =
(canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
@@ -904,8 +902,7 @@ async fn test_get_canonical_blocks_to_persist() {
assert!(test_harness.tree.state.tree_state.sealed_header_by_hash(&fork_block_hash).is_some());
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
// check that the fork block is not included in the blocks to persist
@@ -984,7 +981,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
// add block to mock provider to enable persistence clean up.
test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
let _ = test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
@@ -994,7 +991,7 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
_ => panic!("Unexpected event: {event:#?}"),
}
let _ = test_harness
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
.last()
@@ -1050,7 +1047,7 @@ async fn test_fcu_with_canonical_ancestor_updates_latest_block() {
// Send FCU to the canonical ancestor
let (tx, rx) = oneshot::channel();
let _ = test_harness
test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
@@ -1946,53 +1943,4 @@ mod forkchoice_updated_tests {
.unwrap();
assert!(result.is_some(), "OpStack should handle canonical head");
}
/// Test that engine termination persists all blocks and signals completion.
#[test]
fn test_engine_termination_with_everything_persisted() {
let chain_spec = MAINNET.clone();
let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
// Create 10 blocks to persist
let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..11).collect();
let canonical_tip = blocks.last().unwrap().recovered_block().number;
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
// Create termination channel
let (terminate_tx, mut terminate_rx) = oneshot::channel();
let to_tree_tx = test_harness.to_tree_tx.clone();
let action_rx = test_harness.action_rx;
// Spawn tree in background thread
std::thread::Builder::new()
.name("Engine Task".to_string())
.spawn(|| test_harness.tree.run())
.unwrap();
// Send terminate request
to_tree_tx
.send(FromEngine::Event(FromOrchestrator::Terminate { tx: terminate_tx }))
.unwrap();
// Handle persistence actions until termination completes
let mut last_persisted_number = 0;
loop {
if terminate_rx.try_recv().is_ok() {
break;
}
if let Ok(PersistenceAction::SaveBlocks(saved_blocks, sender)) =
action_rx.recv_timeout(std::time::Duration::from_millis(100))
{
if let Some(last) = saved_blocks.last() {
last_persisted_number = last.recovered_block().number;
}
sender.send(saved_blocks.last().map(|b| b.recovered_block().num_hash())).unwrap();
}
}
// Ensure we persisted right to the tip
assert_eq!(last_persisted_number, canonical_tip);
}
}

View File

@@ -38,7 +38,6 @@ tempfile.workspace = true
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
samply = ["reth-tracing/samply", "reth-node-core/samply"]
dev = ["reth-cli-commands/arbitrary"]

View File

@@ -87,7 +87,9 @@ fn verify_receipts<R: Receipt>(
logs_bloom,
expected_receipts_root,
expected_logs_bloom,
)
)?;
Ok(())
}
/// Compare the calculated receipts root with the expected receipts root, also compare

View File

@@ -170,7 +170,7 @@ impl DisplayHardforks {
let mut post_merge = Vec::new();
for (fork, condition, metadata) in hardforks {
let display_fork = DisplayFork {
let mut display_fork = DisplayFork {
name: fork.name().to_string(),
activated_at: condition,
eip: None,
@@ -181,7 +181,12 @@ impl DisplayHardforks {
ForkCondition::Block(_) => {
pre_merge.push(display_fork);
}
ForkCondition::TTD { .. } => {
ForkCondition::TTD { activation_block_number, total_difficulty, fork_block } => {
display_fork.activated_at = ForkCondition::TTD {
activation_block_number,
fork_block,
total_difficulty,
};
with_merge.push(display_fork);
}
ForkCondition::Timestamp(_) => {

View File

@@ -61,7 +61,6 @@ reth-node-core.workspace = true
reth-e2e-test-utils.workspace = true
reth-tasks.workspace = true
reth-testing-utils.workspace = true
reth-stages-types.workspace = true
tempfile.workspace = true
jsonrpsee-core.workspace = true
@@ -91,7 +90,6 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
]
js-tracer = [
"reth-node-builder/js-tracer",
@@ -111,5 +109,4 @@ test-utils = [
"reth-evm/test-utils",
"reth-primitives-traits/test-utils",
"reth-evm-ethereum/test-utils",
"reth-stages-types/test-utils",
]

View File

@@ -1,100 +0,0 @@
use crate::utils::eth_payload_attributes;
use alloy_genesis::Genesis;
use alloy_primitives::B256;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::{setup, transaction::TransactionTestContext};
use reth_node_ethereum::EthereumNode;
use reth_provider::{HeaderProvider, StageCheckpointReader};
use reth_stages_types::StageId;
use std::sync::Arc;
/// Tests that a node can initialize and advance with a custom genesis block number.
#[tokio::test]
async fn can_run_eth_node_with_custom_genesis_number() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Create genesis with custom block number (e.g., 1000)
let mut genesis: Genesis =
serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
genesis.number = Some(1000);
genesis.parent_hash = Some(B256::random());
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(genesis)
.cancun_activated()
.build(),
);
let (mut nodes, _tasks, wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let mut node = nodes.pop().unwrap();
// Verify stage checkpoints are initialized to genesis block number (1000)
for stage in StageId::ALL {
let checkpoint = node.inner.provider.get_stage_checkpoint(stage)?;
assert!(checkpoint.is_some(), "Stage {:?} checkpoint should exist", stage);
assert_eq!(
checkpoint.unwrap().block_number,
1000,
"Stage {:?} checkpoint should be at genesis block 1000",
stage
);
}
// Advance the chain (block 1001)
let raw_tx = TransactionTestContext::transfer_tx_bytes(1, wallet.inner).await;
let tx_hash = node.rpc.inject_tx(raw_tx).await?;
let payload = node.advance_block().await?;
let block_hash = payload.block().hash();
let block_number = payload.block().number;
// Verify we're at block 1001 (genesis + 1)
assert_eq!(block_number, 1001, "Block number should be 1001 after advancing from genesis 1000");
// Assert the block has been committed
node.assert_new_block(tx_hash, block_hash, block_number).await?;
Ok(())
}
/// Tests that block queries respect custom genesis boundaries.
#[tokio::test]
async fn custom_genesis_block_query_boundaries() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let genesis_number = 5000u64;
let mut genesis: Genesis =
serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
genesis.number = Some(genesis_number);
genesis.parent_hash = Some(B256::random());
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(genesis)
.cancun_activated()
.build(),
);
let (mut nodes, _tasks, _wallet) =
setup::<EthereumNode>(1, chain_spec, false, eth_payload_attributes).await?;
let node = nodes.pop().unwrap();
// Query genesis block should succeed
let genesis_header = node.inner.provider.header_by_number(genesis_number)?;
assert!(genesis_header.is_some(), "Genesis block at {} should exist", genesis_number);
// Query blocks before genesis should return None
for block_num in [0, 1, genesis_number - 1] {
let header = node.inner.provider.header_by_number(block_num)?;
assert!(header.is_none(), "Block {} before genesis should not exist", block_num);
}
Ok(())
}

View File

@@ -7,7 +7,6 @@ use reth_e2e_test_utils::{
use reth_node_builder::{NodeBuilder, NodeHandle};
use reth_node_core::{args::RpcServerArgs, node_config::NodeConfig};
use reth_node_ethereum::EthereumNode;
use reth_provider::BlockNumReader;
use reth_tasks::TaskManager;
use std::sync::Arc;
@@ -128,55 +127,3 @@ async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyr
Ok(())
}
#[tokio::test]
async fn test_engine_graceful_shutdown() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let (mut nodes, _tasks, wallet) = setup::<EthereumNode>(
1,
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
),
false,
eth_payload_attributes,
)
.await?;
let mut node = nodes.pop().unwrap();
let raw_tx = TransactionTestContext::transfer_tx_bytes(1, wallet.inner).await;
let tx_hash = node.rpc.inject_tx(raw_tx).await?;
let payload = node.advance_block().await?;
node.assert_new_block(tx_hash, payload.block().hash(), payload.block().number).await?;
// Get block number before shutdown
let block_before = node.inner.provider.best_block_number()?;
assert_eq!(block_before, 1, "Expected 1 block before shutdown");
// Verify block is NOT yet persisted to database
let db_block_before = node.inner.provider.last_block_number()?;
assert_eq!(db_block_before, 0, "Block should not be persisted yet");
// Trigger graceful shutdown
let done_rx = node
.inner
.add_ons_handle
.engine_shutdown
.shutdown()
.expect("shutdown should return receiver");
tokio::time::timeout(std::time::Duration::from_secs(2), done_rx)
.await
.expect("shutdown timed out")
.expect("shutdown completion channel should not be closed");
let db_block = node.inner.provider.last_block_number()?;
assert_eq!(db_block, 1, "Database should have persisted block 1");
Ok(())
}

View File

@@ -1,7 +1,6 @@
#![allow(missing_docs)]
mod blobs;
mod custom_genesis;
mod dev;
mod eth;
mod p2p;

View File

@@ -81,7 +81,6 @@ arbitrary = [
]
keccak-cache-global = [
"reth-node-ethereum?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -61,6 +61,8 @@ pub use alloy_evm::{
*,
};
pub use alloy_evm::block::state_changes as state_change;
/// A complete configuration of EVM for Reth.
///
/// This trait encapsulates complete configuration required for transaction execution and block

View File

@@ -1218,7 +1218,9 @@ impl ReverseHeadersDownloaderBuilder {
next_request_block_number: 0,
next_chain_tip_block_number: 0,
lowest_validated_header: None,
request_limit,
// TODO(mattsse): tmp hotfix to prevent issues with syncing from besu which has an upper
// limit of 512
request_limit: request_limit.min(512),
min_concurrent_requests,
max_concurrent_requests,
stream_batch_size,

View File

@@ -312,6 +312,7 @@ impl ECIES {
/// Create a new ECIES client with the given static secret key and remote peer ID.
pub fn new_client(secret_key: SecretKey, remote_id: PeerId) -> Result<Self, ECIESError> {
// TODO(rand): use rng for nonce
let mut rng = rng();
let nonce = B256::random();
let ephemeral_secret_key = SecretKey::new(&mut rng);

View File

@@ -101,9 +101,8 @@ where
.or(Err(P2PStreamError::HandshakeError(P2PHandshakeError::Timeout)))?
.ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoResponse))??;
// Check that the uncompressed message length does not exceed the max payload size.
// Note: The first message (Hello/Disconnect) is not snappy compressed. We will check the
// decompressed length again for subsequent messages after the handshake.
// let's check the compressed length first, we will need to check again once confirming
// that it contains snappy-compressed data (this will be the case for all non-p2p messages).
if first_message_bytes.len() > MAX_PAYLOAD_SIZE {
return Err(P2PStreamError::MessageTooBig {
message_size: first_message_bytes.len(),

View File

@@ -76,7 +76,6 @@ secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"]
## misc
aquamarine.workspace = true
eyre.workspace = true
parking_lot.workspace = true
jsonrpsee.workspace = true
fdlimit.workspace = true
rayon.workspace = true

View File

@@ -483,7 +483,6 @@ where
StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
.with_metrics()
.with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
.build()?;
// Initialize RocksDB provider with metrics, statistics, and default tables
@@ -938,44 +937,28 @@ where
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
// We skip the era stage if it's not enabled
let era_enabled = self.era_import_source().is_some();
let mut all_stages =
StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
// Get the expected first stage based on config.
let first_stage = all_stages.next().expect("there must be at least one stage");
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = self
.blockchain_db()
.get_stage_checkpoint(first_stage)?
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.unwrap_or_default()
.block_number;
// Compare all other stages against the first
for stage_id in all_stages {
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
// against it.
for stage_id in StageId::ALL.iter().skip(1) {
let stage_checkpoint = self
.blockchain_db()
.get_stage_checkpoint(stage_id)?
.get_stage_checkpoint(*stage_id)?
.unwrap_or_default()
.block_number;
// If the checkpoint of any stage is less than the checkpoint of the first stage,
// retrieve and return the block hash of the latest header and use it as the target.
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
stage_id = %stage_id,
stage_checkpoint = stage_checkpoint,
"Checking stage against first stage",
);
if stage_checkpoint < first_stage_checkpoint {
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
inconsistent_stage_id = %stage_id,
inconsistent_stage_checkpoint = stage_checkpoint,

View File

@@ -3,7 +3,7 @@
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
setup::build_networked_pipeline,
AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
@@ -13,7 +13,6 @@ use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
chain::FromOrchestrator,
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
@@ -261,16 +260,8 @@ impl EngineNodeLauncher {
)),
);
let RpcHandle {
rpc_server_handles,
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown: _,
} = add_ons.launch_add_ons(add_ons_ctx).await?;
// Create engine shutdown handle
let (engine_shutdown, mut shutdown_rx) = EngineShutdown::new();
let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
add_ons.launch_add_ons(add_ons_ctx).await?;
// Run consensus engine to completion
let initial_target = ctx.initial_backfill_target()?;
@@ -304,14 +295,6 @@ impl EngineNodeLauncher {
// advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
loop {
tokio::select! {
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}
}
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
@@ -383,7 +366,6 @@ impl EngineNodeLauncher {
rpc_registry,
engine_events,
beacon_engine_handle,
engine_shutdown,
},
};
// Notify on node started

View File

@@ -11,7 +11,6 @@ use crate::{
use alloy_rpc_types::engine::ClientVersionV1;
use alloy_rpc_types_engine::ExecutionData;
use jsonrpsee::{core::middleware::layer::Either, RpcModule};
use parking_lot::Mutex;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks};
use reth_node_api::{
@@ -42,9 +41,7 @@ use std::{
fmt::{self, Debug},
future::Future,
ops::{Deref, DerefMut},
sync::Arc,
};
use tokio::sync::oneshot;
/// Contains the handles to the spawned RPC servers.
///
@@ -335,8 +332,6 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
/// Handle to the beacon consensus engine.
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
/// Handle to trigger engine shutdown.
pub engine_shutdown: EngineShutdown,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
@@ -346,7 +341,6 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
rpc_registry: self.rpc_registry.clone(),
engine_events: self.engine_events.clone(),
beacon_engine_handle: self.beacon_engine_handle.clone(),
engine_shutdown: self.engine_shutdown.clone(),
}
}
}
@@ -367,7 +361,6 @@ where
f.debug_struct("RpcHandle")
.field("rpc_server_handles", &self.rpc_server_handles)
.field("rpc_registry", &self.rpc_registry)
.field("engine_shutdown", &self.engine_shutdown)
.finish()
}
}
@@ -963,7 +956,6 @@ where
rpc_registry: registry,
engine_events,
beacon_engine_handle: engine_handle,
engine_shutdown: EngineShutdown::default(),
})
}
@@ -1279,7 +1271,6 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
type EngineValidator: EngineValidator<
<Node::Types as NodeTypes>::Payload,
<Node::Types as NodeTypes>::Primitives,
Provider = Node::Provider,
>;
/// Builds the tree validator for the consensus engine.
@@ -1390,7 +1381,6 @@ where
version: version_metadata().cargo_pkg_version.to_string(),
commit: version_metadata().vergen_git_sha.to_string(),
};
Ok(EngineApi::new(
ctx.node.provider().clone(),
ctx.config.chain.clone(),
@@ -1402,7 +1392,6 @@ where
EngineCapabilities::default(),
engine_validator,
ctx.config.engine.accept_execution_requests_hash,
ctx.node.network().clone(),
))
}
}
@@ -1437,48 +1426,3 @@ impl IntoEngineApiRpcModule for NoopEngineApi {
RpcModule::new(())
}
}
/// Handle to trigger graceful engine shutdown.
///
/// This handle can be used to request a graceful shutdown of the engine,
/// which will persist all remaining in-memory blocks before terminating.
#[derive(Clone, Debug)]
pub struct EngineShutdown {
/// Channel to send shutdown signal.
tx: Arc<Mutex<Option<oneshot::Sender<EngineShutdownRequest>>>>,
}
impl EngineShutdown {
/// Creates a new [`EngineShutdown`] handle and returns the receiver.
pub fn new() -> (Self, oneshot::Receiver<EngineShutdownRequest>) {
let (tx, rx) = oneshot::channel();
(Self { tx: Arc::new(Mutex::new(Some(tx))) }, rx)
}
/// Requests a graceful engine shutdown.
///
/// All remaining in-memory blocks will be persisted before the engine terminates.
///
/// Returns a receiver that resolves when shutdown is complete.
/// Returns `None` if shutdown was already triggered.
pub fn shutdown(&self) -> Option<oneshot::Receiver<()>> {
let mut guard = self.tx.lock();
let tx = guard.take()?;
let (done_tx, done_rx) = oneshot::channel();
let _ = tx.send(EngineShutdownRequest { done_tx });
Some(done_rx)
}
}
impl Default for EngineShutdown {
fn default() -> Self {
Self { tx: Arc::new(Mutex::new(None)) }
}
}
/// Request to shutdown the engine.
#[derive(Debug)]
pub struct EngineShutdownRequest {
/// Channel to signal shutdown completion.
pub done_tx: oneshot::Sender<()>,
}

View File

@@ -80,9 +80,8 @@ tokio.workspace = true
# Features for vergen to generate correct env vars
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
# Feature to enable opentelemetry export
otlp = ["reth-tracing/otlp"]
samply = ["reth-tracing/samply"]
min-error-logs = ["tracing/release_max_level_error"]
min-warn-logs = ["tracing/release_max_level_warn"]

View File

@@ -36,7 +36,7 @@ use reth_network::{
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
},
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives,
HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives, SessionsConfig,
};
use reth_network_peers::{mainnet_nodes, TrustedPeer};
use secp256k1::SecretKey;
@@ -339,7 +339,7 @@ impl NetworkArgs {
NetworkConfigBuilder::<N>::new(secret_key)
.external_ip_resolver(self.nat.clone())
.sessions_config(
config.sessions.clone().with_upscaled_event_buffer(peers_config.max_peers()),
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
)
.peer_config(peers_config)
.boot_nodes(chain_bootnodes.clone())

View File

@@ -208,7 +208,7 @@ where
active: true,
syncing: self.network.is_syncing(),
peers: self.network.num_connected_peers() as u64,
gas_price: self.pool.block_info().pending_basefee,
gas_price: 0, // TODO
uptime: 100,
},
};

View File

@@ -27,10 +27,9 @@ tracing.workspace = true
workspace = true
[features]
default = ["jemalloc", "otlp", "reth-optimism-evm/portable", "js-tracer", "keccak-cache-global", "asm-keccak"]
default = ["jemalloc", "otlp", "reth-optimism-evm/portable", "js-tracer", "keccak-cache-global"]
otlp = ["reth-optimism-cli/otlp"]
samply = ["reth-optimism-cli/samply"]
js-tracer = [
"reth-optimism-node/js-tracer",

View File

@@ -78,7 +78,6 @@ default = []
# Opentelemtry feature to activate metrics export
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
samply = ["reth-tracing/samply", "reth-node-core/samply"]
asm-keccak = [
"alloy-primitives/asm-keccak",

View File

@@ -80,10 +80,8 @@ reth-payload-util.workspace = true
reth-revm = { workspace = true, features = ["std"] }
reth-rpc.workspace = true
reth-rpc-eth-types.workspace = true
reth-stages-types.workspace = true
alloy-network.workspace = true
alloy-op-hardforks.workspace = true
futures.workspace = true
op-alloy-network.workspace = true
@@ -97,7 +95,6 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
js-tracer = [
@@ -125,7 +122,6 @@ test-utils = [
"reth-optimism-primitives/arbitrary",
"reth-primitives-traits/test-utils",
"reth-trie-common/test-utils",
"reth-stages-types/test-utils",
]
reth-codec = ["reth-optimism-primitives/reth-codec"]

View File

@@ -299,16 +299,23 @@ mod test {
use super::*;
use crate::engine;
use alloy_op_hardforks::BASE_SEPOLIA_JOVIAN_TIMESTAMP;
use alloy_primitives::{b64, Address, B256, B64};
use alloy_rpc_types_engine::PayloadAttributes;
use reth_chainspec::ChainSpec;
use reth_chainspec::{ChainSpec, ForkCondition, Hardfork};
use reth_optimism_chainspec::{OpChainSpec, BASE_SEPOLIA};
use reth_optimism_forks::OpHardfork;
use reth_provider::noop::NoopProvider;
use reth_trie_common::KeccakKeyHasher;
const JOVIAN_TIMESTAMP: u64 = 1744909000;
fn get_chainspec() -> Arc<OpChainSpec> {
let base_sepolia_spec = BASE_SEPOLIA.inner.clone();
let mut base_sepolia_spec = BASE_SEPOLIA.inner.clone();
// TODO: Remove this once we know the Jovian timestamp
base_sepolia_spec
.hardforks
.insert(OpHardfork::Jovian.boxed(), ForkCondition::Timestamp(JOVIAN_TIMESTAMP));
Arc::new(OpChainSpec {
inner: ChainSpec {
@@ -420,8 +427,7 @@ mod test {
fn test_well_formed_attributes_jovian_valid() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let attributes =
get_attributes(Some(b64!("0000000000000000")), Some(1), BASE_SEPOLIA_JOVIAN_TIMESTAMP);
let attributes = get_attributes(Some(b64!("0000000000000000")), Some(1), JOVIAN_TIMESTAMP);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
OpEngineTypes,
@@ -436,7 +442,7 @@ mod test {
fn test_malformed_attributes_jovian_with_eip_1559_params_none() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let attributes = get_attributes(None, Some(1), BASE_SEPOLIA_JOVIAN_TIMESTAMP);
let attributes = get_attributes(None, Some(1), JOVIAN_TIMESTAMP);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
OpEngineTypes,
@@ -466,8 +472,7 @@ mod test {
fn test_malformed_attributes_post_jovian_with_min_base_fee_none() {
let validator =
OpEngineValidator::new::<KeccakKeyHasher>(get_chainspec(), NoopProvider::default());
let attributes =
get_attributes(Some(b64!("0000000000000000")), None, BASE_SEPOLIA_JOVIAN_TIMESTAMP);
let attributes = get_attributes(Some(b64!("0000000000000000")), None, JOVIAN_TIMESTAMP);
let result = <engine::OpEngineValidator<_, _, _> as EngineApiValidator<
OpEngineTypes,

View File

@@ -146,7 +146,6 @@ where
EngineCapabilities::new(OP_ENGINE_CAPABILITIES.iter().copied()),
engine_validator,
ctx.config.engine.accept_execution_requests_hash,
ctx.node.network().clone(),
);
Ok(OpEngineApi::new(inner))

View File

@@ -1,123 +0,0 @@
//! Tests for custom genesis block number support.
use alloy_consensus::BlockHeader;
use alloy_genesis::Genesis;
use alloy_primitives::B256;
use reth_chainspec::EthChainSpec;
use reth_db::test_utils::create_test_rw_db_with_path;
use reth_e2e_test_utils::{
node::NodeTestContext, transaction::TransactionTestContext, wallet::Wallet,
};
use reth_node_builder::{EngineNodeLauncher, Node, NodeBuilder, NodeConfig};
use reth_node_core::args::DatadirArgs;
use reth_optimism_chainspec::OpChainSpecBuilder;
use reth_optimism_node::{utils::optimism_payload_attributes, OpNode};
use reth_provider::{providers::BlockchainProvider, HeaderProvider, StageCheckpointReader};
use reth_stages_types::StageId;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Tests that an OP node can initialize with a custom genesis block number.
#[tokio::test]
async fn test_op_node_custom_genesis_number() {
reth_tracing::init_test_tracing();
let genesis_number = 1000;
// Create genesis with custom block number (1000)
let mut genesis: Genesis =
serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
genesis.number = Some(genesis_number);
genesis.parent_hash = Some(B256::random());
let chain_spec =
Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).ecotone_activated().build());
let wallet = Arc::new(Mutex::new(Wallet::default().with_chain_id(chain_spec.chain().into())));
// Configure and launch the node
let config = NodeConfig::new(chain_spec.clone()).with_datadir_args(DatadirArgs {
datadir: reth_db::test_utils::tempdir_path().into(),
..Default::default()
});
let db = create_test_rw_db_with_path(
config
.datadir
.datadir
.unwrap_or_chain_default(config.chain.chain(), config.datadir.clone())
.db(),
);
let tasks = reth_tasks::TaskManager::current();
let node_handle = NodeBuilder::new(config.clone())
.with_database(db)
.with_types_and_provider::<OpNode, BlockchainProvider<_>>()
.with_components(OpNode::default().components())
.with_add_ons(OpNode::new(Default::default()).add_ons())
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
tasks.executor(),
builder.config.datadir(),
Default::default(),
);
builder.launch_with(launcher)
})
.await
.expect("Failed to launch node");
let mut node =
NodeTestContext::new(node_handle.node, optimism_payload_attributes).await.unwrap();
// Verify stage checkpoints are initialized to genesis block number (1000)
for stage in StageId::ALL {
let checkpoint = node.inner.provider.get_stage_checkpoint(stage).unwrap();
assert!(checkpoint.is_some(), "Stage {:?} checkpoint should exist", stage);
assert_eq!(
checkpoint.unwrap().block_number,
1000,
"Stage {:?} checkpoint should be at genesis block 1000",
stage
);
}
// Query genesis block should succeed
let genesis_header = node.inner.provider.header_by_number(genesis_number).unwrap();
assert!(genesis_header.is_some(), "Genesis block at {} should exist", genesis_number);
// Query blocks before genesis should return None
for block_num in [0, 1, genesis_number - 1] {
let header = node.inner.provider.header_by_number(block_num).unwrap();
assert!(header.is_none(), "Block {} before genesis should not exist", block_num);
}
// Advance the chain with a single block
let _ = wallet; // wallet available for future use
let block_payloads = node
.advance(1, |_| {
Box::pin({
let value = wallet.clone();
async move {
let mut wallet = value.lock().await;
let tx_fut = TransactionTestContext::optimism_l1_block_info_tx(
wallet.chain_id,
wallet.inner.clone(),
wallet.inner_nonce,
);
wallet.inner_nonce += 1;
tx_fut.await
}
})
})
.await
.unwrap();
assert_eq!(block_payloads.len(), 1);
let block = block_payloads.first().unwrap().block();
// Verify the new block is at 1001 (genesis 1000 + 1)
assert_eq!(
block.number(),
1001,
"Block number should be 1001 after advancing from genesis 100"
);
}

View File

@@ -6,6 +6,4 @@ mod priority;
mod rpc;
mod custom_genesis;
const fn main() {}

View File

@@ -76,7 +76,6 @@ arbitrary = [
]
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -7,7 +7,7 @@ use alloy_rpc_types_eth::{Log, TransactionReceipt};
use op_alloy_consensus::{OpReceipt, OpTransaction};
use op_alloy_rpc_types::{L1BlockInfo, OpTransactionReceipt, OpTransactionReceiptFields};
use op_revm::estimate_tx_compressed_size;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_chainspec::ChainSpecProvider;
use reth_node_api::NodePrimitives;
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
@@ -74,11 +74,9 @@ where
let mut l1_block_info = match reth_optimism_evm::extract_l1_info(block.body()) {
Ok(l1_block_info) => l1_block_info,
Err(err) => {
let genesis_number =
self.provider.chain_spec().genesis().number.unwrap_or_default();
// If it is the genesis block (i.e. block number is 0), there is no L1 info, so
// we return an empty l1_block_info.
if block.header().number() == genesis_number {
if block.header().number() == 0 {
return Ok(vec![]);
}
return Err(err.into());

View File

@@ -19,11 +19,8 @@ pub trait PayloadTransactions {
ctx: (),
) -> Option<Self::Transaction>;
/// Marks the transaction identified by `sender` and `nonce` as invalid for this iterator.
///
/// Implementations must ensure that subsequent transactions returned from this iterator do not
/// depend on this transaction. For example, they may choose to stop yielding any further
/// transactions from this sender in the current iteration.
/// Exclude descendants of the transaction with given sender and nonce from the iterator,
/// because this transaction won't be included in the block.
fn mark_invalid(&mut self, sender: Address, nonce: u64);
}
@@ -49,9 +46,6 @@ impl<T> PayloadTransactions for NoopPayloadTransactions<T> {
/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to
/// `PayloadTransactions` (used in block composition).
///
/// Note: `mark_invalid` for this type filters out all further transactions from the given sender
/// in the current iteration, mirroring the semantics of `BestTransactions::mark_invalid`.
#[derive(Debug)]
pub struct BestPayloadTransactions<T, I>
where

View File

@@ -179,7 +179,7 @@ impl<B: Block> SealedBlock<B> {
/// Recovers all senders from the transactions in the block.
///
/// Returns an error if any of the transactions fail to recover the sender.
/// Returns `None` if any of the transactions fail to recover the sender.
pub fn senders(&self) -> Result<Vec<Address>, RecoveryError> {
self.body().recover_signers()
}

View File

@@ -94,7 +94,7 @@ impl<H: Sealable> SealedHeader<H> {
*self.hash_ref()
}
/// This is the inverse of [`Self::seal_slow`] which returns the raw header and hash.
/// This is the inverse of [`Header::seal_slow`] which returns the raw header and hash.
pub fn split(self) -> (H, BlockHash) {
let hash = self.hash();
(self.header, hash)

View File

@@ -42,12 +42,13 @@ fn validate_blob_tx(
blob_sidecar.blobs.extend(blob_sidecar_ext.blobs);
blob_sidecar.proofs.extend(blob_sidecar_ext.proofs);
blob_sidecar.commitments.extend(blob_sidecar_ext.commitments);
}
// ensure exactly num_blobs blobs
blob_sidecar.blobs.truncate(num_blobs as usize);
blob_sidecar.proofs.truncate(num_blobs as usize);
blob_sidecar.commitments.truncate(num_blobs as usize);
if blob_sidecar.blobs.len() > num_blobs as usize {
blob_sidecar.blobs.truncate(num_blobs as usize);
blob_sidecar.proofs.truncate(num_blobs as usize);
blob_sidecar.commitments.truncate(num_blobs as usize);
}
}
tx.blob_versioned_hashes = blob_sidecar.versioned_hashes().collect();

View File

@@ -240,18 +240,6 @@ pub trait EngineApi<Engine: EngineTypes> {
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<BlobAndProofV2>>>;
/// Fetch blobs for the consensus layer from the blob store.
///
/// Returns a response of the same length as the request. Missing or older-version blobs are
/// returned as `null` elements.
///
/// Returns `null` if syncing.
#[method(name = "getBlobsV3")]
async fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>>;
}
/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation>

View File

@@ -54,7 +54,6 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
EngineCapabilities::default(),
EthereumEngineValidator::new(MAINNET.clone()),
false,
NoopNetwork::default(),
);
let module = AuthRpcModule::new(engine_api);
module.start_server(config).await.unwrap()

View File

@@ -23,7 +23,6 @@ reth-tasks.workspace = true
reth-engine-primitives.workspace = true
reth-transaction-pool.workspace = true
reth-primitives-traits.workspace = true
reth-network-api.workspace = true
# ethereum
alloy-eips.workspace = true

View File

@@ -19,7 +19,6 @@ pub const CAPABILITIES: &[&str] = &[
"engine_getPayloadBodiesByRangeV1",
"engine_getBlobsV1",
"engine_getBlobsV2",
"engine_getBlobsV3",
];
// The list of all supported Engine capabilities available over the engine endpoint.

View File

@@ -18,7 +18,6 @@ use async_trait::async_trait;
use jsonrpsee_core::{server::RpcModule, RpcResult};
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
use reth_network_api::NetworkInfo;
use reth_payload_builder::PayloadStore;
use reth_payload_primitives::{
validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
@@ -95,9 +94,7 @@ where
capabilities: EngineCapabilities,
validator: Validator,
accept_execution_requests_hash: bool,
network: impl NetworkInfo + 'static,
) -> Self {
let is_syncing = Arc::new(move || network.is_syncing());
let inner = Arc::new(EngineApiInner {
provider,
chain_spec,
@@ -110,7 +107,6 @@ where
tx_pool,
validator,
accept_execution_requests_hash,
is_syncing,
});
Self { inner }
}
@@ -796,35 +792,6 @@ where
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}
fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
// Check if Osaka fork is active
let current_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
return Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
));
}
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
}
// Spec requires returning `null` if syncing.
if (*self.inner.is_syncing)() {
return Ok(None)
}
self.inner
.tx_pool
.get_blobs_for_versioned_hashes_v3(&versioned_hashes)
.map(Some)
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}
/// Metered version of `get_blobs_v2`.
pub fn get_blobs_v2_metered(
&self,
@@ -860,27 +827,6 @@ where
res
}
/// Metered version of `get_blobs_v3`.
pub fn get_blobs_v3_metered(
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
let hashes_len = versioned_hashes.len();
let start = Instant::now();
let res = Self::get_blobs_v3(self, versioned_hashes);
self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
if let Ok(Some(blobs)) = &res {
let blobs_found = blobs.iter().flatten().count();
let blobs_missed = hashes_len - blobs_found;
self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
}
res
}
}
// This is the concrete ethereum engine API implementation.
@@ -1153,14 +1099,6 @@ where
trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
Ok(self.get_blobs_v2_metered(versioned_hashes)?)
}
async fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
Ok(self.get_blobs_v3_metered(versioned_hashes)?)
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
@@ -1217,8 +1155,6 @@ struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSp
/// Engine validator.
validator: Validator,
accept_execution_requests_hash: bool,
/// Returns `true` if the node is currently syncing.
is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
}
#[cfg(test)]
@@ -1226,13 +1162,10 @@ mod tests {
use super::*;
use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
use assert_matches::assert_matches;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_chainspec::{ChainSpec, MAINNET};
use reth_engine_primitives::BeaconEngineMessage;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_ethereum_primitives::Block;
use reth_network_api::{
noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
};
use reth_node_ethereum::EthereumEngineValidator;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_provider::test_utils::MockEthProvider;
@@ -1273,7 +1206,6 @@ mod tests {
EngineCapabilities::default(),
EthereumEngineValidator::new(chain_spec.clone()),
false,
NoopNetwork::default(),
);
let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
(handle, api)
@@ -1315,76 +1247,6 @@ mod tests {
assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
}
#[derive(Clone)]
struct TestNetworkInfo {
syncing: bool,
}
impl NetworkInfo for TestNetworkInfo {
fn local_addr(&self) -> std::net::SocketAddr {
(std::net::Ipv4Addr::UNSPECIFIED, 0).into()
}
async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
#[allow(deprecated)]
Ok(NetworkStatus {
client_version: "test".to_string(),
protocol_version: 5,
eth_protocol_info: EthProtocolInfo {
network: 1,
difficulty: None,
genesis: Default::default(),
config: Default::default(),
head: Default::default(),
},
capabilities: vec![],
})
}
fn chain_id(&self) -> u64 {
1
}
fn is_syncing(&self) -> bool {
self.syncing
}
fn is_initially_syncing(&self) -> bool {
self.syncing
}
}
#[tokio::test]
async fn get_blobs_v3_returns_null_when_syncing() {
let chain_spec: Arc<ChainSpec> =
Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service::<EthEngineTypes>();
let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
let api = EngineApi::new(
provider,
chain_spec.clone(),
ConsensusEngineHandle::new(to_engine),
payload_store.into(),
NoopTransactionPool::default(),
Box::<TokioTaskExecutor>::default(),
ClientVersionV1 {
code: ClientCode::RH,
name: "Reth".to_string(),
version: "v0.0.0-test".to_string(),
commit: "test".to_string(),
},
EngineCapabilities::default(),
EthereumEngineValidator::new(chain_spec),
false,
TestNetworkInfo { syncing: true },
);
let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
assert_matches!(res, Ok(None));
}
// tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
mod get_payload_bodies {
use super::*;

View File

@@ -46,8 +46,6 @@ pub(crate) struct EngineApiLatencyMetrics {
pub(crate) get_blobs_v1: Histogram,
/// Latency for `engine_getBlobsV2`
pub(crate) get_blobs_v2: Histogram,
/// Latency for `engine_getBlobsV3`
pub(crate) get_blobs_v3: Histogram,
}
#[derive(Metrics)]

View File

@@ -92,7 +92,7 @@ where
///
/// Can fail if the element is rejected by the limiter or if we fail to grow an empty map.
///
/// See [`LruMap::insert`] for more info.
/// See [`Schnellru::insert`](LruMap::insert) for more info.
pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool
where
L::KeyToInsert<'a>: Hash + PartialEq<K>,

View File

@@ -75,7 +75,6 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-downloaders.workspace = true
reth-static-file.workspace = true
reth-stages-api = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
@@ -117,7 +116,6 @@ test-utils = [
"reth-ethereum-primitives?/test-utils",
"reth-evm-ethereum/test-utils",
]
rocksdb = ["reth-provider/rocksdb"]
[[bench]]
name = "criterion"

View File

@@ -23,7 +23,7 @@ use reth_stages_api::{
};
use reth_static_file_types::StaticFileSegment;
use std::{
cmp::{max, Ordering},
cmp::Ordering,
ops::RangeInclusive,
sync::Arc,
task::{ready, Context, Poll},
@@ -620,11 +620,7 @@ where
// Otherwise, we recalculate the whole stage checkpoint including the amount of gas
// already processed, if there's any.
_ => {
let genesis_block_number = provider.genesis_block_number();
let processed = calculate_gas_used_from_headers(
provider,
genesis_block_number..=max(start_block - 1, genesis_block_number),
)?;
let processed = calculate_gas_used_from_headers(provider, 0..=start_block - 1)?;
ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: start_block, to: max_block },

View File

@@ -5,7 +5,7 @@ use reth_consensus::ConsensusError;
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, StageCheckpointReader, StageCheckpointWriter, TrieWriter,
PruneCheckpointWriter, StageCheckpointReader, TrieWriter,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneSegment, MERKLE_CHANGESETS_RETENTION_BLOCKS,
@@ -300,7 +300,6 @@ where
+ DBProvider
+ HeaderProvider
+ ChainStateBlockReader
+ StageCheckpointWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter,
{
@@ -405,28 +404,6 @@ where
computed_range.start = computed_range.end;
}
// If we've unwound so far that there are no longer enough trie changesets available then
// simply clear them and the checkpoints, so that on next pipeline startup they will be
// regenerated.
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Checking if computed range is over retention threshold",
);
if computed_range.end - computed_range.start < self.retention_blocks {
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Clearing checkpoints completely",
);
provider.clear_trie_changesets()?;
provider
.save_stage_checkpoint(StageId::MerkleChangeSets, StageCheckpoint::default())?;
return Ok(UnwindOutput { checkpoint: StageCheckpoint::default() })
}
// `computed_range.end` is exclusive
let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));

View File

@@ -3,16 +3,17 @@ use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db_api::{
table::{Decode, Decompress, Value},
cursor::{DbCursorRO, DbCursorRW},
table::Value,
tables,
transaction::DbTxMut,
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -64,9 +65,7 @@ where
+ PruneCheckpointReader
+ StatsReader
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ TransactionsProviderExt
+ StorageSettingsCache
+ RocksDBProviderFactory,
+ TransactionsProviderExt,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -151,27 +150,16 @@ where
);
if range_output.is_final_range {
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
// Use append mode when table is empty (first sync) - significantly faster
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
let mut txhash_cursor = provider
.tx_ref()
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash_bytes, number_bytes) = hash_to_number?;
let (hash, number) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
@@ -181,16 +169,12 @@ where
);
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
let key = RawKey::<TxHash>::from_vec(hash);
if append_only {
txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
} else {
txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
}
}
trace!(target: "sync::stages::transaction_lookup",
@@ -215,19 +199,11 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
// Cursor to unwind tx hash to number
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
@@ -242,18 +218,15 @@ where
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
// First delete the transaction and hash to id mapping
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
{
tx_hash_number_cursor.delete_current()?;
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
@@ -293,7 +266,7 @@ mod tests {
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_db_api::transaction::DbTx;
use reth_ethereum_primitives::Block;
use reth_primitives_traits::SealedBlock;
use reth_provider::{
@@ -608,160 +581,4 @@ mod tests {
self.ensure_no_hash_by_block(input.unwind_to)
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// writes transaction hash mappings to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
assert_eq!(
mdbx_count, 0,
"MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = runner.db.factory.rocksdb_provider();
let mut rocksdb_count = 0;
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
rocksdb_count += 1;
}
}
assert_eq!(
rocksdb_count, expected_tx_count,
"RocksDB should contain all transaction hashes"
);
}
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
#[tokio::test]
async fn unwind_deletes_from_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage first to populate RocksDB
let exec_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let rx = runner.execute(exec_input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify RocksDB has the data before unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_some(),
"Transaction hash {:?} should exist before unwind",
hash
);
}
}
// Now unwind to stage_progress (removing all the blocks we added)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(previous_stage),
unwind_to: stage_progress,
bad_block: None,
};
let unwind_result = runner.unwind(unwind_input).await;
assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
// Verify RocksDB data is deleted after unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_none(),
"Transaction hash {:?} should be deleted from RocksDB after unwind",
hash
);
}
}
}
}
}

View File

@@ -50,7 +50,7 @@ impl Default for TestStageDB {
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}
@@ -68,7 +68,7 @@ impl TestStageDB {
create_test_rw_db_with_path(path),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}

View File

@@ -30,7 +30,7 @@ pub trait DbTxUnwindExt: DbTxMut {
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key) <= key {
if selector(entry_key.clone()) <= key {
break
}
reverse_walker.delete_current()?;

View File

@@ -100,7 +100,6 @@ where
+ StateWriter
+ TrieWriter
+ MetadataWriter
+ ChainSpecProvider
+ AsRef<PF::ProviderRW>,
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
@@ -127,7 +126,6 @@ where
+ StateWriter
+ TrieWriter
+ MetadataWriter
+ ChainSpecProvider
+ AsRef<PF::ProviderRW>,
PF::ChainSpec: EthChainSpec<Header = <PF::Primitives as NodePrimitives>::BlockHeader>,
{
@@ -136,12 +134,9 @@ where
let genesis = chain.genesis();
let hash = chain.genesis_hash();
// Get the genesis block number from the chain spec
let genesis_block_number = chain.genesis_header().number();
// Check if we already have the genesis header or if we have the wrong one.
match factory.block_hash(genesis_block_number) {
Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, _)) => {}
match factory.block_hash(0) {
Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, 0)) => {}
Ok(Some(block_hash)) => {
if block_hash == hash {
// Some users will at times attempt to re-sync from scratch by just deleting the
@@ -184,26 +179,15 @@ where
// compute state root to populate trie tables
compute_state_root(&provider_rw, None)?;
// set stage checkpoint to genesis block number for all stages
let checkpoint = StageCheckpoint::new(genesis_block_number);
// insert sync stage
for stage in StageId::ALL {
provider_rw.save_stage_checkpoint(stage, checkpoint)?;
provider_rw.save_stage_checkpoint(stage, Default::default())?;
}
// Static file segments start empty, so we need to initialize the genesis block.
let static_file_provider = provider_rw.static_file_provider();
// Static file segments start empty, so we need to initialize the genesis block.
// For genesis blocks with non-zero block numbers, we need to use get_writer() instead of
// latest_writer() to ensure the genesis block is stored in the correct static file range.
static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::Receipts)?
.user_header_mut()
.set_block_range(genesis_block_number, genesis_block_number);
static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::Transactions)?
.user_header_mut()
.set_block_range(genesis_block_number, genesis_block_number);
static_file_provider.latest_writer(StaticFileSegment::Receipts)?.increment_block(0)?;
static_file_provider.latest_writer(StaticFileSegment::Transactions)?.increment_block(0)?;
// Behaviour reserved only for new nodes should be set here.
provider_rw.write_storage_settings(storage_settings)?;
@@ -226,11 +210,9 @@ where
+ DBProvider<Tx: DbTxMut>
+ HeaderProvider
+ StateWriter
+ ChainSpecProvider
+ AsRef<Provider>,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_state(provider, alloc, genesis_block_number)
insert_state(provider, alloc, 0)
}
/// Inserts state at given block into database.
@@ -353,10 +335,9 @@ pub fn insert_genesis_history<'a, 'b, Provider>(
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter + ChainSpecProvider,
Provider: DBProvider<Tx: DbTxMut> + HistoryWriter,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_history(provider, alloc, genesis_block_number)
insert_history(provider, alloc, 0)
}
/// Inserts history indices for genesis accounts and storage.
@@ -396,37 +377,17 @@ where
let (header, block_hash) = (chain.genesis_header(), chain.genesis_hash());
let static_file_provider = provider.static_file_provider();
// Get the actual genesis block number from the header
let genesis_block_number = header.number();
match static_file_provider.block_hash(genesis_block_number) {
Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, _)) => {
let difficulty = header.difficulty();
// For genesis blocks with non-zero block numbers, we need to ensure they are stored
// in the correct static file range. We use get_writer() with the genesis block number
// to ensure the genesis block is stored in the correct static file range.
let mut writer = static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::Headers)?;
// For non-zero genesis blocks, we need to set block range to genesis_block_number and
// append header without increment block
if genesis_block_number > 0 {
writer
.user_header_mut()
.set_block_range(genesis_block_number, genesis_block_number);
writer.append_header_direct(header, difficulty, &block_hash)?;
} else {
// For zero genesis blocks, use normal append_header
writer.append_header(header, &block_hash)?;
}
match static_file_provider.block_hash(0) {
Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, 0)) => {
let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
writer.append_header(header, &block_hash)?;
}
Ok(Some(_)) => {}
Err(e) => return Err(e),
}
provider.tx_ref().put::<tables::HeaderNumbers>(block_hash, genesis_block_number)?;
provider.tx_ref().put::<tables::BlockBodyIndices>(genesis_block_number, Default::default())?;
provider.tx_ref().put::<tables::HeaderNumbers>(block_hash, 0)?;
provider.tx_ref().put::<tables::BlockBodyIndices>(0, Default::default())?;
Ok(())
}

View File

@@ -187,21 +187,6 @@ impl<'a> EitherWriter<'a, (), ()> {
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
/// `None` for other variants.
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
Self::RocksDB(batch) => Some(batch.into_inner()),
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -319,24 +304,13 @@ where
CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
{
/// Puts a transaction hash number mapping.
///
/// When `append_only` is true, uses `cursor.append()` which is significantly faster
/// but requires entries to be inserted in order and the table to be empty.
/// When false, uses `cursor.insert()` which handles arbitrary insertion order.
pub fn put_transaction_hash_number(
&mut self,
hash: TxHash,
tx_num: TxNumber,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(hash, &tx_num)?)
} else {
Ok(cursor.insert(hash, &tx_num)?)
}
}
Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
@@ -689,18 +663,12 @@ mod tests {
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use crate::{
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
test_utils::create_test_provider_factory,
RocksDBProviderFactory,
};
use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
};
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -714,87 +682,6 @@ mod rocksdb_tests {
(temp_dir, provider)
}
/// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
/// when the storage setting is enabled, and that put operations followed by commit
/// persist the data to `RocksDB`.
#[test]
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create EitherWriter with RocksDB
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Verify we got a RocksDB writer
assert!(matches!(writer, EitherWriter::RocksDB(_)));
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the batch and register with provider for commit
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Commit via provider - this commits RocksDB batch too
provider.commit().unwrap();
// Verify data was written to RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
}
/// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
#[test]
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash = B256::from([1u8; 32]);
let tx_num = 100u64;
// First, write a value directly to RocksDB
let rocksdb = factory.rocksdb_provider();
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
// Now delete using EitherWriter
let batch = rocksdb.batch();
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
writer.delete_transaction_hash_number(hash).unwrap();
// Extract the batch and commit via provider
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
provider.commit().unwrap();
// Verify deletion
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
}
#[test]
fn test_rocksdb_batch_transaction_hash_numbers() {
let (_temp_dir, provider) = create_rocksdb_provider();
@@ -929,65 +816,4 @@ mod rocksdb_tests {
// Verify deletion
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
}
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
///
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
/// in a single place, making it easier to reason about commit ordering and consistency.
#[test]
fn test_rocksdb_commits_at_provider_level() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create provider and EitherWriter
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the raw batch from the writer and register it with the provider
let raw_batch = writer.into_raw_rocksdb_batch();
if let Some(batch) = raw_batch {
provider.set_pending_rocksdb_batch(batch);
}
// Data should NOT be visible yet (batch not committed)
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
None,
"Data should not be visible before provider.commit()"
);
// Commit the provider - this should commit both MDBX and RocksDB
provider.commit().unwrap();
// Now data should be visible in RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
Some(tx_num1),
"Data should be visible after provider.commit()"
);
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
Some(tx_num2),
"Data should be visible after provider.commit()"
);
}
}

View File

@@ -181,11 +181,6 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

View File

@@ -153,11 +153,6 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {

View File

@@ -151,6 +151,7 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Database transaction.
tx: TX,
@@ -166,29 +167,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
}
impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("DatabaseProvider");
s.field("tx", &self.tx)
.field("chain_spec", &self.chain_spec)
.field("static_file_provider", &self.static_file_provider)
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
}
}
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
/// Returns reference to prune modes.
pub const fn prune_modes_ref(&self) -> &PruneModes {
@@ -277,11 +259,6 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
}
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
@@ -313,8 +290,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -570,8 +545,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -3205,7 +3178,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
/// Commit database transaction and static files.
fn commit(self) -> ProviderResult<bool> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3213,27 +3186,9 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.tx.commit()?;
}

View File

@@ -1,913 +0,0 @@
//! Invariant checking for `RocksDB` tables.
//!
//! This module provides consistency checks for tables stored in `RocksDB`, similar to the
//! consistency checks for static files. The goal is to detect and potentially heal
//! inconsistencies between `RocksDB` data and MDBX checkpoints.
use super::RocksDBProvider;
use crate::StaticFileProviderFactory;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::BlockNumber;
use rayon::prelude::*;
use reth_db::cursor::DbCursorRO;
use reth_db_api::{tables, transaction::DbTx};
use reth_stages_types::StageId;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider,
};
use reth_storage_errors::provider::ProviderResult;
impl RocksDBProvider {
/// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
///
/// Returns an unwind target block number if the pipeline needs to unwind to rebuild
/// `RocksDB` data. Returns `None` if all invariants pass or if inconsistencies were healed.
///
/// # Invariants checked
///
/// For `TransactionHashNumbers`:
/// - The maximum `TxNumber` value should not exceed what the `TransactionLookup` stage
/// checkpoint indicates has been processed.
/// - If `RocksDB` is ahead, excess entries are pruned (healed).
/// - If `RocksDB` is behind, an unwind is required.
///
/// For `StoragesHistory`:
/// - The maximum block number in shards should not exceed the `IndexStorageHistory` stage
/// checkpoint.
/// - Similar healing/unwind logic applies.
///
/// # Requirements
///
/// For pruning `TransactionHashNumbers`, the provider must be able to supply transaction
/// data (typically from static files) so that transaction hashes can be computed. This
/// implies that static files should be ahead of or in sync with `RocksDB`.
pub fn check_consistency<Provider>(
&self,
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: DBProvider
+ StageCheckpointReader
+ StorageSettingsCache
+ StaticFileProviderFactory
+ TransactionsProvider<Transaction: Encodable2718>,
{
let mut unwind_target: Option<BlockNumber> = None;
// Check TransactionHashNumbers if stored in RocksDB
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
let Some(target) = self.check_transaction_hash_numbers(provider)?
{
unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
}
// Check StoragesHistory if stored in RocksDB
if provider.cached_storage_settings().storages_history_in_rocksdb &&
let Some(target) = self.check_storages_history(provider)?
{
unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
}
Ok(unwind_target)
}
/// Checks invariants for the `TransactionHashNumbers` table.
///
/// Returns a block number to unwind to if MDBX is behind the checkpoint.
/// If static files are ahead of MDBX, excess `RocksDB` entries are pruned (healed).
///
/// # Approach
///
/// Instead of iterating `RocksDB` entries (which is expensive and doesn't give us the
/// tx range we need), we use static files and MDBX to determine what needs pruning:
/// - Static files are committed before `RocksDB`, so they're at least at the same height
/// - MDBX `TransactionBlocks` tells us what's been fully committed
/// - If static files have more transactions than MDBX, prune the excess range
fn check_transaction_hash_numbers<Provider>(
&self,
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: DBProvider
+ StageCheckpointReader
+ StaticFileProviderFactory
+ TransactionsProvider<Transaction: Encodable2718>,
{
// Get the TransactionLookup stage checkpoint
let checkpoint = provider
.get_stage_checkpoint(StageId::TransactionLookup)?
.map(|cp| cp.block_number)
.unwrap_or(0);
// Get last tx_num from MDBX - this tells us what MDBX has fully committed
let mut cursor = provider.tx_ref().cursor_read::<tables::TransactionBlocks>()?;
let mdbx_last = cursor.last()?;
// Get highest tx_num from static files - this tells us what tx data is available
let highest_static_tx = provider
.static_file_provider()
.get_highest_static_file_tx(StaticFileSegment::Transactions);
match (mdbx_last, highest_static_tx) {
(Some((mdbx_tx, mdbx_block)), Some(highest_tx)) if highest_tx > mdbx_tx => {
// Static files are ahead of MDBX - prune RocksDB entries for the excess range.
// This is the common case during recovery from a crash during unwinding.
tracing::info!(
target: "reth::providers::rocksdb",
mdbx_last_tx = mdbx_tx,
mdbx_block,
highest_static_tx = highest_tx,
"Static files ahead of MDBX, pruning TransactionHashNumbers excess data"
);
self.prune_transaction_hash_numbers_in_range(provider, (mdbx_tx + 1)..=highest_tx)?;
// After pruning, check if MDBX is behind checkpoint
if checkpoint > mdbx_block {
tracing::warn!(
target: "reth::providers::rocksdb",
mdbx_block,
checkpoint,
"MDBX behind checkpoint after pruning, unwind needed"
);
return Ok(Some(mdbx_block));
}
}
(Some((_mdbx_tx, mdbx_block)), _) => {
// MDBX and static files are in sync (or static files don't have more data).
// Check if MDBX is behind checkpoint.
if checkpoint > mdbx_block {
tracing::warn!(
target: "reth::providers::rocksdb",
mdbx_block,
checkpoint,
"MDBX behind checkpoint, unwind needed"
);
return Ok(Some(mdbx_block));
}
}
(None, Some(highest_tx)) => {
// MDBX has no transactions but static files have data.
// This means RocksDB might have stale entries - prune them all.
tracing::info!(
target: "reth::providers::rocksdb",
highest_static_tx = highest_tx,
"MDBX empty but static files have data, pruning all TransactionHashNumbers"
);
self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
}
(None, None) => {
// Both MDBX and static files are empty.
// If checkpoint says we should have data, that's an inconsistency.
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"Checkpoint set but no transaction data exists, unwind needed"
);
return Ok(Some(0));
}
}
}
Ok(None)
}
/// Prunes `TransactionHashNumbers` entries for transactions in the given range.
///
/// This fetches transactions from the provider, computes their hashes in parallel,
/// and deletes the corresponding entries from `RocksDB` by key. This approach is more
/// scalable than iterating all rows because it only processes the transactions that
/// need to be pruned.
///
/// # Requirements
///
/// The provider must be able to supply transaction data (typically from static files)
/// so that transaction hashes can be computed. This implies that static files should
/// be ahead of or in sync with `RocksDB`.
fn prune_transaction_hash_numbers_in_range<Provider>(
&self,
provider: &Provider,
tx_range: std::ops::RangeInclusive<u64>,
) -> ProviderResult<()>
where
Provider: TransactionsProvider<Transaction: Encodable2718>,
{
if tx_range.is_empty() {
return Ok(());
}
// Fetch transactions in the range and compute their hashes in parallel
let hashes: Vec<_> = provider
.transactions_by_tx_range(tx_range.clone())?
.into_par_iter()
.map(|tx| tx.trie_hash())
.collect();
if !hashes.is_empty() {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = hashes.len(),
tx_range_start = *tx_range.start(),
tx_range_end = *tx_range.end(),
"Pruning TransactionHashNumbers entries by tx range"
);
let mut batch = self.batch();
for hash in hashes {
batch.delete::<tables::TransactionHashNumbers>(hash)?;
}
batch.commit()?;
}
Ok(())
}
/// Checks invariants for the `StoragesHistory` table.
///
/// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
/// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
fn check_storages_history<Provider>(
&self,
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: DBProvider + StageCheckpointReader,
{
// Get the IndexStorageHistory stage checkpoint
let checkpoint = provider
.get_stage_checkpoint(StageId::IndexStorageHistory)?
.map(|cp| cp.block_number)
.unwrap_or(0);
// Check if RocksDB has any data
let rocks_first = self.first::<tables::StoragesHistory>()?;
match rocks_first {
Some(_) => {
// If checkpoint is 0 but we have data, clear everything
if checkpoint == 0 {
tracing::info!(
target: "reth::providers::rocksdb",
"StoragesHistory has data but checkpoint is 0, clearing all"
);
self.prune_storages_history_above(0)?;
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries
let mut max_highest_block = 0u64;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
}
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
}
Ok(None)
}
None => {
// Empty RocksDB table
if checkpoint > 0 {
// Stage says we should have data but we don't
return Ok(Some(0));
}
Ok(None)
}
}
}
/// Prunes `StoragesHistory` entries where `highest_block_number` > `max_block`.
///
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
max_block,
"Pruning StoragesHistory entries"
);
let mut batch = self.batch();
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
batch.commit()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
providers::rocksdb::RocksDBBuilder, test_utils::create_test_provider_factory, BlockWriter,
DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
};
use alloy_primitives::{Address, B256};
use reth_db::cursor::DbCursorRW;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, StorageSettings},
tables::{self, BlockNumberList},
transaction::DbTxMut,
};
use reth_stages_types::StageCheckpoint;
use reth_testing_utils::generators::{self, BlockRangeParams};
use tempfile::TempDir;
#[test]
fn test_first_last_empty_rocksdb() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Empty RocksDB, no checkpoints - should be consistent
let first = provider.first::<tables::TransactionHashNumbers>().unwrap();
let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
assert!(first.is_none());
assert!(last.is_none());
}
#[test]
fn test_first_last_with_data() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
// Insert some data
let tx_hash = B256::from([1u8; 32]);
provider.put::<tables::TransactionHashNumbers>(tx_hash, &100).unwrap();
// RocksDB has data
let last = provider.last::<tables::TransactionHashNumbers>().unwrap();
assert!(last.is_some());
assert_eq!(last.unwrap().1, 100);
}
#[test]
fn test_check_consistency_empty_rocksdb_no_checkpoint_is_ok() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy()
.with_transaction_hash_numbers_in_rocksdb(true)
.with_storages_history_in_rocksdb(true),
);
let provider = factory.database_provider_ro().unwrap();
// Empty RocksDB and no checkpoints - should be consistent (None = no unwind needed)
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None);
}
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed
// This means RocksDB is missing data and we need to unwind to rebuild
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
}
#[test]
fn test_check_consistency_mdbx_empty_static_files_have_data_prunes_rocksdb() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Generate blocks with real transactions and insert them
let mut rng = generators::rng();
let blocks = generators::random_block_range(
&mut rng,
0..=2,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
);
let mut tx_hashes = Vec::new();
{
let provider = factory.database_provider_rw().unwrap();
let mut tx_count = 0u64;
for block in &blocks {
provider.insert_block(block.clone().try_recover().expect("recover block")).unwrap();
for tx in &block.body().transactions {
let hash = tx.trie_hash();
tx_hashes.push(hash);
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
tx_count += 1;
}
}
provider.commit().unwrap();
}
// Simulate crash recovery: MDBX was reset but static files and RocksDB still have data.
// Clear TransactionBlocks to simulate empty MDBX state.
{
let provider = factory.database_provider_rw().unwrap();
let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
let mut to_delete = Vec::new();
let mut walker = cursor.walk(Some(0)).unwrap();
while let Some((tx_num, _)) = walker.next().transpose().unwrap() {
to_delete.push(tx_num);
}
drop(walker);
for tx_num in to_delete {
cursor.seek_exact(tx_num).unwrap();
cursor.delete_current().unwrap();
}
// No checkpoint set (checkpoint = 0)
provider.commit().unwrap();
}
// Verify RocksDB data exists
assert!(rocksdb.last::<tables::TransactionHashNumbers>().unwrap().is_some());
let provider = factory.database_provider_ro().unwrap();
// MDBX TransactionBlocks is empty, but static files have transaction data.
// This means RocksDB has stale data that should be pruned (healed).
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify data was pruned
for hash in &tx_hashes {
assert!(
rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
"RocksDB should be empty after pruning"
);
}
}
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
}
#[test]
fn test_check_consistency_storages_history_has_data_no_checkpoint_prunes_data() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Insert data into RocksDB
let key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
rocksdb.put::<tables::StoragesHistory>(key, &block_list).unwrap();
// Verify data exists
assert!(rocksdb.last::<tables::StoragesHistory>().unwrap().is_some());
// Create a test provider factory for MDBX with NO checkpoint
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
let provider = factory.database_provider_ro().unwrap();
// RocksDB has data but checkpoint is 0
// This means RocksDB has stale data that should be pruned (healed)
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify data was pruned
assert!(
rocksdb.last::<tables::StoragesHistory>().unwrap().is_none(),
"RocksDB should be empty after pruning"
);
}
#[test]
fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Generate blocks with real transactions (blocks 0-2, 6 transactions total)
let mut rng = generators::rng();
let blocks = generators::random_block_range(
&mut rng,
0..=2,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
);
{
let provider = factory.database_provider_rw().unwrap();
let mut tx_count = 0u64;
for block in &blocks {
provider.insert_block(block.clone().try_recover().expect("recover block")).unwrap();
for tx in &block.body().transactions {
let hash = tx.trie_hash();
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
tx_count += 1;
}
}
provider.commit().unwrap();
}
// Now simulate a scenario where checkpoint is ahead of MDBX.
// This happens when the checkpoint was saved but MDBX data was lost/corrupted.
// Set checkpoint to block 10 (beyond our actual data at block 2)
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(10))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// MDBX has data up to block 2, but checkpoint says block 10 was processed.
// The static files highest tx matches MDBX last tx (both at block 2).
// Checkpoint > mdbx_block means we need to unwind to rebuild.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result,
Some(2),
"Should require unwind to block 2 (MDBX's last block) to rebuild from checkpoint"
);
}
#[test]
fn test_check_consistency_rocksdb_ahead_of_checkpoint_prunes_excess() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Generate blocks with real transactions:
// Blocks 0-5, each with 2 transactions = 12 total transactions (0-11)
let mut rng = generators::rng();
let blocks = generators::random_block_range(
&mut rng,
0..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
);
// Track which hashes belong to which blocks
let mut tx_hashes = Vec::new();
let mut tx_count = 0u64;
{
let provider = factory.database_provider_rw().unwrap();
// Insert ALL blocks (0-5) to write transactions to static files
for block in &blocks {
provider.insert_block(block.clone().try_recover().expect("recover block")).unwrap();
for tx in &block.body().transactions {
let hash = tx.trie_hash();
tx_hashes.push(hash);
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
tx_count += 1;
}
}
provider.commit().unwrap();
}
// Simulate crash recovery scenario:
// MDBX was unwound to block 2, but RocksDB and static files still have more data.
// Remove TransactionBlocks entries for blocks 3-5 to simulate MDBX unwind.
{
let provider = factory.database_provider_rw().unwrap();
// Delete TransactionBlocks entries for tx > 5 (i.e., for blocks 3-5)
// TransactionBlocks maps last_tx_in_block -> block_number
// After unwind, only entries for blocks 0-2 should remain (tx 5 -> block 2)
let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
// Walk and delete entries where block > 2
let mut to_delete = Vec::new();
let mut walker = cursor.walk(Some(0)).unwrap();
while let Some((tx_num, block_num)) = walker.next().transpose().unwrap() {
if block_num > 2 {
to_delete.push(tx_num);
}
}
drop(walker);
for tx_num in to_delete {
cursor.seek_exact(tx_num).unwrap();
cursor.delete_current().unwrap();
}
// Set checkpoint to block 2
provider
.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has tx hashes for all blocks (0-5)
// MDBX TransactionBlocks only goes up to tx 5 (block 2)
// Static files have data for all txs (0-11)
// This means RocksDB is ahead and should prune entries for tx 6-11
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify: hashes for blocks 0-2 (tx 0-5) should remain, blocks 3-5 (tx 6-11) should be
// pruned First 6 hashes should remain
for (i, hash) in tx_hashes.iter().take(6).enumerate() {
assert!(
rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_some(),
"tx {} should remain",
i
);
}
// Last 6 hashes should be pruned
for (i, hash) in tx_hashes.iter().skip(6).enumerate() {
assert!(
rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
"tx {} should be pruned",
i + 6
);
}
}
#[test]
fn test_check_consistency_storages_history_ahead_of_checkpoint_prunes_excess() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Insert data into RocksDB with different highest_block_numbers
let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
let key_block_100 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 100);
let key_block_150 = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), 150);
let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([3u8; 32]), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_block_50.clone(), &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_block_100.clone(), &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_block_150.clone(), &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_block_max.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has entries with highest_block = 150 which exceeds checkpoint (100)
// Should prune entries where highest_block > 100 (but not u64::MAX sentinel)
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify key_block_150 was pruned, but others remain
assert!(
rocksdb.get::<tables::StoragesHistory>(key_block_50).unwrap().is_some(),
"Entry with highest_block=50 should remain"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_block_100).unwrap().is_some(),
"Entry with highest_block=100 should remain"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_block_150).unwrap().is_none(),
"Entry with highest_block=150 should be pruned"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_block_max).unwrap().is_some(),
"Entry with highest_block=u64::MAX (sentinel) should remain"
);
}
/// Test that pruning works by fetching transactions and computing their hashes,
/// rather than iterating all rows. This test uses random blocks with unique
/// transactions so we can verify the correct entries are pruned.
#[test]
fn test_prune_transaction_hash_numbers_by_range() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
.build()
.unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Generate random blocks with unique transactions
// Block 0 (genesis) has no transactions
// Blocks 1-5 each have 2 transactions = 10 transactions total
let mut rng = generators::rng();
let blocks = generators::random_block_range(
&mut rng,
0..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
);
// Insert blocks into the database
let mut tx_count = 0u64;
let mut tx_hashes = Vec::new();
{
let provider = factory.database_provider_rw().unwrap();
for block in &blocks {
provider.insert_block(block.clone().try_recover().expect("recover block")).unwrap();
// Store transaction hash -> tx_number mappings in RocksDB
for tx in &block.body().transactions {
let hash = tx.trie_hash();
tx_hashes.push(hash);
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_count).unwrap();
tx_count += 1;
}
}
// Set checkpoint to block 2 (meaning we should only have tx hashes for blocks 0-2)
// Blocks 0, 1, 2 have 6 transactions (2 each), so tx 0-5 should remain
provider
.save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(2))
.unwrap();
provider.commit().unwrap();
}
// At this point:
// - RocksDB has tx hashes for blocks 0-5 (10 total: 2 per block)
// - Checkpoint says we only processed up to block 2
// - We need to prune tx hashes for blocks 3, 4, 5 (tx 6-9)
// Verify RocksDB has the expected number of entries before pruning
let rocksdb_count_before: usize =
rocksdb.iter::<tables::TransactionHashNumbers>().unwrap().count();
assert_eq!(
rocksdb_count_before, tx_count as usize,
"RocksDB should have all {} transaction hashes before pruning",
tx_count
);
let provider = factory.database_provider_ro().unwrap();
// Verify we can fetch transactions by tx range
let all_txs = provider.transactions_by_tx_range(0..tx_count).unwrap();
assert_eq!(all_txs.len(), tx_count as usize, "Should be able to fetch all transactions");
// Verify the hashes match between what we stored and what we compute from fetched txs
for (i, tx) in all_txs.iter().enumerate() {
let computed_hash = tx.trie_hash();
assert_eq!(
computed_hash, tx_hashes[i],
"Hash mismatch for tx {}: stored {:?} vs computed {:?}",
i, tx_hashes[i], computed_hash
);
}
// Blocks 0, 1, 2 have 2 tx each = 6 tx total (indices 0-5)
// We want to keep tx 0-5, prune tx 6-9
let max_tx_to_keep = 5u64;
let tx_to_prune_start = max_tx_to_keep + 1;
// Prune transactions 6-9 (blocks 3-5)
rocksdb
.prune_transaction_hash_numbers_in_range(&provider, tx_to_prune_start..=(tx_count - 1))
.expect("prune should succeed");
// Verify: transactions 0-5 should remain, 6-9 should be pruned
let mut remaining_count = 0;
for result in rocksdb.iter::<tables::TransactionHashNumbers>().unwrap() {
let (_hash, tx_num) = result.unwrap();
assert!(
tx_num <= max_tx_to_keep,
"Transaction {} should have been pruned (> {})",
tx_num,
max_tx_to_keep
);
remaining_count += 1;
}
assert_eq!(
remaining_count,
(max_tx_to_keep + 1) as usize,
"Should have {} transactions (0-{})",
max_tx_to_keep + 1,
max_tx_to_keep
);
}
}

View File

@@ -1,7 +1,5 @@
//! [`RocksDBProvider`] implementation
mod invariants;
mod metrics;
mod provider;
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -380,65 +380,6 @@ impl RocksDBProvider {
})
}
/// Gets the first (smallest key) entry from the specified table.
pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
let cf = this.get_cf_handle::<T>()?;
let mut iter = this.0.db.iterator_cf(cf, IteratorMode::Start);
match iter.next() {
Some(Ok((key_bytes, value_bytes))) => {
let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let value = T::Value::decompress(&value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
Ok(Some((key, value)))
}
Some(Err(e)) => {
Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
})))
}
None => Ok(None),
}
})
}
/// Gets the last (largest key) entry from the specified table.
pub fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
let cf = this.get_cf_handle::<T>()?;
let mut iter = this.0.db.iterator_cf(cf, IteratorMode::End);
match iter.next() {
Some(Ok((key_bytes, value_bytes))) => {
let key = <T::Key as reth_db_api::table::Decode>::decode(&key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let value = T::Value::decompress(&value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
Ok(Some((key, value)))
}
Some(Err(e)) => {
Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
})))
}
None => Ok(None),
}
})
}
/// Creates an iterator over all entries in the specified table.
///
/// Returns decoded `(Key, Value)` pairs in key order.
pub fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
let cf = self.get_cf_handle::<T>()?;
let iter = self.0.db.iterator_cf(cf, IteratorMode::Start);
Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
}
/// Writes a batch of operations atomically.
pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
where
@@ -450,19 +391,6 @@ impl RocksDBProvider {
batch_handle.commit()
})
}
/// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
///
/// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
/// and needs to be committed at a later point (e.g., at provider commit time).
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
}
/// Handle for building a batch of operations atomically.
@@ -537,18 +465,6 @@ impl<'a> RocksDBBatch<'a> {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Returns a reference to the underlying `RocksDB` provider.
pub const fn provider(&self) -> &RocksDBProvider {
self.provider
}
/// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
///
/// This is used to defer commits to the provider level.
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -668,50 +584,6 @@ impl<'db> RocksTx<'db> {
}
}
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
inner: rocksdb::DBIteratorWithThreadMode<'db, TransactionDB>,
_marker: std::marker::PhantomData<T>,
}
impl<T: Table> fmt::Debug for RocksDBIter<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBIter").field("table", &T::NAME).finish_non_exhaustive()
}
}
impl<T: Table> Iterator for RocksDBIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
let (key_bytes, value_bytes) = match self.inner.next()? {
Ok(kv) => kv,
Err(e) => {
return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))))
}
};
// Decode key
let key = match <T::Key as reth_db_api::table::Decode>::decode(&key_bytes) {
Ok(k) => k,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
// Decompress value
let value = match T::Value::decompress(&value_bytes) {
Ok(v) => v,
Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))),
};
Some(Ok((key, value)))
}
}
/// Iterator over a `RocksDB` table within a transaction.
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
@@ -1070,28 +942,4 @@ mod tests {
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
}
#[test]
fn test_first_and_last_entry() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Empty table should return None for both
assert_eq!(provider.first::<TestTable>().unwrap(), None);
assert_eq!(provider.last::<TestTable>().unwrap(), None);
// Insert some entries
provider.put::<TestTable>(10, &b"value_10".to_vec()).unwrap();
provider.put::<TestTable>(20, &b"value_20".to_vec()).unwrap();
provider.put::<TestTable>(5, &b"value_5".to_vec()).unwrap();
// First should return the smallest key
let first = provider.first::<TestTable>().unwrap();
assert_eq!(first, Some((5, b"value_5".to_vec())));
// Last should return the largest key
let last = provider.last::<TestTable>().unwrap();
assert_eq!(last, Some((20, b"value_20".to_vec())));
}
}

View File

@@ -151,23 +151,6 @@ impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
self
}
/// Sets the genesis block number for the [`StaticFileProvider`].
///
/// This configures the genesis block number, which is used to determine the starting point
/// for block indexing and querying operations.
///
/// # Arguments
///
/// * `genesis_block_number` - The block number of the genesis block.
///
/// # Returns
///
/// Returns `Self` to allow method chaining.
pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
self.inner.genesis_block_number = genesis_block_number;
self
}
/// Builds the final [`StaticFileProvider`] and initializes the index.
pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
let provider = StaticFileProvider(Arc::new(self.inner));
@@ -325,8 +308,6 @@ pub struct StaticFileProviderInner<N> {
blocks_per_file: HashMap<StaticFileSegment, u64>,
/// Write lock for when access is [`StaticFileAccess::RW`].
_lock_file: Option<StorageLock>,
/// Genesis block number, default is 0;
genesis_block_number: u64,
}
impl<N: NodePrimitives> StaticFileProviderInner<N> {
@@ -353,7 +334,6 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
access,
blocks_per_file,
_lock_file,
genesis_block_number: 0,
};
Ok(provider)
@@ -429,11 +409,6 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
block,
)
}
/// Get genesis block number
pub const fn genesis_block_number(&self) -> u64 {
self.genesis_block_number
}
}
impl<N: NodePrimitives> StaticFileProvider<N> {
@@ -1751,11 +1726,7 @@ impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
&self,
segment: StaticFileSegment,
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
let genesis_number = self.0.as_ref().genesis_block_number();
self.get_writer(
self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
segment,
)
self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
}
fn commit(&self) -> ProviderResult<()> {

View File

@@ -363,9 +363,8 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
.as_ref()
.map(|block_range| block_range.end())
.or_else(|| {
(self.writer.user_header().expected_block_start() >
self.reader().genesis_block_number())
.then(|| self.writer.user_header().expected_block_start() - 1)
(self.writer.user_header().expected_block_start() > 0)
.then(|| self.writer.user_header().expected_block_start() - 1)
});
self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
@@ -646,37 +645,6 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
Ok(())
}
/// Appends header to static file without calling `increment_block`.
/// This is useful for genesis blocks with non-zero block numbers.
pub fn append_header_direct(
&mut self,
header: &N::BlockHeader,
total_difficulty: U256,
hash: &BlockHash,
) -> ProviderResult<()>
where
N::BlockHeader: Compact,
{
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
self.append_column(header)?;
self.append_column(CompactU256::from(total_difficulty))?;
self.append_column(hash)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Headers,
StaticFileProviderOperation::Append,
Some(start.elapsed()),
);
}
Ok(())
}
/// Appends transaction to static file.
///
/// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be

View File

@@ -1,5 +1,5 @@
use crate::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
HashingWriter, ProviderFactory, TrieWriter,
};
use alloy_primitives::B256;
@@ -62,10 +62,7 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBBuilder::new(&rocksdb_dir)
.with_default_tables()
.build()
.expect("failed to create test RocksDB provider"),
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
)
.expect("failed to create test provider factory")
}

View File

@@ -29,9 +29,4 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
// No-op for NoopProvider
}
}

View File

@@ -6,11 +6,4 @@ use crate::providers::RocksDBProvider;
pub trait RocksDBProviderFactory {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider;
/// Adds a pending `RocksDB` batch to be committed when this provider is committed.
///
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
}

View File

@@ -26,9 +26,7 @@ tracing-logfmt.workspace = true
clap = { workspace = true, features = ["derive"] }
eyre.workspace = true
rolling-file.workspace = true
tracing-samply = { workspace = true, optional = true }
[features]
default = ["otlp"]
otlp = ["reth-tracing-otlp"]
samply = ["tracing-samply"]

View File

@@ -224,12 +224,6 @@ impl Tracer for RethTracer {
None
};
#[cfg(feature = "samply")]
layers.add_layer(
tracing_samply::SamplyLayer::new()
.map_err(|e| eyre::eyre!("Failed to create samply layer: {}", e))?,
);
// The error is returned if the global default subscriber is already set,
// so it's safe to ignore it
let _ = tracing_subscriber::registry().with(layers.into_inner()).try_init();

View File

@@ -59,73 +59,6 @@ impl DiskFileBlobStore {
fn clear_cache(&self) {
self.inner.blob_cache.lock().clear()
}
/// Look up EIP-7594 blobs by their versioned hashes.
///
/// This returns a result vector with the **same length and order** as the input
/// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
/// `None` if it is missing or an older sidecar version.
///
/// The lookup first scans the in-memory cache and, if not all blobs are found, falls back to
/// reading candidate sidecars from disk using the `versioned_hash -> tx_hash` index.
fn get_by_versioned_hashes_eip7594(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
// we must return the blobs in order but we don't necessarily find them in the requested
// order
let mut result = vec![None; versioned_hashes.len()];
// first scan all cached full sidecars
for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
result[hash_idx] = Some(match_result);
}
}
// return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
return Ok(result);
}
}
// not all versioned hashes were found, try to look up a matching tx
let mut missing_tx_hashes = Vec::new();
{
let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
for (idx, _) in
result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
{
// this is safe because the result vec has the same len
let versioned_hash = versioned_hashes[idx];
if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
missing_tx_hashes.push(tx_hash);
}
}
}
// if we have missing blobs, try to read them from disk and try again
if !missing_tx_hashes.is_empty() {
let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
for (_, blob_sidecar) in blobs_from_disk {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
if result[hash_idx].is_none() {
result[hash_idx] = Some(match_result);
}
}
}
}
}
Ok(result)
}
}
impl BlobStore for DiskFileBlobStore {
@@ -151,9 +84,6 @@ impl BlobStore for DiskFileBlobStore {
}
fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
if txs.is_empty() {
return Ok(())
}
let txs = self.inner.retain_existing(txs)?;
self.inner.txs_to_delete.write().extend(txs);
Ok(())
@@ -275,7 +205,58 @@ impl BlobStore for DiskFileBlobStore {
&self,
versioned_hashes: &[B256],
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
let result = self.get_by_versioned_hashes_eip7594(versioned_hashes)?;
// we must return the blobs in order but we don't necessarily find them in the requested
// order
let mut result = vec![None; versioned_hashes.len()];
// first scan all cached full sidecars
for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
result[hash_idx] = Some(match_result);
}
}
// return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
// got all blobs, can return early
return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
}
}
// not all versioned hashes were found, try to look up a matching tx
let mut missing_tx_hashes = Vec::new();
{
let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
for (idx, _) in
result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
{
// this is safe because the result vec has the same len
let versioned_hash = versioned_hashes[idx];
if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
missing_tx_hashes.push(tx_hash);
}
}
}
// if we have missing blobs, try to read them from disk and try again
if !missing_tx_hashes.is_empty() {
let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
for (_, blob_sidecar) in blobs_from_disk {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
if result[hash_idx].is_none() {
result[hash_idx] = Some(match_result);
}
}
}
}
}
// only return the blobs if we found all requested versioned hashes
if result.iter().all(|blob| blob.is_some()) {
@@ -285,13 +266,6 @@ impl BlobStore for DiskFileBlobStore {
}
}
fn get_by_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
self.get_by_versioned_hashes_eip7594(versioned_hashes)
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
@@ -682,12 +656,7 @@ pub enum OpenDiskFileBlobStore {
#[cfg(test)]
mod tests {
use alloy_consensus::BlobTransactionSidecar;
use alloy_eips::{
eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
eip7594::{
BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
},
};
use alloy_eips::eip7594::BlobTransactionSidecarVariant;
use super::*;
use std::sync::atomic::Ordering;
@@ -713,20 +682,6 @@ mod tests {
.collect()
}
fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
let blob = Blob::default();
let commitment = Bytes48::default();
let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
let expected =
BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
(BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
}
#[test]
fn disk_insert_all_get_all() {
let (store, _dir) = tmp_store();
@@ -896,33 +851,4 @@ mod tests {
assert_eq!(stat.delete_succeed, 3);
assert_eq!(stat.delete_failed, 0);
}
#[test]
fn disk_get_blobs_v3_returns_partial_results() {
let (store, _dir) = tmp_store();
let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
store.insert(TxHash::random(), sidecar).unwrap();
assert_ne!(versioned_hash, B256::ZERO);
let request = vec![versioned_hash, B256::ZERO];
let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
assert_eq!(v3, vec![Some(expected), None]);
}
#[test]
fn disk_get_blobs_v3_can_fallback_to_disk() {
let (store, _dir) = tmp_store();
let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
store.insert(TxHash::random(), sidecar).unwrap();
store.clear_cache();
let v3 = store.get_by_versioned_hashes_v3(&[versioned_hash]).unwrap();
assert_eq!(v3, vec![Some(expected)]);
}
}

View File

@@ -13,35 +13,6 @@ pub struct InMemoryBlobStore {
inner: Arc<InMemoryBlobStoreInner>,
}
impl InMemoryBlobStore {
/// Look up EIP-7594 blobs by their versioned hashes.
///
/// This returns a result vector with the **same length and order** as the input
/// `versioned_hashes`. Each element is `Some(BlobAndProofV2)` if the blob is available, or
/// `None` if it is missing or an older sidecar version.
fn get_by_versioned_hashes_eip7594(
&self,
versioned_hashes: &[B256],
) -> Vec<Option<BlobAndProofV2>> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
result[hash_idx] = Some(match_result);
}
}
// Return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
result
}
}
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
@@ -163,7 +134,20 @@ impl BlobStore for InMemoryBlobStore {
&self,
versioned_hashes: &[B256],
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
let result = self.get_by_versioned_hashes_eip7594(versioned_hashes);
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
for (hash_idx, match_result) in
blob_sidecar.match_versioned_hashes(versioned_hashes)
{
result[hash_idx] = Some(match_result);
}
}
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
if result.iter().all(|blob| blob.is_some()) {
Ok(Some(result.into_iter().map(Option::unwrap).collect()))
} else {
@@ -171,13 +155,6 @@ impl BlobStore for InMemoryBlobStore {
}
}
fn get_by_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
Ok(self.get_by_versioned_hashes_eip7594(versioned_hashes))
}
fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
@@ -206,45 +183,3 @@ fn insert_size(
store.insert(tx, Arc::new(blob));
add
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_eips::{
eip4844::{kzg_to_versioned_hash, Blob, BlobAndProofV2, Bytes48},
eip7594::{
BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant, CELLS_PER_EXT_BLOB,
},
};
fn eip7594_single_blob_sidecar() -> (BlobTransactionSidecarVariant, B256, BlobAndProofV2) {
let blob = Blob::default();
let commitment = Bytes48::default();
let cell_proofs = vec![Bytes48::default(); CELLS_PER_EXT_BLOB];
let versioned_hash = kzg_to_versioned_hash(commitment.as_slice());
let expected =
BlobAndProofV2 { blob: Box::new(Blob::default()), proofs: cell_proofs.clone() };
let sidecar = BlobTransactionSidecarEip7594::new(vec![blob], vec![commitment], cell_proofs);
(BlobTransactionSidecarVariant::Eip7594(sidecar), versioned_hash, expected)
}
#[test]
fn mem_get_blobs_v3_returns_partial_results() {
let store = InMemoryBlobStore::default();
let (sidecar, versioned_hash, expected) = eip7594_single_blob_sidecar();
store.insert(B256::random(), sidecar).unwrap();
assert_ne!(versioned_hash, B256::ZERO);
let request = vec![versioned_hash, B256::ZERO];
let v2 = store.get_by_versioned_hashes_v2(&request).unwrap();
assert!(v2.is_none(), "v2 must return null if any requested blob is missing");
let v3 = store.get_by_versioned_hashes_v3(&request).unwrap();
assert_eq!(v3, vec![Some(expected), None]);
}
}

View File

@@ -100,15 +100,6 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
versioned_hashes: &[B256],
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
/// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
///
/// The response is always the same length as the request. Missing or older-version blobs are
/// returned as `None` elements.
fn get_by_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
/// Data size of all transactions in the blob store.
fn data_size_hint(&self) -> Option<usize>;

View File

@@ -78,13 +78,6 @@ impl BlobStore for NoopBlobStore {
Ok(None)
}
fn get_by_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
fn data_size_hint(&self) -> Option<usize> {
Some(0)
}

View File

@@ -751,13 +751,6 @@ where
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
self.pool.blob_store().get_by_versioned_hashes_v2(versioned_hashes)
}
fn get_blobs_for_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
self.pool.blob_store().get_by_versioned_hashes_v3(versioned_hashes)
}
}
impl<V, T, S> TransactionPoolExt for Pool<V, T, S>

View File

@@ -269,26 +269,17 @@ pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
}
}
_ = stale_eviction_interval.tick() => {
let queued = pool
.queued_transactions();
let mut stale_blobs = Vec::new();
let now = std::time::Instant::now();
let stale_txs: Vec<_> = queued
let stale_txs: Vec<_> = pool
.queued_transactions()
.into_iter()
.filter(|tx| {
// filter stale transactions based on config
(tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
})
.map(|tx| {
if tx.is_eip4844() {
stale_blobs.push(*tx.hash());
}
*tx.hash()
(tx.origin.is_external() || config.no_local_exemptions) && tx.timestamp.elapsed() > config.max_tx_lifetime
})
.map(|tx| *tx.hash())
.collect();
debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
pool.remove_transactions(stale_txs);
pool.delete_blobs(stale_blobs);
}
}
// handle the result of the account reload

View File

@@ -345,13 +345,6 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
Ok(None)
}
fn get_blobs_for_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
}
/// A [`TransactionValidator`] that does nothing.

View File

@@ -638,15 +638,6 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
versioned_hashes: &[B256],
) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
/// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
///
/// The response is always the same length as the request. Missing or older-version blobs are
/// returned as `None` elements.
fn get_blobs_for_versioned_hashes_v3(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
}
/// Extension for [`TransactionPool`] trait that allows to set the current block info.

View File

@@ -276,12 +276,13 @@ where
{
use rayon::iter::{ParallelBridge, ParallelIterator};
let (tx, rx) = std::sync::mpsc::channel();
let retain_updates = self.retain_updates;
// Process all storage trie revealings in parallel, having first removed the
// `reveal_nodes` tracking and `SparseTrie`s for each account from their HashMaps.
// These will be returned after processing.
let results: Vec<_> = storages
storages
.into_iter()
.map(|(account, storage_subtree)| {
let revealed_nodes = self.storage.take_or_create_revealed_paths(&account);
@@ -300,12 +301,14 @@ where
(account, revealed_nodes, trie, result)
})
.collect();
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
drop(tx);
// Return `revealed_nodes` and `SparseTrie` for each account, incrementing metrics and
// returning the last error seen if any.
let mut any_err = Ok(());
for (account, revealed_nodes, trie, result) in results {
for (account, revealed_nodes, trie, result) in rx {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
if let Ok(_metric_values) = result {

View File

@@ -160,15 +160,15 @@ where
///
/// If `metrics` feature is enabled, it also updates the metrics.
fn next_hashed_entry(&mut self) -> Result<Option<(B256, H::Value)>, DatabaseError> {
let next = self.hashed_cursor.next()?;
let result = self.hashed_cursor.next();
self.last_next_result = next;
self.last_next_result = result.clone()?;
#[cfg(feature = "metrics")]
{
self.metrics.inc_leaf_nodes_advanced();
}
Ok(next)
result
}
}

View File

@@ -0,0 +1,96 @@
# reth debug
Various debug routines
```bash
$ reth debug --help
```
```txt
Usage: reth debug [OPTIONS] <COMMAND>
Commands:
merkle Debug the clean & incremental state root calculations
in-memory-merkle Debug in-memory state root calculation
help Print this message or the help of the given subcommand(s)
Options:
-h, --help
Print help (see a summary with '-h')
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
[default: terminal]
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
--log.file.format <FORMAT>
The format to use for logs written to the log file
[default: terminal]
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled
[default: 5]
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
[default: always]
Possible values:
- always: Colors on
- auto: Auto-detect
- never: Colors off
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
```

View File

@@ -0,0 +1,100 @@
# reth recover
Scripts for node recovery
```bash
$ reth recover --help
```
```txt
Usage: reth recover [OPTIONS] <COMMAND>
Commands:
storage-tries Recover the node by deleting dangling storage tries
help Print this message or the help of the given subcommand(s)
Options:
-h, --help
Print help (see a summary with '-h')
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
--log.file.format <FORMAT>
The format to use for logs written to the log file
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
[default: terminal]
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.name <NAME>
The prefix name of the log files
[default: reth.log]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled
[default: 5]
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
Possible values:
- always: Colors on
- auto: Auto-detect
- never: Colors off
[default: always]
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
```

Some files were not shown because too many files have changed in this diff Show More