Merge branch 'main' into transaction-fetcher

This commit is contained in:
Matthias Seitz
2023-10-24 13:19:37 +02:00
57 changed files with 1702 additions and 349 deletions

318
Cargo.lock generated
View File

@@ -86,17 +86,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom 0.2.10",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.3"
@@ -496,6 +485,17 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.4"
@@ -521,6 +521,20 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-sse"
version = "5.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e6fa871e4334a622afd6bb2f611635e8083a6f5e2936c0f90f37c7ef9856298"
dependencies = [
"async-channel",
"futures-lite",
"http-types",
"log",
"memchr",
"pin-project-lite",
]
[[package]]
name = "async-trait"
version = "0.1.74"
@@ -661,6 +675,19 @@ dependencies = [
"serde",
]
[[package]]
name = "beacon-api-sse"
version = "0.0.0"
dependencies = [
"clap",
"eyre",
"futures-util",
"mev-share-sse",
"reth",
"tokio",
"tracing",
]
[[package]]
name = "bech32"
version = "0.9.1"
@@ -1376,6 +1403,15 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "confy"
version = "0.5.1"
@@ -1632,9 +1668,9 @@ checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-bigint"
version = "0.5.3"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740fe28e594155f10cfc383984cbefd529d7396050557148f79cb0f621204124"
checksum = "071c0f5945634bc9ba7a452f492377dd6b1993665ddb58f28704119b32f07a9a"
dependencies = [
"generic-array",
"rand_core 0.6.4",
@@ -2004,7 +2040,7 @@ dependencies = [
[[package]]
name = "discv5"
version = "0.3.1"
source = "git+https://github.com/sigp/discv5?rev=d2e30e04ee62418b9e57278cee907c02b99d5bd1#d2e30e04ee62418b9e57278cee907c02b99d5bd1"
source = "git+https://github.com/sigp/discv5?rev=f289bbd4c57d499bb1bdb393af3c249600a1c662#f289bbd4c57d499bb1bdb393af3c249600a1c662"
dependencies = [
"aes 0.7.5",
"aes-gcm",
@@ -2017,7 +2053,7 @@ dependencies = [
"hex",
"hkdf",
"lazy_static",
"lru 0.7.8",
"lru 0.12.0",
"more-asserts",
"parking_lot 0.11.2",
"rand 0.8.5",
@@ -2703,6 +2739,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.0"
@@ -2772,6 +2823,21 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand 1.9.0",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-locks"
version = "0.7.1"
@@ -2994,23 +3060,11 @@ dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hashbrown"
@@ -3018,7 +3072,7 @@ version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash 0.8.3",
"ahash",
]
[[package]]
@@ -3027,7 +3081,7 @@ version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156"
dependencies = [
"ahash 0.8.3",
"ahash",
"allocator-api2",
"serde",
]
@@ -3043,11 +3097,11 @@ dependencies = [
[[package]]
name = "hashlink"
version = "0.7.0"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
dependencies = [
"hashbrown 0.11.2",
"hashbrown 0.14.2",
]
[[package]]
@@ -3167,6 +3221,26 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f"
[[package]]
name = "http-types"
version = "2.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel",
"base64 0.13.1",
"futures-lite",
"infer",
"pin-project-lite",
"rand 0.7.3",
"serde",
"serde_json",
"serde_qs",
"serde_urlencoded",
"url",
]
[[package]]
name = "httparse"
version = "1.8.0"
@@ -3255,6 +3329,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "iai"
version = "0.1.1"
@@ -3535,13 +3622,19 @@ dependencies = [
"serde",
]
[[package]]
name = "infer"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
[[package]]
name = "inferno"
version = "0.11.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c50453ec3a6555fad17b1cd1a80d16af5bc7cb35094f64e429fd46549018c6a3"
dependencies = [
"ahash 0.8.3",
"ahash",
"indexmap 2.0.2",
"is-terminal",
"itoa",
@@ -4001,18 +4094,18 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.7.8"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21"
dependencies = [
"hashbrown 0.12.3",
"hashbrown 0.14.2",
]
[[package]]
name = "lru"
version = "0.11.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21"
checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60"
dependencies = [
"hashbrown 0.14.2",
]
@@ -4117,7 +4210,7 @@ version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5"
dependencies = [
"ahash 0.8.3",
"ahash",
"metrics-macros",
"portable-atomic",
]
@@ -4185,6 +4278,26 @@ dependencies = [
"sketches-ddsketch",
]
[[package]]
name = "mev-share-sse"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e59928ecfd8f9dd3211f2eb08bdc260c78c2e2af34d1a652445a6287d3c95942"
dependencies = [
"async-sse",
"bytes",
"ethers-core",
"futures-util",
"http-types",
"pin-project-lite",
"reqwest",
"serde",
"serde_json",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -4278,9 +4391,27 @@ dependencies = [
[[package]]
name = "more-asserts"
version = "0.2.2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7843ec2de400bcbc6a6328c958dc38e5359da6e93e72e37bc5246bf1ae776389"
checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e"
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nibble_vec"
@@ -4533,12 +4664,50 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "openssl"
version = "0.10.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c"
dependencies = [
"bitflags 2.4.1",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "option-ext"
version = "0.2.0"
@@ -4611,6 +4780,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -5392,10 +5567,12 @@ dependencies = [
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@@ -5404,10 +5581,13 @@ dependencies = [
"serde_urlencoded",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"winreg",
]
@@ -5503,7 +5683,11 @@ dependencies = [
name = "reth-auto-seal-consensus"
version = "0.1.0-alpha.10"
dependencies = [
"clap",
"eyre",
"futures-util",
"jsonrpsee",
"reth",
"reth-beacon-consensus",
"reth-interfaces",
"reth-primitives",
@@ -5511,6 +5695,8 @@ dependencies = [
"reth-revm",
"reth-stages",
"reth-transaction-pool",
"serde_json",
"tempfile",
"tokio",
"tokio-stream",
"tracing",
@@ -6313,6 +6499,7 @@ dependencies = [
"reth-primitives",
"serde",
"serde_json",
"serde_with",
"similar-asserts",
"thiserror",
]
@@ -6845,7 +7032,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d"
dependencies = [
"ahash 0.8.3",
"ahash",
"cfg-if",
"hashbrown 0.13.2",
]
@@ -7032,6 +7219,17 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_qs"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6"
dependencies = [
"percent-encoding",
"serde",
"thiserror",
]
[[package]]
name = "serde_spanned"
version = "0.6.3"
@@ -7796,6 +7994,16 @@ dependencies = [
"syn 2.0.38",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@@ -8355,6 +8563,7 @@ dependencies = [
"form_urlencoded",
"idna 0.4.0",
"percent-encoding",
"serde",
]
[[package]]
@@ -8406,6 +8615,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vergen"
version = "8.2.5"
@@ -8432,6 +8647,12 @@ dependencies = [
"libc",
]
[[package]]
name = "waker-fn"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690"
[[package]]
name = "walkdir"
version = "2.4.0"
@@ -8529,6 +8750,19 @@ version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "wasm-streams"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.64"

View File

@@ -51,6 +51,7 @@ members = [
"examples/cli-extension-event-hooks",
"examples/rpc-db",
"examples/manual-p2p",
"examples/beacon-api-sse",
"examples/trace-transaction-cli"
]
default-members = ["bin/reth"]
@@ -122,7 +123,7 @@ ethers-core = { version = "2.0", default-features = false }
ethers-providers = { version = "2.0", default-features = false }
ethers-signers = { version = "2.0", default-features = false }
ethers-middleware = { version = "2.0", default-features = false }
discv5 = { git = "https://github.com/sigp/discv5", rev = "d2e30e04ee62418b9e57278cee907c02b99d5bd1" }
discv5 = { git = "https://github.com/sigp/discv5", rev = "f289bbd4c57d499bb1bdb393af3c249600a1c662" }
igd = { git = "https://github.com/stevefan1999-personal/rust-igd", rev = "c2d1f83eb1612a462962453cb0703bc93258b173" }
## js

View File

@@ -1,7 +1,7 @@
use crate::utils::DbTool;
use clap::Parser;
use reth_db::{database::Database, table::Table, TableType, TableViewer, Tables};
use reth_db::{database::Database, table::Table, RawKey, RawTable, TableType, TableViewer, Tables};
use tracing::error;
/// The arguments for the `reth db get` command
@@ -12,9 +12,13 @@ pub struct Command {
/// NOTE: The dupsort tables are not supported now.
pub table: Tables,
/// The key to get content for
/// The key to get content for
#[arg(value_parser = maybe_json_value_parser)]
pub key: String,
/// Output bytes instead of human-readable decoded value
#[clap(long)]
pub raw: bool,
}
impl Command {
@@ -51,9 +55,17 @@ impl<DB: Database> TableViewer<()> for GetValueViewer<'_, DB> {
// get a key for given table
let key = self.args.table_key::<T>()?;
match self.tool.get::<T>(key)? {
let content = if self.args.raw {
self.tool
.get::<RawTable<T>>(RawKey::from(key))?
.map(|content| format!("{:?}", content.raw_value()))
} else {
self.tool.get::<T>(key)?.as_ref().map(serde_json::to_string_pretty).transpose()?
};
match content {
Some(content) => {
println!("{}", serde_json::to_string_pretty(&content)?);
println!("{}", content);
}
None => {
error!(target: "reth::cli", "No content for the given table key.");

View File

@@ -2,7 +2,7 @@ use super::tui::DbListTUI;
use crate::utils::{DbTool, ListFilter};
use clap::Parser;
use eyre::WrapErr;
use reth_db::{database::Database, table::Table, DatabaseEnvRO, TableViewer, Tables};
use reth_db::{database::Database, table::Table, DatabaseEnvRO, RawValue, TableViewer, Tables};
use reth_primitives::hex;
use std::cell::RefCell;
use tracing::error;
@@ -28,12 +28,24 @@ pub struct Command {
/// missing results since the search uses the raw uncompressed value from the database.
#[arg(long)]
search: Option<String>,
/// Minimum size of row in bytes
#[arg(long, default_value_t = 0)]
min_row_size: usize,
/// Minimum size of key in bytes
#[arg(long, default_value_t = 0)]
min_key_size: usize,
/// Minimum size of value in bytes
#[arg(long, default_value_t = 0)]
min_value_size: usize,
/// Returns the number of rows found.
#[arg(long, short)]
count: bool,
/// Dump as JSON instead of using TUI.
#[arg(long, short)]
json: bool,
/// Output bytes instead of human-readable decoded value
#[arg(long)]
raw: bool,
}
impl Command {
@@ -59,6 +71,9 @@ impl Command {
skip: self.skip,
len: self.len,
search,
min_row_size: self.min_row_size,
min_key_size: self.min_key_size,
min_value_size: self.min_value_size,
reverse: self.reverse,
only_count: self.count,
}
@@ -97,17 +112,19 @@ impl TableViewer<()> for ListTableViewer<'_> {
if self.args.count {
println!("{count} entries found.")
} else if self.args.raw {
let list = list.into_iter().map(|row| (row.0, RawValue::new(row.1).into_value())).collect::<Vec<_>>();
println!("{}", serde_json::to_string_pretty(&list)?);
} else {
println!("{}", serde_json::to_string_pretty(&list)?);
}
Ok(())
} else {
let list_filter = RefCell::new(list_filter);
DbListTUI::<_, T>::new(|skip, len| {
list_filter.borrow_mut().update_page(skip, len);
self.tool.list::<T>(&list_filter.borrow()).unwrap().0
}, self.args.skip, self.args.len, total_entries).run()
}, self.args.skip, self.args.len, total_entries, self.args.raw).run()
}
})??;

View File

@@ -3,7 +3,10 @@ use crossterm::{
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use reth_db::table::{Table, TableRow};
use reth_db::{
table::{Table, TableRow},
RawValue,
};
use std::{
io,
time::{Duration, Instant},
@@ -42,7 +45,70 @@ pub(crate) enum ViewMode {
GoToPage,
}
#[derive(Default)]
enum Entries<T: Table> {
/// Pairs of [Table::Key] and [RawValue<Table::Value>]
RawValues(Vec<(T::Key, RawValue<T::Value>)>),
/// Pairs of [Table::Key] and [Table::Value]
Values(Vec<TableRow<T>>),
}
impl<T: Table> Entries<T> {
/// Creates new empty [Entries] as [Entries::RawValues] if `raw_values == true` and as
/// [Entries::Values] if `raw == false`.
fn new_with_raw_values(raw_values: bool) -> Self {
if raw_values {
Self::RawValues(Vec::new())
} else {
Self::Values(Vec::new())
}
}
/// Sets the internal entries [Vec], converting the [Table::Value] into [RawValue<Table::Value>]
/// if needed.
fn set(&mut self, new_entries: Vec<TableRow<T>>) {
match self {
Entries::RawValues(old_entries) => {
*old_entries =
new_entries.into_iter().map(|(key, value)| (key, value.into())).collect()
}
Entries::Values(old_entries) => *old_entries = new_entries,
}
}
/// Returns the length of internal [Vec].
fn len(&self) -> usize {
match self {
Entries::RawValues(entries) => entries.len(),
Entries::Values(entries) => entries.len(),
}
}
/// Returns an iterator over keys of the internal [Vec]. For both [Entries::RawValues] and
/// [Entries::Values], this iterator will yield [Table::Key].
fn iter_keys(&self) -> EntriesKeyIter<'_, T> {
EntriesKeyIter { entries: self, index: 0 }
}
}
struct EntriesKeyIter<'a, T: Table> {
entries: &'a Entries<T>,
index: usize,
}
impl<'a, T: Table> Iterator for EntriesKeyIter<'a, T> {
type Item = &'a T::Key;
fn next(&mut self) -> Option<Self::Item> {
let item = match self.entries {
Entries::RawValues(values) => values.get(self.index).map(|(key, _)| key),
Entries::Values(values) => values.get(self.index).map(|(key, _)| key),
};
self.index += 1;
item
}
}
pub(crate) struct DbListTUI<F, T: Table>
where
F: FnMut(usize, usize) -> Vec<TableRow<T>>,
@@ -65,7 +131,7 @@ where
/// The state of the key list.
list_state: ListState,
/// Entries to show in the TUI.
entries: Vec<TableRow<T>>,
entries: Entries<T>,
}
impl<F, T: Table> DbListTUI<F, T>
@@ -73,7 +139,13 @@ where
F: FnMut(usize, usize) -> Vec<TableRow<T>>,
{
/// Create a new database list TUI
pub(crate) fn new(fetch: F, skip: usize, count: usize, total_entries: usize) -> Self {
pub(crate) fn new(
fetch: F,
skip: usize,
count: usize,
total_entries: usize,
raw: bool,
) -> Self {
Self {
fetch,
skip,
@@ -82,7 +154,7 @@ where
mode: ViewMode::Normal,
input: String::new(),
list_state: ListState::default(),
entries: Vec::new(),
entries: Entries::new_with_raw_values(raw),
}
}
@@ -148,7 +220,7 @@ where
/// Fetch the current page
fn fetch_page(&mut self) {
self.entries = (self.fetch)(self.skip, self.count);
self.entries.set((self.fetch)(self.skip, self.count));
self.reset();
}
@@ -298,10 +370,9 @@ where
let key_length = format!("{}", (app.skip + app.count).saturating_sub(1)).len();
let entries: Vec<_> = app.entries.iter().map(|(k, _)| k).collect();
let formatted_keys = entries
.into_iter()
let formatted_keys = app
.entries
.iter_keys()
.enumerate()
.map(|(i, k)| {
ListItem::new(format!("[{:0>width$}]: {k:?}", i + app.skip, width = key_length))
@@ -321,7 +392,22 @@ where
.start_corner(Corner::TopLeft);
f.render_stateful_widget(key_list, inner_chunks[0], &mut app.list_state);
let values = app.entries.iter().map(|(_, v)| v).collect::<Vec<_>>();
let values: Vec<_> = match &app.entries {
Entries::RawValues(entries) => entries
.iter()
.map(|(_, v)| {
serde_json::to_string_pretty(v)
.unwrap_or(String::from("Error serializing value"))
})
.collect(),
Entries::Values(entries) => entries
.iter()
.map(|(_, v)| {
serde_json::to_string_pretty(v)
.unwrap_or(String::from("Error serializing value"))
})
.collect(),
};
let value_display = Paragraph::new(
app.list_state

View File

@@ -130,6 +130,16 @@ impl<'a, DB: Database> DbTool<'a, DB> {
if let Ok((k, v)) = row {
let (key, value) = (k.into_key(), v.into_value());
if key.len() + value.len() < filter.min_row_size {
return None
}
if key.len() < filter.min_key_size {
return None
}
if value.len() < filter.min_value_size {
return None
}
let result = || {
if filter.only_count {
return None
@@ -213,6 +223,12 @@ pub struct ListFilter {
pub len: usize,
/// Sequence of bytes that will be searched on values and keys from the database.
pub search: Vec<u8>,
/// Minimum row size.
pub min_row_size: usize,
/// Minimum key size.
pub min_key_size: usize,
/// Minimum value size.
pub min_value_size: usize,
/// Reverse order of entries.
pub reverse: bool,
/// Only counts the number of filtered entries without decoding and returning them.
@@ -220,11 +236,6 @@ pub struct ListFilter {
}
impl ListFilter {
/// Creates a new [`ListFilter`].
pub fn new(skip: usize, len: usize, search: Vec<u8>, reverse: bool, only_count: bool) -> Self {
ListFilter { skip, len, search, reverse, only_count }
}
/// If `search` has a list of bytes, then filter for rows that have this sequence.
pub fn has_search(&self) -> bool {
!self.search.is_empty()

View File

@@ -26,3 +26,9 @@ tracing.workspace = true
[dev-dependencies]
reth-interfaces = { workspace = true, features = ["test-utils"] }
reth = { workspace = true }
tempfile = { workspace = true }
clap = { workspace = true }
jsonrpsee = { workspace = true }
eyre = { workspace = true }
serde_json = { workspace = true }

View File

@@ -305,6 +305,9 @@ impl StorageInner {
// TODO: there isn't really a parent beacon block root here, so not sure whether or not to
// call the 4788 beacon contract
// set the first block to find the correct index in bundle state
executor.set_first_block(block.number);
let (receipts, gas_used) =
executor.execute_transactions(block, U256::ZERO, Some(senders))?;

View File

@@ -0,0 +1,120 @@
//! auto-mine consensus integration test
use clap::Parser;
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use reth::{
cli::{
components::RethNodeComponents,
ext::{NoArgs, NoArgsCliExt, RethNodeCommandConfig},
},
node::NodeCommand,
runner::CliRunner,
tasks::TaskSpawner,
};
use reth_primitives::{hex, revm_primitives::FixedBytes, ChainSpec, Genesis};
use reth_provider::CanonStateSubscriptions;
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, time::Duration};
use tokio::time::timeout;
#[derive(Debug)]
struct AutoMineConfig;
impl RethNodeCommandConfig for AutoMineConfig {
fn on_node_started<Reth: RethNodeComponents>(&mut self, components: &Reth) -> eyre::Result<()> {
let pool = components.pool();
let mut canon_events = components.events().subscribe_to_canonical_state();
components.task_executor().spawn_critical_blocking("rpc request", Box::pin(async move {
// submit tx through rpc
let raw_tx = "0x02f876820a28808477359400847735940082520894ab0840c0e43688012c1adb0f5e3fc665188f83d28a029d394a5d630544000080c080a0a044076b7e67b5deecc63f61a8d7913fab86ca365b344b5759d1fe3563b4c39ea019eab979dd000da04dfc72bb0377c092d30fd9e1cab5ae487de49586cc8b0090";
let client = HttpClientBuilder::default().build("http://127.0.0.1:8545").expect("http client should bind to default rpc port");
let response: String = client.request("eth_sendRawTransaction", rpc_params![raw_tx]).await.expect("client request should be valid");
let expected = "0xb1c6512f4fc202c04355fbda66755e0e344b152e633010e8fd75ecec09b63398";
assert_eq!(&response, expected);
// more than enough time for the next block
let duration = Duration::from_secs(15);
// wait for canon event or timeout
let update = timeout(duration, canon_events.recv())
.await
.expect("canon state should change before timeout")
.expect("canon events stream is still open");
let new_tip = update.tip();
let expected_tx_root: FixedBytes<32> = hex!("c79b5383458e63fb20c6a49d9ec7917195a59003a2af4b28a01d7c6fbbcd7e35").into();
assert_eq!(new_tip.transactions_root, expected_tx_root);
assert_eq!(new_tip.number, 1);
assert!(pool.pending_transactions().is_empty());
}));
Ok(())
}
}
#[test]
pub fn test_auto_mine() {
// create temp path for test
let temp_path = tempfile::TempDir::new().expect("tempdir is okay").into_path();
let datadir = temp_path.to_str().expect("temp path is okay");
let no_args = NoArgs::with(AutoMineConfig);
let chain = custom_chain();
let mut command = NodeCommand::<NoArgsCliExt<AutoMineConfig>>::parse_from([
"reth",
"--dev",
"--datadir",
datadir,
"--debug.max-block",
"1",
"--debug.terminate",
])
.with_ext::<NoArgsCliExt<AutoMineConfig>>(no_args);
// use custom chain spec
command.chain = chain;
let runner = CliRunner::default();
let node_command = runner.run_command_until_exit(|ctx| command.execute(ctx));
assert!(node_command.is_ok())
}
fn custom_chain() -> Arc<ChainSpec> {
let custom_genesis = r#"
{
"nonce": "0x42",
"timestamp": "0x0",
"extraData": "0x5343",
"gasLimit": "0x1388",
"difficulty": "0x400000000",
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"coinbase": "0x0000000000000000000000000000000000000000",
"alloc": {
"0x6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b": {
"balance": "0x4a47e3c12448f4ad000000"
}
},
"number": "0x0",
"gasUsed": "0x0",
"parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"config": {
"ethash": {},
"chainId": 2600,
"homesteadBlock": 0,
"eip150Block": 0,
"eip155Block": 0,
"eip158Block": 0,
"byzantiumBlock": 0,
"constantinopleBlock": 0,
"petersburgBlock": 0,
"istanbulBlock": 0,
"berlinBlock": 0,
"londonBlock": 0,
"terminalTotalDifficulty": 0,
"terminalTotalDifficultyPassed": true,
"shanghaiTime": 0
}
}
"#;
let genesis: Genesis = serde_json::from_str(custom_genesis).unwrap();
Arc::new(genesis.into())
}

View File

@@ -0,0 +1,4 @@
//! auto-mine consensus tests
mod auto_mine;
async fn main() {}

View File

@@ -1,5 +1,5 @@
pub mod clique_middleware;
mod geth;
pub use clique_middleware::{CliqueError, CliqueMiddleware, CliqueMiddlewareError};
pub use clique_middleware::CliqueMiddleware;
pub use geth::CliqueGethInstance;

View File

@@ -1,32 +1,63 @@
//! Helpers for working with EIP-1559 base fee
/// Calculate base fee for next block. [EIP-1559](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md) spec
/// Calculate the base fee for the next block based on the EIP-1559 specification.
///
/// This function calculates the base fee for the next block according to the rules defined in the
/// EIP-1559. EIP-1559 introduces a new transaction pricing mechanism that includes a
/// fixed-per-block network fee that is burned and dynamically adjusts block sizes to handle
/// transient congestion.
///
/// For each block, the base fee per gas is determined by the gas used in the parent block and the
/// target gas (the block gas limit divided by the elasticity multiplier). The algorithm increases
/// the base fee when blocks are congested and decreases it when they are under the target gas
/// usage. The base fee per gas is always burned.
///
/// Parameters:
/// - `gas_used`: The gas used in the current block.
/// - `gas_limit`: The gas limit of the current block.
/// - `base_fee`: The current base fee per gas.
/// - `base_fee_params`: Base fee parameters such as elasticity multiplier and max change
/// denominator.
///
/// Returns:
/// The calculated base fee for the next block as a `u64`.
///
/// For more information, refer to the [EIP-1559 spec](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md).
pub fn calculate_next_block_base_fee(
gas_used: u64,
gas_limit: u64,
base_fee: u64,
base_fee_params: crate::BaseFeeParams,
) -> u64 {
// Calculate the target gas by dividing the gas limit by the elasticity multiplier.
let gas_target = gas_limit / base_fee_params.elasticity_multiplier;
if gas_used == gas_target {
return base_fee
}
if gas_used > gas_target {
let gas_used_delta = gas_used - gas_target;
let base_fee_delta = std::cmp::max(
1,
base_fee as u128 * gas_used_delta as u128 /
gas_target as u128 /
base_fee_params.max_change_denominator as u128,
);
base_fee + (base_fee_delta as u64)
} else {
let gas_used_delta = gas_target - gas_used;
let base_fee_per_gas_delta = base_fee as u128 * gas_used_delta as u128 /
gas_target as u128 /
base_fee_params.max_change_denominator as u128;
base_fee.saturating_sub(base_fee_per_gas_delta as u64)
match gas_used.cmp(&gas_target) {
// If the gas used in the current block is equal to the gas target, the base fee remains the
// same (no increase).
std::cmp::Ordering::Equal => base_fee,
// If the gas used in the current block is greater than the gas target, calculate a new
// increased base fee.
std::cmp::Ordering::Greater => {
// Calculate the increase in base fee based on the formula defined by EIP-1559.
base_fee +
(std::cmp::max(
// Ensure a minimum increase of 1.
1,
base_fee as u128 * (gas_used - gas_target) as u128 /
(gas_target as u128 * base_fee_params.max_change_denominator as u128),
) as u64)
}
// If the gas used in the current block is less than the gas target, calculate a new
// decreased base fee.
std::cmp::Ordering::Less => {
// Calculate the decrease in base fee based on the formula defined by EIP-1559.
base_fee.saturating_sub(
(base_fee as u128 * (gas_target - gas_used) as u128 /
(gas_target as u128 * base_fee_params.max_change_denominator as u128))
as u64,
)
}
}
}

View File

@@ -118,8 +118,8 @@ impl Transaction {
pub fn chain_id(&self) -> Option<u64> {
match self {
Transaction::Legacy(TxLegacy { chain_id, .. }) => *chain_id,
Transaction::Eip2930(TxEip2930 { chain_id, .. }) => Some(*chain_id),
Transaction::Eip1559(TxEip1559 { chain_id, .. }) => Some(*chain_id),
Transaction::Eip2930(TxEip2930 { chain_id, .. }) |
Transaction::Eip1559(TxEip1559 { chain_id, .. }) |
Transaction::Eip4844(TxEip4844 { chain_id, .. }) => Some(*chain_id),
}
}
@@ -128,8 +128,8 @@ impl Transaction {
pub fn set_chain_id(&mut self, chain_id: u64) {
match self {
Transaction::Legacy(TxLegacy { chain_id: ref mut c, .. }) => *c = Some(chain_id),
Transaction::Eip2930(TxEip2930 { chain_id: ref mut c, .. }) => *c = chain_id,
Transaction::Eip1559(TxEip1559 { chain_id: ref mut c, .. }) => *c = chain_id,
Transaction::Eip2930(TxEip2930 { chain_id: ref mut c, .. }) |
Transaction::Eip1559(TxEip1559 { chain_id: ref mut c, .. }) |
Transaction::Eip4844(TxEip4844 { chain_id: ref mut c, .. }) => *c = chain_id,
}
}
@@ -163,9 +163,9 @@ impl Transaction {
/// Gets the transaction's value field.
pub fn value(&self) -> TxValue {
*match self {
Transaction::Legacy(TxLegacy { value, .. }) => value,
Transaction::Eip2930(TxEip2930 { value, .. }) => value,
Transaction::Eip1559(TxEip1559 { value, .. }) => value,
Transaction::Legacy(TxLegacy { value, .. }) |
Transaction::Eip2930(TxEip2930 { value, .. }) |
Transaction::Eip1559(TxEip1559 { value, .. }) |
Transaction::Eip4844(TxEip4844 { value, .. }) => value,
}
}
@@ -173,9 +173,9 @@ impl Transaction {
/// Get the transaction's nonce.
pub fn nonce(&self) -> u64 {
match self {
Transaction::Legacy(TxLegacy { nonce, .. }) => *nonce,
Transaction::Eip2930(TxEip2930 { nonce, .. }) => *nonce,
Transaction::Eip1559(TxEip1559 { nonce, .. }) => *nonce,
Transaction::Legacy(TxLegacy { nonce, .. }) |
Transaction::Eip2930(TxEip2930 { nonce, .. }) |
Transaction::Eip1559(TxEip1559 { nonce, .. }) |
Transaction::Eip4844(TxEip4844 { nonce, .. }) => *nonce,
}
}
@@ -228,8 +228,7 @@ impl Transaction {
/// This is also commonly referred to as the "Gas Tip Cap" (`GasTipCap`).
pub fn max_priority_fee_per_gas(&self) -> Option<u128> {
match self {
Transaction::Legacy(_) => None,
Transaction::Eip2930(_) => None,
Transaction::Legacy(_) | Transaction::Eip2930(_) => None,
Transaction::Eip1559(TxEip1559 { max_priority_fee_per_gas, .. }) |
Transaction::Eip4844(TxEip4844 { max_priority_fee_per_gas, .. }) => {
Some(*max_priority_fee_per_gas)
@@ -243,9 +242,7 @@ impl Transaction {
/// This is also commonly referred to as the "blob versioned hashes" (`BlobVersionedHashes`).
pub fn blob_versioned_hashes(&self) -> Option<Vec<B256>> {
match self {
Transaction::Legacy(_) => None,
Transaction::Eip2930(_) => None,
Transaction::Eip1559(_) => None,
Transaction::Legacy(_) | Transaction::Eip2930(_) | Transaction::Eip1559(_) => None,
Transaction::Eip4844(TxEip4844 { blob_versioned_hashes, .. }) => {
Some(blob_versioned_hashes.to_vec())
}
@@ -354,9 +351,9 @@ impl Transaction {
/// Get the transaction's input field.
pub fn input(&self) -> &Bytes {
match self {
Transaction::Legacy(TxLegacy { input, .. }) => input,
Transaction::Eip2930(TxEip2930 { input, .. }) => input,
Transaction::Eip1559(TxEip1559 { input, .. }) => input,
Transaction::Legacy(TxLegacy { input, .. }) |
Transaction::Eip2930(TxEip2930 { input, .. }) |
Transaction::Eip1559(TxEip1559 { input, .. }) |
Transaction::Eip4844(TxEip4844 { input, .. }) => input,
}
}

View File

@@ -13,7 +13,7 @@ use revm::{
db::DatabaseRef,
primitives::{AccountInfo, ResultAndState, KECCAK_EMPTY},
};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::collections::{btree_map::Entry, BTreeMap, HashMap, VecDeque};
/// A type for creating geth style traces
#[derive(Clone, Debug)]
@@ -208,23 +208,66 @@ impl GethTraceBuilder {
};
let account_diffs = state.into_iter().map(|(addr, acc)| (*addr, acc));
let is_diff = prestate_config.is_diff_mode();
if !is_diff {
if prestate_config.is_default_mode() {
let mut prestate = PreStateMode::default();
for (addr, changed_acc) in account_diffs {
let db_acc = db.basic_ref(addr)?.unwrap_or_default();
let code = load_account_code(&db_acc);
// in default mode we __only__ return the touched state
for node in self.nodes.iter() {
let addr = node.trace.address;
let mut pre_state =
AccountState::from_account_info(db_acc.nonce, db_acc.balance, code);
let acc_state = match prestate.0.entry(addr) {
Entry::Vacant(entry) => {
let db_acc = db.basic_ref(addr)?.unwrap_or_default();
let code = load_account_code(&db_acc);
let acc_state =
AccountState::from_account_info(db_acc.nonce, db_acc.balance, code);
entry.insert(acc_state)
}
Entry::Occupied(entry) => entry.into_mut(),
};
// handle _touched_ storage slots
for (key, slot) in changed_acc.storage.iter() {
pre_state.storage.insert((*key).into(), slot.previous_or_original_value.into());
for (key, value) in node.touched_slots() {
match acc_state.storage.entry(key.into()) {
Entry::Vacant(entry) => {
entry.insert(value.into());
}
Entry::Occupied(_) => {
// we've already recorded this slot
}
}
}
prestate.0.insert(addr, pre_state);
}
// also need to check changed accounts for things like balance changes etc
for (addr, changed_acc) in account_diffs {
let acc_state = match prestate.0.entry(addr) {
Entry::Vacant(entry) => {
let db_acc = db.basic_ref(addr)?.unwrap_or_default();
let code = load_account_code(&db_acc);
let acc_state =
AccountState::from_account_info(db_acc.nonce, db_acc.balance, code);
entry.insert(acc_state)
}
Entry::Occupied(entry) => {
// already recorded via touched accounts
entry.into_mut()
}
};
// in case we missed anything during the trace, we need to add the changed accounts
// storage
for (key, slot) in changed_acc.storage.iter() {
match acc_state.storage.entry((*key).into()) {
Entry::Vacant(entry) => {
entry.insert(slot.previous_or_original_value.into());
}
Entry::Occupied(_) => {
// we've already recorded this slot
}
}
}
}
Ok(PreStateFrame::Default(prestate))
} else {
let mut state_diff = DiffMode::default();

View File

@@ -14,7 +14,7 @@ use revm::interpreter::{
opcode, CallContext, CallScheme, CreateScheme, InstructionResult, OpCode, SharedMemory, Stack,
};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
/// A unified representation of a call
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
@@ -250,6 +250,28 @@ impl CallTraceNode {
}
}
/// Returns all storage slots touched by this trace and the value this storage.
///
/// A touched slot is either a slot that was written to or read from.
///
/// If the slot is accessed more than once, the result only includes the first time it was
/// accessed, in other words in only returns the original value of the slot.
pub(crate) fn touched_slots(&self) -> BTreeMap<U256, U256> {
let mut touched_slots = BTreeMap::new();
for change in self.trace.steps.iter().filter_map(|s| s.storage_change.as_ref()) {
match touched_slots.entry(change.key) {
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(change.value);
}
std::collections::btree_map::Entry::Occupied(_) => {
// already touched
}
}
}
touched_slots
}
/// Pushes all steps onto the stack in reverse order
/// so that the first step is on top of the stack
pub(crate) fn push_steps_on_stack<'a>(

View File

@@ -135,6 +135,11 @@ impl<'a> EVMProcessor<'a> {
self.stack = stack;
}
/// Configure the executor with the given block.
pub fn set_first_block(&mut self, num: BlockNumber) {
self.first_block = Some(num);
}
/// Returns a reference to the database
pub fn db_mut(&mut self) -> &mut StateDBBox<'a, RethError> {
// Option will be removed from EVM in the future.

View File

@@ -1,18 +1,19 @@
//! Helpers for testing trace calls.
use futures::{Stream, StreamExt};
use reth_primitives::BlockId;
use jsonrpsee::core::Error as RpcError;
use reth_primitives::{BlockId, TxHash};
use reth_rpc_api::clients::TraceApiClient;
use reth_rpc_types::trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType};
use std::{
collections::HashSet,
pin::Pin,
task::{Context, Poll},
};
use jsonrpsee::core::Error as RpcError;
use reth_rpc_types::trace::parity::LocalizedTransactionTrace;
/// A result type for the `trace_block` method that also captures the requested block.
pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
/// Type alias representing the result of replaying a transaction.
pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
/// An extension trait for the Trace API.
#[async_trait::async_trait]
@@ -35,6 +36,36 @@ pub trait TraceApiExt {
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a new stream that replays the transactions for the given transaction hashes.
///
/// This returns all results in order.
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>;
}
/// A stream that replays the transactions for the requested hashes.
#[must_use = "streams do nothing unless polled"]
pub struct ReplayTransactionStream<'a> {
stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
}
impl<'a> Stream for ReplayTransactionStream<'a> {
type Item = ReplayTransactionResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
impl<'a> std::fmt::Debug for ReplayTransactionStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReplayTransactionStream").finish()
}
}
#[async_trait::async_trait]
@@ -72,6 +103,28 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
.buffer_unordered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>,
{
let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
let trace_types_clone = trace_types.clone(); // Clone outside of the async block
async move {
match self.replay_transaction(hash, trace_types_clone).await {
Ok(result) => Ok((result, hash)),
Err(err) => Err((err, hash)),
}
}
}))
.buffered(10);
ReplayTransactionStream { stream: Box::pin(stream) }
}
}
/// A stream that yields the traces for the requested blocks.

View File

@@ -21,6 +21,7 @@ alloy-rlp = { workspace = true, features = ["arrayvec"] }
thiserror.workspace = true
itertools.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_with = "3.3"
serde_json.workspace = true
jsonrpsee-types = { workspace = true, optional = true }
alloy-primitives = { workspace = true, features = ["rand", "rlp"] }

View File

@@ -0,0 +1,71 @@
//! Support for the Beacon API events
//!
//! See also [ethereum-beacon-API eventstream](https://ethereum.github.io/beacon-APIs/#/Events/eventstream)
use crate::engine::PayloadAttributes;
use alloy_primitives::B256;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
/// Event for the `payload_attributes` topic of the beacon API node event stream.
///
/// This event gives block builders and relays sufficient information to construct or verify a block
/// at `proposal_slot`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PayloadAttributesEvent {
/// the identifier of the beacon hard fork at `proposal_slot`, e.g `"bellatrix"`, `"capella"`.
pub version: String,
/// Wrapped data of the event.
pub data: PayloadAttributesData,
}
impl PayloadAttributesEvent {
/// Returns the payload attributes
pub fn attributes(&self) -> &PayloadAttributes {
&self.data.payload_attributes
}
}
/// Data of the event that contains the payload attributes
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PayloadAttributesData {
/// The slot at which a block using these payload attributes may be built
#[serde_as(as = "DisplayFromStr")]
pub proposal_slot: u64,
/// the beacon block root of the parent block to be built upon.
pub parent_block_root: B256,
/// he execution block number of the parent block.
#[serde_as(as = "DisplayFromStr")]
pub parent_block_number: u64,
/// the execution block hash of the parent block.
pub parent_block_hash: B256,
/// The execution block number of the parent block.
/// the validator index of the proposer at `proposal_slot` on the chain identified by
/// `parent_block_root`.
#[serde_as(as = "DisplayFromStr")]
pub proposer_index: u64,
/// Beacon API encoding of `PayloadAttributesV<N>` as defined by the `execution-apis`
/// specification
///
/// Note: this uses the beacon API format which uses snake-case and quoted decimals rather than
/// big-endian hex.
#[serde(with = "crate::eth::engine::payload::beacon_api_payload_attributes")]
pub payload_attributes: PayloadAttributes,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serde_payload_attributes_event() {
let s = r#"{"version":"capella","data":{"proposal_slot":"173332","proposer_index":"649112","parent_block_root":"0x5a49069647f6bf8f25d76b55ce920947654ade4ba1c6ab826d16712dd62b42bf","parent_block_number":"161093","parent_block_hash":"0x608b3d140ecb5bbcd0019711ac3704ece7be8e6d100816a55db440c1bcbb0251","payload_attributes":{"timestamp":"1697982384","prev_randao":"0x3142abd98055871ebf78f0f8e758fd3a04df3b6e34d12d09114f37a737f8f01e","suggested_fee_recipient":"0x0000000000000000000000000000000000000001","withdrawals":[{"index":"2461612","validator_index":"853570","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"45016211"},{"index":"2461613","validator_index":"853571","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5269785"},{"index":"2461614","validator_index":"853572","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5275106"},{"index":"2461615","validator_index":"853573","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5235962"},{"index":"2461616","validator_index":"853574","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5252171"},{"index":"2461617","validator_index":"853575","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5221319"},{"index":"2461618","validator_index":"853576","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5260879"},{"index":"2461619","validator_index":"853577","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5285244"},{"index":"2461620","validator_index":"853578","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5266681"},{"index":"2461621","validator_index":"853579","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5271322"},{"index":"2461622","validator_index":"853580","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5231327"},{"index":"2461623","validator_index":"853581","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5276761"},{"index":"2461624","validator_index":"853582","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5246244"},{"index":"2461625","validator_index":"853583","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5261011"},{"index":"2461626","validator_index":"853584","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5276477"},{"index":"2461627","validator_index":"853585","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5275319"}]}}}"#;
let event = serde_json::from_str::<PayloadAttributesEvent>(s).unwrap();
let input = serde_json::from_str::<serde_json::Value>(s).unwrap();
let json = serde_json::to_value(event).unwrap();
assert_eq!(input, json);
}
}

View File

@@ -0,0 +1,2 @@
/// Beacon API events support.
pub mod events;

View File

@@ -8,6 +8,9 @@ pub mod payload;
mod transition;
pub use self::{cancun::*, forkchoice::*, payload::*, transition::*};
/// Beacon API types
pub mod beacon_api;
/// The list of all supported Engine capabilities available over the engine endpoint.
pub const CAPABILITIES: [&str; 12] = [
"engine_forkchoiceUpdatedV1",

View File

@@ -1,3 +1,4 @@
use crate::eth::withdrawal::BeaconAPIWithdrawal;
pub use crate::Withdrawal;
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256, U64};
use reth_primitives::{
@@ -5,6 +6,7 @@ use reth_primitives::{
BlobTransactionSidecar, SealedBlock,
};
use serde::{ser::SerializeMap, Deserialize, Serialize, Serializer};
use serde_with::{serde_as, DisplayFromStr};
/// The execution payload body response that allows for `null` values.
pub type ExecutionPayloadBodiesV1 = Vec<Option<ExecutionPayloadBodyV1>>;
@@ -411,6 +413,63 @@ pub struct PayloadAttributes {
pub parent_beacon_block_root: Option<B256>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
struct BeaconAPIPayloadAttributes {
#[serde_as(as = "DisplayFromStr")]
timestamp: u64,
prev_randao: B256,
suggested_fee_recipient: Address,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde_as(as = "Option<Vec<BeaconAPIWithdrawal>>")]
withdrawals: Option<Vec<Withdrawal>>,
#[serde(skip_serializing_if = "Option::is_none")]
parent_beacon_block_root: Option<B256>,
}
/// A helper module for serializing and deserializing the payload attributes for the beacon API.
///
/// The beacon API encoded object has equivalent fields to the [PayloadAttributes] with two
/// differences:
/// 1) `snake_case` identifiers must be used rather than `camelCase`;
/// 2) integers must be encoded as quoted decimals rather than big-endian hex.
pub mod beacon_api_payload_attributes {
use super::*;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Serialize the payload attributes for the beacon API.
pub fn serialize<S>(
payload_attributes: &PayloadAttributes,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let beacon_api_payload_attributes = BeaconAPIPayloadAttributes {
timestamp: payload_attributes.timestamp.to(),
prev_randao: payload_attributes.prev_randao,
suggested_fee_recipient: payload_attributes.suggested_fee_recipient,
withdrawals: payload_attributes.withdrawals.clone(),
parent_beacon_block_root: payload_attributes.parent_beacon_block_root,
};
beacon_api_payload_attributes.serialize(serializer)
}
/// Deserialize the payload attributes for the beacon API.
pub fn deserialize<'de, D>(deserializer: D) -> Result<PayloadAttributes, D::Error>
where
D: Deserializer<'de>,
{
let beacon_api_payload_attributes = BeaconAPIPayloadAttributes::deserialize(deserializer)?;
Ok(PayloadAttributes {
timestamp: U64::from(beacon_api_payload_attributes.timestamp),
prev_randao: beacon_api_payload_attributes.prev_randao,
suggested_fee_recipient: beacon_api_payload_attributes.suggested_fee_recipient,
withdrawals: beacon_api_payload_attributes.withdrawals,
parent_beacon_block_root: beacon_api_payload_attributes.parent_beacon_block_root,
})
}
}
/// This structure contains the result of processing a payload or fork choice update.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -862,4 +921,21 @@ mod tests {
serde_json::from_str(input);
assert!(payload_res.is_err());
}
#[test]
fn beacon_api_payload_serde() {
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
struct Event {
#[serde(with = "beacon_api_payload_attributes")]
payload: PayloadAttributes,
}
let s = r#"{"timestamp":"1697981664","prev_randao":"0x739947d9f0aed15e32ed05a978e53b55cdcfe3db4a26165890fa45a80a06c996","suggested_fee_recipient":"0x0000000000000000000000000000000000000001","withdrawals":[{"index":"2460700","validator_index":"852657","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5268915"},{"index":"2460701","validator_index":"852658","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5253066"},{"index":"2460702","validator_index":"852659","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5266666"},{"index":"2460703","validator_index":"852660","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5239026"},{"index":"2460704","validator_index":"852661","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5273516"},{"index":"2460705","validator_index":"852662","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5260842"},{"index":"2460706","validator_index":"852663","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5238925"},{"index":"2460707","validator_index":"852664","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5253956"},{"index":"2460708","validator_index":"852665","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5284374"},{"index":"2460709","validator_index":"852666","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5276798"},{"index":"2460710","validator_index":"852667","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5239682"},{"index":"2460711","validator_index":"852668","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5261544"},{"index":"2460712","validator_index":"852669","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5247034"},{"index":"2460713","validator_index":"852670","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5256750"},{"index":"2460714","validator_index":"852671","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5261929"},{"index":"2460715","validator_index":"852672","address":"0x778f5f13c4be78a3a4d7141bcb26999702f407cf","amount":"5243188"}]}"#;
let event: Event = serde_json::from_str(s).unwrap();
let input = serde_json::from_str::<serde_json::Value>(s).unwrap();
let json = serde_json::to_value(event).unwrap();
assert_eq!(input, json);
}
}

View File

@@ -15,7 +15,7 @@ mod syncing;
pub mod trace;
mod transaction;
pub mod txpool;
mod withdrawal;
pub mod withdrawal;
mod work;
pub use account::*;

View File

@@ -196,6 +196,11 @@ impl AccountChangeKind {
}
}
/// The config for the prestate tracer.
///
/// If `diffMode` is set to true, the response frame includes all the account and storage diffs for
/// the transaction. If it's missing or set to false it only returns the accounts and storage
/// necessary to execute the transaction.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PreStateConfig {
@@ -204,9 +209,17 @@ pub struct PreStateConfig {
}
impl PreStateConfig {
/// Returns true if this trace was requested with diffmode.
#[inline]
pub fn is_diff_mode(&self) -> bool {
self.diff_mode.unwrap_or_default()
}
/// Is default mode if diff_mode is not set
#[inline]
pub fn is_default_mode(&self) -> bool {
!self.is_diff_mode()
}
}
#[cfg(test)]

View File

@@ -1,7 +1,11 @@
//! Withdrawal type and serde helpers.
use alloy_primitives::{Address, U256};
use alloy_rlp::RlpEncodable;
use reth_primitives::{constants::GWEI_TO_WEI, serde_helper::u64_hex};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{serde_as, DeserializeAs, DisplayFromStr, SerializeAs};
/// Withdrawal represents a validator withdrawal from the consensus layer.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash, RlpEncodable, Serialize, Deserialize)]
pub struct Withdrawal {
@@ -25,6 +29,73 @@ impl Withdrawal {
}
}
/// Same as [Withdrawal] but respects the Beacon API format which uses snake-case and quoted
/// decimals.
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct BeaconAPIWithdrawal {
#[serde_as(as = "DisplayFromStr")]
index: u64,
#[serde_as(as = "DisplayFromStr")]
validator_index: u64,
address: Address,
#[serde_as(as = "DisplayFromStr")]
amount: u64,
}
impl SerializeAs<Withdrawal> for BeaconAPIWithdrawal {
fn serialize_as<S>(source: &Withdrawal, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
beacon_api_withdrawals::serialize(source, serializer)
}
}
impl<'de> DeserializeAs<'de, Withdrawal> for BeaconAPIWithdrawal {
fn deserialize_as<D>(deserializer: D) -> Result<Withdrawal, D::Error>
where
D: Deserializer<'de>,
{
beacon_api_withdrawals::deserialize(deserializer)
}
}
/// A helper serde module to convert from/to the Beacon API which uses quoted decimals rather than
/// big-endian hex.
pub mod beacon_api_withdrawals {
use super::*;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Serialize the payload attributes for the beacon API.
pub fn serialize<S>(payload_attributes: &Withdrawal, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let withdrawal = BeaconAPIWithdrawal {
index: payload_attributes.index,
validator_index: payload_attributes.validator_index,
address: payload_attributes.address,
amount: payload_attributes.amount,
};
withdrawal.serialize(serializer)
}
/// Deserialize the payload attributes for the beacon API.
pub fn deserialize<'de, D>(deserializer: D) -> Result<Withdrawal, D::Error>
where
D: Deserializer<'de>,
{
let withdrawal = BeaconAPIWithdrawal::deserialize(deserializer)?;
Ok(Withdrawal {
index: withdrawal.index,
validator_index: withdrawal.validator_index,
address: withdrawal.address,
amount: withdrawal.amount,
})
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -263,7 +263,10 @@ where
.into_pre_state_config()
.map_err(|_| EthApiError::InvalidTracerConfig)?;
let mut inspector = TracingInspector::new(
TracingInspectorConfig::from_geth_config(&config),
TracingInspectorConfig::from_geth_config(&config)
// if in default mode, we need to return all touched storages, for
// which we need to record steps and statediff
.set_steps_and_state_diffs(prestate_config.is_default_mode()),
);
let frame =
@@ -490,7 +493,10 @@ where
.map_err(|_| EthApiError::InvalidTracerConfig)?;
let mut inspector = TracingInspector::new(
TracingInspectorConfig::from_geth_config(&config),
TracingInspectorConfig::from_geth_config(&config)
// if in default mode, we need to return all touched storages, for
// which we need to record steps and statediff
.set_steps_and_state_diffs(prestate_config.is_default_mode()),
);
let (res, _) = inspect(&mut *db, env, &mut inspector)?;

View File

@@ -7,17 +7,28 @@ use reth_primitives::SealedHeader;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
/// Represents the specific error type within a block error.
#[derive(Error, Debug)]
pub enum BlockErrorKind {
/// The block encountered a validation error.
#[error("Validation error: {0}")]
Validation(#[source] consensus::ConsensusError),
/// The block encountered an execution error.
#[error("Execution error: {0}")]
Execution(#[source] executor::BlockExecutionError),
}
/// A stage execution error.
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
#[error("Stage encountered a validation error in block {number}: {error}.", number = block.number)]
Validation {
/// The block that failed validation.
/// The stage encountered an error related to a block.
#[error("Stage encountered a block error in block {number}: {error}.", number = block.number)]
Block {
/// The block that caused the error.
block: SealedHeader,
/// The underlying consensus error.
/// The specific error type, either consensus or execution error.
#[source]
error: consensus::ConsensusError,
error: BlockErrorKind,
},
/// The stage encountered a downloader error where the responses cannot be attached to the
/// current head.
@@ -39,16 +50,6 @@ pub enum StageError {
/// The stage encountered a database error.
#[error("An internal database error occurred: {0}")]
Database(#[from] DbError),
#[error("Stage encountered a execution error in block {number}: {error}.", number = block.number)]
/// The stage encountered a execution error
// TODO: Probably redundant, should be rolled into `Validation`
ExecutionError {
/// The block that failed execution.
block: SealedHeader,
/// The underlying execution error.
#[source]
error: executor::BlockExecutionError,
},
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_primitives::PruneSegmentError),

View File

@@ -1,10 +1,9 @@
use crate::{
error::*, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError,
UnwindInput,
error::*, BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage,
StageError, UnwindInput,
};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, stage::StageId, BlockNumber, ChainSpec, B256,
};
@@ -423,53 +422,60 @@ where
.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH)
.max(1);
Ok(ControlFlow::Unwind { target: unwind_to, bad_block: local_head })
} else if let StageError::Validation { block, error } = err {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered a validation error: {error}"
);
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered a validation error: {validation_error}"
);
// FIXME: When handling errors, we do not commit the database transaction.
// This leads to the Merkle stage not clearing its
// checkpoint, and restarting from an invalid place.
drop(provider_rw);
provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
provider_rw
.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
provider_rw.commit()?;
// FIXME: When handling errors, we do not commit the database
// transaction. This leads to the Merkle
// stage not clearing its checkpoint, and
// restarting from an invalid place.
drop(provider_rw);
provider_rw =
factory.provider_rw().map_err(PipelineError::Interface)?;
provider_rw.save_stage_checkpoint_progress(
StageId::MerkleExecute,
vec![],
)?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
provider_rw.commit()?;
// We unwind because of a validation error. If the unwind itself fails,
// we bail entirely, otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
} else if let StageError::ExecutionError {
block,
error: BlockExecutionError::Validation(error),
} = err
{
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered an execution error: {error}"
);
// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered an execution error: {execution_error}"
);
// We unwind because of an execution error. If the unwind itself fails, we
// bail entirely, otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
// We unwind because of an execution error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart
// the execution loop from the beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
}
} else if err.is_fatal() {
error!(
target: "sync::pipeline",
@@ -817,9 +823,11 @@ mod tests {
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Err(StageError::Validation {
.add_exec(Err(StageError::Block {
block: random_header(&mut generators::rng(), 5, Default::default()),
error: consensus::ConsensusError::BaseFeeMissing,
error: BlockErrorKind::Validation(
consensus::ConsensusError::BaseFeeMissing,
),
}))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),

View File

@@ -1,6 +1,6 @@
use crate::{
stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, ExecInput, ExecOutput, MetricEvent,
MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput,
stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, BlockErrorKind, ExecInput, ExecOutput,
MetricEvent, MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput,
};
use num_traits::Zero;
use reth_db::{
@@ -161,7 +161,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Execute the block
let (block, senders) = block.into_components();
executor.execute_and_verify_receipt(&block, td, Some(senders)).map_err(|error| {
StageError::ExecutionError { block: block.header.clone().seal_slow(), error }
StageError::Block {
block: block.header.clone().seal_slow(),
error: BlockErrorKind::Execution(error),
}
})?;
execution_duration += time.elapsed();

View File

@@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_codecs::Compact;
use reth_db::{
database::Database,
@@ -88,12 +88,12 @@ impl MerkleStage {
Ok(())
} else {
warn!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root");
Err(StageError::Validation {
Err(StageError::Block {
block: expected.clone(),
error: consensus::ConsensusError::BodyStateRootDiff {
error: BlockErrorKind::Validation(consensus::ConsensusError::BodyStateRootDiff {
got,
expected: expected.state_root,
},
}),
})
}
}

View File

@@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@@ -145,10 +145,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let sealed_header = provider
.sealed_header(block_number)?
.ok_or(ProviderError::HeaderNotFound(block_number.into()))?;
return Err(StageError::Validation {
return Err(StageError::Block {
block: sealed_header,
error:
error: BlockErrorKind::Validation(
consensus::ConsensusError::TransactionSignerRecoveryError,
),
})
}
SenderRecoveryStageError::StageError(err) => return Err(err),

View File

@@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@@ -82,9 +82,12 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let (block_number, header) = entry?;
td += header.difficulty;
self.consensus
.validate_header_with_total_difficulty(&header, td)
.map_err(|error| StageError::Validation { block: header.seal_slow(), error })?;
self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| {
StageError::Block {
block: header.seal_slow(),
error: BlockErrorKind::Validation(error),
}
})?;
cursor_td.append(block_number, td.into())?;
}

View File

@@ -14,8 +14,6 @@ use revm_primitives::{
Address, Bytes, B256, U256,
};
pub use codecs_derive::*;
/// Trait that implements the `Compact` codec.
///
/// When deriving the trait for custom structs, be aware of certain limitations/recommendations:

View File

@@ -60,25 +60,23 @@ impl DbTx for TxMock {
}
fn commit(self) -> Result<bool, DatabaseError> {
todo!()
Ok(true)
}
fn drop(self) {
todo!()
}
fn drop(self) {}
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, DatabaseError> {
todo!()
Ok(CursorMock { _cursor: 0 })
}
fn cursor_dup_read<T: DupSort>(
&self,
) -> Result<<Self as DbTxGAT<'_>>::DupCursor<T>, DatabaseError> {
todo!()
Ok(CursorMock { _cursor: 0 })
}
fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
todo!()
Ok(self._table.len())
}
}

View File

@@ -129,6 +129,12 @@ impl<V: Value> RawValue<V> {
}
}
impl<V: Value> From<V> for RawValue<V> {
fn from(value: V) -> Self {
RawValue::new(value)
}
}
impl AsRef<[u8]> for RawValue<Vec<u8>> {
fn as_ref(&self) -> &[u8] {
&self.value

View File

@@ -512,23 +512,25 @@ else()
mark_as_advanced(MDBX_USE_OFDLOCKS)
set(MDBX_AVOID_MSYNC_DEFAULT OFF)
endif()
option(MDBX_AVOID_MSYNC "Controls dirty pages tracking, spilling and persisting in MDBX_WRITEMAP mode" ${MDBX_AVOID_MSYNC_DEFAULT})
add_mdbx_option(MDBX_AVOID_MSYNC "Controls dirty pages tracking, spilling and persisting in MDBX_WRITEMAP mode" ${MDBX_AVOID_MSYNC_DEFAULT})
add_mdbx_option(MDBX_LOCKING "Locking method (Windows=-1, SysV=5, POSIX=1988, POSIX=2001, POSIX=2008, Futexes=1995)" AUTO)
mark_as_advanced(MDBX_LOCKING)
add_mdbx_option(MDBX_TRUST_RTC "Does a system have battery-backed Real-Time Clock or just a fake" AUTO)
mark_as_advanced(MDBX_TRUST_RTC)
option(MDBX_FORCE_ASSERTIONS "Force enable assertion checking" OFF)
option(MDBX_DISABLE_VALIDATION "Disable some checks to reduce an overhead and detection probability of database corruption to a values closer to the LMDB" OFF)
option(MDBX_ENABLE_REFUND "Zerocost auto-compactification during write-transactions" ON)
option(MDBX_ENABLE_MADVISE "Using POSIX' madvise() and/or similar hints" ON)
add_mdbx_option(MDBX_FORCE_ASSERTIONS "Force enable assertion checking" OFF)
add_mdbx_option(MDBX_DISABLE_VALIDATION "Disable some checks to reduce an overhead and detection probability of database corruption to a values closer to the LMDB" OFF)
mark_as_advanced(MDBX_DISABLE_VALIDATION)
add_mdbx_option(MDBX_ENABLE_REFUND "Zerocost auto-compactification during write-transactions" ON)
add_mdbx_option(MDBX_ENABLE_MADVISE "Using POSIX' madvise() and/or similar hints" ON)
if (CMAKE_TARGET_BITNESS GREATER 32)
set(MDBX_BIGFOOT_DEFAULT ON)
else()
set(MDBX_BIGFOOT_DEFAULT OFF)
endif()
option(MDBX_ENABLE_BIGFOOT "Chunking long list of retired pages during huge transactions commit to avoid use sequences of pages" ${MDBX_BIGFOOT_DEFAULT})
option(MDBX_ENABLE_PGOP_STAT "Gathering statistics for page operations" ON)
option(MDBX_ENABLE_PROFGC "Profiling of GC search and updates" OFF)
add_mdbx_option(MDBX_ENABLE_BIGFOOT "Chunking long list of retired pages during huge transactions commit to avoid use sequences of pages" ${MDBX_BIGFOOT_DEFAULT})
add_mdbx_option(MDBX_ENABLE_PGOP_STAT "Gathering statistics for page operations" ON)
add_mdbx_option(MDBX_ENABLE_PROFGC "Profiling of GC search and updates" OFF)
mark_as_advanced(MDBX_ENABLE_PROFGC)
if(NOT MDBX_AMALGAMATED_SOURCE)
if(CMAKE_CONFIGURATION_TYPES OR CMAKE_BUILD_TYPE_UPPERCASE STREQUAL "DEBUG")

View File

@@ -1 +1 @@
0.12.6.0
0.12.8.0

View File

@@ -1039,9 +1039,15 @@ macro(probe_libcxx_filesystem)
#endif
int main(int argc, const char*argv[]) {
fs::path probe(argv[0]);
std::string str(argv[0]);
fs::path probe(str);
if (argc != 1) throw fs::filesystem_error(std::string("fake"), std::error_code());
return fs::is_directory(probe.relative_path());
int r = fs::is_directory(probe.relative_path());
for (auto const& i : fs::directory_iterator(probe)) {
++r;
(void)i;
}
return r;
}
]])
set(LIBCXX_FILESYSTEM "")

View File

@@ -1,6 +1,6 @@
.\" Copyright 2015-2023 Leonid Yuriev <leo@yuriev.ru>.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_CHK 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_CHK 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_chk \- MDBX checking tool
.SH SYNOPSIS

View File

@@ -2,7 +2,7 @@
.\" Copyright 2015,2016 Peter-Service R&D LLC <http://billing.ru/>.
.\" Copyright 2012-2015 Howard Chu, Symas Corp. All Rights Reserved.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_COPY 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_COPY 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_copy \- MDBX environment copy tool
.SH SYNOPSIS

View File

@@ -1,7 +1,7 @@
.\" Copyright 2021-2023 Leonid Yuriev <leo@yuriev.ru>.
.\" Copyright 2014-2021 Howard Chu, Symas Corp. All Rights Reserved.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_DROP 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_DROP 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_drop \- MDBX database delete tool
.SH SYNOPSIS

View File

@@ -2,7 +2,7 @@
.\" Copyright 2015,2016 Peter-Service R&D LLC <http://billing.ru/>.
.\" Copyright 2014-2015 Howard Chu, Symas Corp. All Rights Reserved.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_DUMP 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_DUMP 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_dump \- MDBX environment export tool
.SH SYNOPSIS

View File

@@ -2,7 +2,7 @@
.\" Copyright 2015,2016 Peter-Service R&D LLC <http://billing.ru/>.
.\" Copyright 2014-2015 Howard Chu, Symas Corp. All Rights Reserved.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_LOAD 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_LOAD 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_load \- MDBX environment import tool
.SH SYNOPSIS

View File

@@ -2,7 +2,7 @@
.\" Copyright 2015,2016 Peter-Service R&D LLC <http://billing.ru/>.
.\" Copyright 2012-2015 Howard Chu, Symas Corp. All Rights Reserved.
.\" Copying restrictions apply. See COPYRIGHT/LICENSE.
.TH MDBX_STAT 1 "2023-04-29" "MDBX 0.12.6"
.TH MDBX_STAT 1 "2023-10-17" "MDBX 0.12.8"
.SH NAME
mdbx_stat \- MDBX environment status tool
.SH SYNOPSIS

View File

@@ -12,7 +12,7 @@
* <http://www.OpenLDAP.org/license.html>. */
#define xMDBX_ALLOY 1
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3734,6 +3734,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
@@ -4875,6 +4876,9 @@ atomic_store64(MDBX_atomic_uint64_t *p, const uint64_t value,
enum MDBX_memory_order order) {
STATIC_ASSERT(sizeof(MDBX_atomic_uint64_t) == 8);
#if MDBX_64BIT_ATOMIC
#if __GNUC_PREREQ(11, 0)
STATIC_ASSERT(__alignof__(MDBX_atomic_uint64_t) >= sizeof(uint64_t));
#endif /* GNU C >= 11 */
#ifdef MDBX_HAVE_C11ATOMICS
assert(atomic_is_lock_free(MDBX_c11a_rw(uint64_t, p)));
atomic_store_explicit(MDBX_c11a_rw(uint64_t, p), value, mo_c11_store(order));
@@ -7667,18 +7671,15 @@ const char *mdbx_dump_val(const MDBX_val *key, char *const buf,
char *const detent = buf + bufsize - 2;
char *ptr = buf;
*ptr++ = '<';
for (size_t i = 0; i < key->iov_len; i++) {
const ptrdiff_t left = detent - ptr;
assert(left > 0);
int len = snprintf(ptr, left, "%02x", data[i]);
if (len < 0 || len >= left)
break;
ptr += len;
}
if (ptr < detent) {
ptr[0] = '>';
ptr[1] = '\0';
for (size_t i = 0; i < key->iov_len && ptr < detent; i++) {
const char hex[16] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
*ptr++ = hex[data[i] >> 4];
*ptr++ = hex[data[i] & 15];
}
if (ptr < detent)
*ptr++ = '>';
*ptr = '\0';
}
return buf;
}
@@ -10505,27 +10506,47 @@ MDBX_MAYBE_UNUSED static __always_inline size_t __builtin_clzl(size_t value) {
#define MDBX_ATTRIBUTE_TARGET(target) __attribute__((__target__(target)))
#endif /* MDBX_ATTRIBUTE_TARGET */
#if defined(__SSE2__)
#ifndef MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND
/* Workaround for GCC's bug with `-m32 -march=i686 -Ofast`
* gcc/i686-buildroot-linux-gnu/12.2.0/include/xmmintrin.h:814:1:
* error: inlining failed in call to 'always_inline' '_mm_movemask_ps':
* target specific option mismatch */
#if !defined(__FAST_MATH__) || !__FAST_MATH__ || !defined(__GNUC__) || \
defined(__e2k__) || defined(__clang__) || defined(__amd64__) || \
defined(__SSE2__)
#define MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND 0
#else
#define MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND 1
#endif
#endif /* MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND */
#if defined(__SSE2__) && defined(__SSE__)
#define MDBX_ATTRIBUTE_TARGET_SSE2 /* nope */
#elif (defined(_M_IX86_FP) && _M_IX86_FP >= 2) || defined(__amd64__)
#define __SSE2__
#define MDBX_ATTRIBUTE_TARGET_SSE2 /* nope */
#elif defined(MDBX_ATTRIBUTE_TARGET) && defined(__ia32__)
#define MDBX_ATTRIBUTE_TARGET_SSE2 MDBX_ATTRIBUTE_TARGET("sse2")
#elif defined(MDBX_ATTRIBUTE_TARGET) && defined(__ia32__) && \
!MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND
#define MDBX_ATTRIBUTE_TARGET_SSE2 MDBX_ATTRIBUTE_TARGET("sse,sse2")
#endif /* __SSE2__ */
#if defined(__AVX2__)
#define MDBX_ATTRIBUTE_TARGET_AVX2 /* nope */
#elif defined(MDBX_ATTRIBUTE_TARGET) && defined(__ia32__)
#define MDBX_ATTRIBUTE_TARGET_AVX2 MDBX_ATTRIBUTE_TARGET("avx2")
#elif defined(MDBX_ATTRIBUTE_TARGET) && defined(__ia32__) && \
!MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND
#define MDBX_ATTRIBUTE_TARGET_AVX2 MDBX_ATTRIBUTE_TARGET("sse,sse2,avx,avx2")
#endif /* __AVX2__ */
#if defined(MDBX_ATTRIBUTE_TARGET_AVX2)
#if defined(__AVX512BW__)
#define MDBX_ATTRIBUTE_TARGET_AVX512BW /* nope */
#elif defined(MDBX_ATTRIBUTE_TARGET) && defined(__ia32__) && \
!MDBX_GCC_FASTMATH_i686_SIMD_WORKAROUND && \
(__GNUC_PREREQ(6, 0) || __CLANG_PREREQ(5, 0))
#define MDBX_ATTRIBUTE_TARGET_AVX512BW MDBX_ATTRIBUTE_TARGET("avx512bw")
#define MDBX_ATTRIBUTE_TARGET_AVX512BW \
MDBX_ATTRIBUTE_TARGET("sse,sse2,avx,avx2,avx512bw")
#endif /* __AVX512BW__ */
#endif /* MDBX_ATTRIBUTE_TARGET_AVX2 for MDBX_ATTRIBUTE_TARGET_AVX512BW */
#ifdef MDBX_ATTRIBUTE_TARGET_SSE2
MDBX_ATTRIBUTE_TARGET_SSE2 static __always_inline unsigned
@@ -10599,6 +10620,15 @@ diffcmp2mask_avx2(const pgno_t *const ptr, const ptrdiff_t offset,
return _mm256_movemask_ps(*(const __m256 *)&cmp);
}
MDBX_ATTRIBUTE_TARGET_AVX2 static __always_inline unsigned
diffcmp2mask_sse2avx(const pgno_t *const ptr, const ptrdiff_t offset,
const __m128i pattern) {
const __m128i f = _mm_loadu_si128((const __m128i *)ptr);
const __m128i l = _mm_loadu_si128((const __m128i *)(ptr + offset));
const __m128i cmp = _mm_cmpeq_epi32(_mm_sub_epi32(f, l), pattern);
return _mm_movemask_ps(*(const __m128 *)&cmp);
}
MDBX_MAYBE_UNUSED __hot MDBX_ATTRIBUTE_TARGET_AVX2 static pgno_t *
scan4seq_avx2(pgno_t *range, const size_t len, const size_t seq) {
assert(seq > 0 && len > seq);
@@ -10644,7 +10674,7 @@ scan4seq_avx2(pgno_t *range, const size_t len, const size_t seq) {
}
#endif /* __SANITIZE_ADDRESS__ */
if (range - 3 > detent) {
mask = diffcmp2mask_sse2(range - 3, offset, *(const __m128i *)&pattern);
mask = diffcmp2mask_sse2avx(range - 3, offset, *(const __m128i *)&pattern);
if (mask)
return range + 28 - __builtin_clz(mask);
range -= 4;
@@ -10718,7 +10748,7 @@ scan4seq_avx512bw(pgno_t *range, const size_t len, const size_t seq) {
range -= 8;
}
if (range - 3 > detent) {
mask = diffcmp2mask_sse2(range - 3, offset, *(const __m128i *)&pattern);
mask = diffcmp2mask_sse2avx(range - 3, offset, *(const __m128i *)&pattern);
if (mask)
return range + 28 - __builtin_clz(mask);
range -= 4;
@@ -11248,7 +11278,7 @@ bailout:
#if MDBX_ENABLE_PROFGC
size_t majflt_after;
prof->xtime_cpu += osal_cputime(&majflt_after) - cputime_before;
prof->majflt += majflt_after - majflt_before;
prof->majflt += (uint32_t)(majflt_after - majflt_before);
#endif /* MDBX_ENABLE_PROFGC */
return ret;
}
@@ -12122,13 +12152,9 @@ retry:;
}
const bool inside_txn = (env->me_txn0->mt_owner == osal_thread_self());
meta_ptr_t head;
if (inside_txn | locked)
head = meta_recent(env, &env->me_txn0->tw.troika);
else {
const meta_troika_t troika = meta_tap(env);
head = meta_recent(env, &troika);
}
const meta_troika_t troika =
(inside_txn | locked) ? env->me_txn0->tw.troika : meta_tap(env);
const meta_ptr_t head = meta_recent(env, &troika);
const uint64_t unsynced_pages =
atomic_load64(&env->me_lck->mti_unsynced_pages, mo_Relaxed);
if (unsynced_pages == 0) {
@@ -12141,10 +12167,19 @@ retry:;
if (!inside_txn && locked && (env->me_flags & MDBX_WRITEMAP) &&
unlikely(head.ptr_c->mm_geo.next >
bytes2pgno(env, env->me_dxb_mmap.current))) {
rc = dxb_resize(env, head.ptr_c->mm_geo.next, head.ptr_c->mm_geo.now,
head.ptr_c->mm_geo.upper, implicit_grow);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
if (unlikely(env->me_stuck_meta >= 0) &&
troika.recent != (uint8_t)env->me_stuck_meta) {
NOTICE("skip %s since wagering meta-page (%u) is mispatch the recent "
"meta-page (%u)",
"sync datafile", env->me_stuck_meta, troika.recent);
rc = MDBX_RESULT_TRUE;
} else {
rc = dxb_resize(env, head.ptr_c->mm_geo.next, head.ptr_c->mm_geo.now,
head.ptr_c->mm_geo.upper, implicit_grow);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
}
}
const size_t autosync_threshold =
@@ -12223,6 +12258,14 @@ retry:;
eASSERT(env, inside_txn || locked);
eASSERT(env, !inside_txn || (flags & MDBX_SHRINK_ALLOWED) == 0);
if (!head.is_steady && unlikely(env->me_stuck_meta >= 0) &&
troika.recent != (uint8_t)env->me_stuck_meta) {
NOTICE("skip %s since wagering meta-page (%u) is mispatch the recent "
"meta-page (%u)",
"sync datafile", env->me_stuck_meta, troika.recent);
rc = MDBX_RESULT_TRUE;
goto bailout;
}
if (!head.is_steady || ((flags & MDBX_SAFE_NOSYNC) == 0 && unsynced_pages)) {
DEBUG("meta-head %" PRIaPGNO ", %s, sync_pending %" PRIu64,
data_page(head.ptr_c)->mp_pgno, durable_caption(head.ptr_c),
@@ -13122,7 +13165,7 @@ static int txn_renew(MDBX_txn *txn, const unsigned flags) {
}
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
txn_valgrind(env, txn);
#endif
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
txn->mt_owner = tid;
return MDBX_SUCCESS;
}
@@ -13190,7 +13233,7 @@ int mdbx_txn_renew(MDBX_txn *txn) {
rc = txn_renew(txn, MDBX_TXN_RDONLY);
if (rc == MDBX_SUCCESS) {
txn->mt_owner = osal_thread_self();
tASSERT(txn, txn->mt_owner == osal_thread_self());
DEBUG("renew txn %" PRIaTXN "%c %p on env %p, root page %" PRIaPGNO
"/%" PRIaPGNO,
txn->mt_txnid, (txn->mt_flags & MDBX_TXN_RDONLY) ? 'r' : 'w',
@@ -13804,8 +13847,10 @@ static int txn_end(MDBX_txn *txn, const unsigned mode) {
txn->mt_txnid == slot->mr_txnid.weak &&
slot->mr_txnid.weak >= env->me_lck->mti_oldest_reader.weak);
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
atomic_add32(&env->me_ignore_EDEADLK, 1);
txn_valgrind(env, nullptr);
#endif
atomic_sub32(&env->me_ignore_EDEADLK, 1);
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
atomic_store32(&slot->mr_snapshot_pages_used, 0, mo_Relaxed);
safe64_reset(&slot->mr_txnid, false);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true,
@@ -13834,7 +13879,7 @@ static int txn_end(MDBX_txn *txn, const unsigned mode) {
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
if (txn == env->me_txn0)
txn_valgrind(env, nullptr);
#endif
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
txn->mt_flags = MDBX_TXN_FINISHED;
txn->mt_owner = 0;
@@ -14249,6 +14294,14 @@ static int gcu_prepare_backlog(MDBX_txn *txn, gcu_context_t *ctx) {
}
static __inline void gcu_clean_reserved(MDBX_env *env, MDBX_val pnl) {
#if MDBX_DEBUG && (defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__))
/* Для предотвращения предупреждения Valgrind из mdbx_dump_val()
* вызванное через макрос DVAL_DEBUG() на выходе
* из cursor_set(MDBX_SET_KEY), которая вызывается ниже внутри update_gc() в
* цикле очистки и цикле заполнения зарезервированных элементов. */
memset(pnl.iov_base, 0xBB, pnl.iov_len);
#endif /* MDBX_DEBUG && (MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__) */
/* PNL is initially empty, zero out at least the length */
memset(pnl.iov_base, 0, sizeof(pgno_t));
if ((env->me_flags & (MDBX_WRITEMAP | MDBX_NOMEMINIT)) == 0)
@@ -14564,6 +14617,15 @@ retry:
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
#if MDBX_DEBUG && (defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__))
/* Для предотвращения предупреждения Valgrind из mdbx_dump_val()
* вызванное через макрос DVAL_DEBUG() на выходе
* из cursor_set(MDBX_SET_KEY), которая вызывается как выше в цикле
* очистки, так и ниже в цикле заполнения зарезервированных элементов.
*/
memset(data.iov_base, 0xBB, data.iov_len);
#endif /* MDBX_DEBUG && (MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__) */
if (retired_pages_before == MDBX_PNL_GETSIZE(txn->tw.retired_pages)) {
const size_t at = (ctx->lifo == MDBX_PNL_ASCENDING)
? left - chunk
@@ -14601,6 +14663,16 @@ retry:
rc = cursor_put_nochecklen(&ctx->cursor, &key, &data, MDBX_RESERVE);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
#if MDBX_DEBUG && (defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__))
/* Для предотвращения предупреждения Valgrind из mdbx_dump_val()
* вызванное через макрос DVAL_DEBUG() на выходе
* из cursor_set(MDBX_SET_KEY), которая вызывается как выше в цикле
* очистки, так и ниже в цикле заполнения зарезервированных элементов.
*/
memset(data.iov_base, 0xBB, data.iov_len);
#endif /* MDBX_DEBUG && (MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__) */
/* Retry if tw.retired_pages[] grew during the Put() */
} while (data.iov_len < MDBX_PNL_SIZEOF(txn->tw.retired_pages));
@@ -15085,7 +15157,7 @@ bailout:
MDBX_PNL_SETSIZE(txn->tw.relist, 0);
#if MDBX_ENABLE_PROFGC
env->me_lck->mti_pgop_stat.gc_prof.wloops += ctx->loop;
env->me_lck->mti_pgop_stat.gc_prof.wloops += (uint32_t)ctx->loop;
#endif /* MDBX_ENABLE_PROFGC */
TRACE("<<< %zu loops, rc = %d", ctx->loop, rc);
return rc;
@@ -15920,6 +15992,7 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
(size_t)(commit_txnid - txn->mt_txnid));
}
#endif
meta.unsafe_sign = MDBX_DATASIGN_NONE;
meta_set_txnid(env, &meta, commit_txnid);
rc = sync_locked(env, env->me_flags | txn->mt_flags | MDBX_SHRINK_ALLOWED,
@@ -17768,8 +17841,9 @@ __cold static int setup_dxb(MDBX_env *env, const int lck_rc,
mdbx_is_readahead_reasonable(used_bytes, 0) == MDBX_RESULT_TRUE;
#endif /* MDBX_ENABLE_MADVISE */
err = osal_mmap(env->me_flags, &env->me_dxb_mmap, env->me_dbgeo.now,
env->me_dbgeo.upper, lck_rc ? MMAP_OPTION_TRUNCATE : 0);
err = osal_mmap(
env->me_flags, &env->me_dxb_mmap, env->me_dbgeo.now, env->me_dbgeo.upper,
(lck_rc && env->me_stuck_meta < 0) ? MMAP_OPTION_TRUNCATE : 0);
if (unlikely(err != MDBX_SUCCESS))
return err;
@@ -17969,7 +18043,12 @@ __cold static int setup_dxb(MDBX_env *env, const int lck_rc,
}
const meta_ptr_t recent = meta_recent(env, &troika);
if (memcmp(&header.mm_geo, &recent.ptr_c->mm_geo, sizeof(header.mm_geo))) {
if (/* не учитываем различия в geo.next */
header.mm_geo.grow_pv != recent.ptr_c->mm_geo.grow_pv ||
header.mm_geo.shrink_pv != recent.ptr_c->mm_geo.shrink_pv ||
header.mm_geo.lower != recent.ptr_c->mm_geo.lower ||
header.mm_geo.upper != recent.ptr_c->mm_geo.upper ||
header.mm_geo.now != recent.ptr_c->mm_geo.now) {
if ((env->me_flags & MDBX_RDONLY) != 0 ||
/* recovery mode */ env->me_stuck_meta >= 0) {
WARNING("skipped update meta.geo in %s mode: from l%" PRIaPGNO
@@ -18419,8 +18498,12 @@ __cold static int __must_check_result override_meta(MDBX_env *env,
if (unlikely(MDBX_IS_ERROR(rc)))
return MDBX_PROBLEM;
if (shape && memcmp(model, shape, sizeof(MDBX_meta)) == 0)
if (shape && memcmp(model, shape, sizeof(MDBX_meta)) == 0) {
NOTICE("skip overriding meta-%zu since no changes "
"for txnid #%" PRIaTXN,
target, txnid);
return MDBX_SUCCESS;
}
if (env->me_flags & MDBX_WRITEMAP) {
#if MDBX_ENABLE_PGOP_STAT
@@ -18474,14 +18557,16 @@ __cold int mdbx_env_turn_for_recovery(MDBX_env *env, unsigned target) {
MDBX_EXCLUSIVE))
return MDBX_EPERM;
const MDBX_meta *target_meta = METAPAGE(env, target);
txnid_t new_txnid = safe64_txnid_next(constmeta_txnid(target_meta));
for (size_t n = 0; n < NUM_METAS; ++n) {
const MDBX_meta *const target_meta = METAPAGE(env, target);
txnid_t new_txnid = constmeta_txnid(target_meta);
if (new_txnid < MIN_TXNID)
new_txnid = MIN_TXNID;
for (unsigned n = 0; n < NUM_METAS; ++n) {
if (n == target)
continue;
MDBX_meta meta = *METAPAGE(env, target);
if (validate_meta(env, &meta, pgno2page(env, n), (pgno_t)n, nullptr) !=
MDBX_SUCCESS) {
MDBX_page *const page = pgno2page(env, n);
MDBX_meta meta = *page_meta(page);
if (validate_meta(env, &meta, page, n, nullptr) != MDBX_SUCCESS) {
int err = override_meta(env, n, 0, nullptr);
if (unlikely(err != MDBX_SUCCESS))
return err;
@@ -19270,7 +19355,7 @@ bailout:
} else {
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
txn_valgrind(env, nullptr);
#endif
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
}
osal_free(env_pathname.buffer_for_free);
return rc;
@@ -20831,11 +20916,13 @@ static __hot int cursor_get(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
}
break;
case MDBX_GET_MULTIPLE:
if (unlikely(data == NULL || !(mc->mc_flags & C_INITIALIZED)))
if (unlikely(!data))
return MDBX_EINVAL;
if (unlikely(!(mc->mc_db->md_flags & MDBX_DUPFIXED)))
if (unlikely((mc->mc_db->md_flags & MDBX_DUPFIXED) == 0))
return MDBX_INCOMPATIBLE;
rc = MDBX_SUCCESS;
rc = (mc->mc_flags & C_INITIALIZED)
? MDBX_SUCCESS
: cursor_set(mc, key, data, MDBX_SET).err;
if ((mc->mc_xcursor->mx_cursor.mc_flags & (C_INITIALIZED | C_EOF)) !=
C_INITIALIZED)
break;
@@ -21194,9 +21281,6 @@ static __hot int cursor_touch(MDBX_cursor *const mc, const MDBX_val *key,
static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
MDBX_val *data, unsigned flags) {
MDBX_page *sub_root = nullptr;
MDBX_val xdata, *rdata, dkey, olddata;
MDBX_db nested_dupdb;
int err;
DKBUF_DEBUG;
MDBX_env *const env = mc->mc_txn->mt_env;
@@ -21204,7 +21288,6 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
DDBI(mc), DKEY_DEBUG(key), key->iov_len,
DVAL_DEBUG((flags & MDBX_RESERVE) ? nullptr : data), data->iov_len);
int dupdata_flag = 0;
if ((flags & MDBX_CURRENT) != 0 && (mc->mc_flags & C_SUB) == 0) {
if (unlikely(flags & (MDBX_APPEND | MDBX_NOOVERWRITE)))
return MDBX_EINVAL;
@@ -21263,10 +21346,11 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
rc = MDBX_NO_ROOT;
} else if ((flags & MDBX_CURRENT) == 0) {
bool exact = false;
MDBX_val lastkey, olddata;
if ((flags & MDBX_APPEND) && mc->mc_db->md_entries > 0) {
rc = cursor_last(mc, &dkey, &olddata);
rc = cursor_last(mc, &lastkey, &olddata);
if (likely(rc == MDBX_SUCCESS)) {
const int cmp = mc->mc_dbx->md_cmp(key, &dkey);
const int cmp = mc->mc_dbx->md_cmp(key, &lastkey);
if (likely(cmp > 0)) {
mc->mc_ki[mc->mc_top]++; /* step forward for appending */
rc = MDBX_NOTFOUND;
@@ -21331,7 +21415,7 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
}
mc->mc_flags &= ~C_DEL;
rdata = data;
MDBX_val xdata, *rdata = data;
size_t mcount = 0, dcount = 0;
if (unlikely(flags & MDBX_MULTIPLE)) {
dcount = data[1].iov_len;
@@ -21376,11 +21460,15 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
mc->mc_flags |= C_INITIALIZED;
}
bool insert_key, insert_data, do_sub = false;
insert_key = insert_data = (rc != MDBX_SUCCESS);
MDBX_val dkey, olddata;
MDBX_db nested_dupdb;
MDBX_page *sub_root = nullptr;
bool insert_key, insert_data;
uint16_t fp_flags = P_LEAF;
MDBX_page *fp = env->me_pbuf;
fp->mp_txnid = mc->mc_txn->mt_front;
insert_key = insert_data = (rc != MDBX_SUCCESS);
dkey.iov_base = nullptr;
if (insert_key) {
/* The key does not exist */
DEBUG("inserting key at index %i", mc->mc_ki[mc->mc_top]);
@@ -21555,7 +21643,6 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
/* Back up original data item */
memcpy(dkey.iov_base = fp + 1, olddata.iov_base,
dkey.iov_len = olddata.iov_len);
dupdata_flag = 1;
/* Make sub-page header for the dup items, with dummy body */
fp->mp_flags = P_LEAF | P_SUBP;
@@ -21659,11 +21746,10 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
}
}
rdata = &xdata;
flags |= F_DUPDATA;
do_sub = true;
if (!insert_key)
node_del(mc, 0);
rdata = &xdata;
flags |= F_DUPDATA;
goto new_sub;
}
@@ -21748,8 +21834,8 @@ new_sub:;
* storing the user data in the keys field, so there are strict
* size limits on dupdata. The actual data fields of the child
* DB are all zero size. */
if (do_sub) {
int xflags;
if (flags & F_DUPDATA) {
unsigned xflags;
size_t ecount;
put_sub:
xdata.iov_len = 0;
@@ -21770,13 +21856,11 @@ new_sub:;
if (sub_root)
mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root;
/* converted, write the original data first */
if (dupdata_flag) {
if (dkey.iov_base) {
rc = cursor_put_nochecklen(&mc->mc_xcursor->mx_cursor, &dkey, &xdata,
xflags);
if (unlikely(rc))
goto bad_sub;
/* we've done our job */
dkey.iov_len = 0;
}
if (!(node_flags(node) & F_SUBDATA) || sub_root) {
/* Adjust other cursors pointing to mp */
@@ -21793,7 +21877,7 @@ new_sub:;
continue;
if (m2->mc_pg[i] == mp) {
if (m2->mc_ki[i] == mc->mc_ki[i]) {
err = cursor_xinit2(m2, mx, dupdata_flag);
err = cursor_xinit2(m2, mx, dkey.iov_base != nullptr);
if (unlikely(err != MDBX_SUCCESS))
return err;
} else if (!insert_key && m2->mc_ki[i] < nkeys) {
@@ -21837,6 +21921,7 @@ new_sub:;
if (mcount < dcount) {
data[0].iov_base = ptr_disp(data[0].iov_base, data[0].iov_len);
insert_key = insert_data = false;
dkey.iov_base = nullptr;
goto more;
}
}
@@ -25091,6 +25176,10 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
tASSERT(txn, XCURSOR_INITED(&cx.outer) &&
cx.outer.mc_xcursor->mx_db.md_entries > 1);
rc = MDBX_EMULTIVAL;
if ((flags & MDBX_NOOVERWRITE) == 0) {
flags -= MDBX_CURRENT;
rc = cursor_del(&cx.outer, MDBX_ALLDUPS);
}
}
}
}
@@ -33276,10 +33365,10 @@ __dll_export
const struct MDBX_version_info mdbx_version = {
0,
12,
6,
8,
0,
{"2023-04-29T21:30:35+03:00", "44de01dd81ac366a7d37111eaf72726edebe5528", "c019631a8c88a98a11d814e4111a2a9ae8cb4099",
"v0.12.6-0-gc019631a"},
{"2023-10-17T18:16:29+03:00", "24f7245ccd42c9bf34a93d07de56c598302e5e46", "02c7cf2a9c1004b3d3dae73cb006c9d7ed008665",
"v0.12.8-0-g02c7cf2a"},
sourcery};
__dll_export
@@ -35027,6 +35116,11 @@ __cold static int mdbx_ipclock_failed(MDBX_env *env, osal_ipclock_t *ipc,
#error "FIXME"
#endif /* MDBX_LOCKING */
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
if (rc == EDEADLK && atomic_load32(&env->me_ignore_EDEADLK, mo_Relaxed) > 0)
return rc;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
ERROR("mutex (un)lock failed, %s", mdbx_strerror(err));
if (rc != EDEADLK)
env->me_flags |= MDBX_FATAL_ERROR;

View File

@@ -12,7 +12,7 @@
* <http://www.OpenLDAP.org/license.html>. */
#define xMDBX_ALLOY 1
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3734,6 +3734,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
@@ -4272,6 +4273,10 @@ namespace mdbx {
"into an incompatible memory allocation scheme.");
}
[[noreturn]] __cold void throw_bad_value_size() {
throw bad_value_size(MDBX_BAD_VALSIZE);
}
__cold exception::exception(const ::mdbx::error &error) noexcept
: base(error.what()), error_(error) {}
@@ -4522,6 +4527,109 @@ bool slice::is_printable(bool disable_utf8) const noexcept {
return true;
}
#ifdef MDBX_U128_TYPE
MDBX_U128_TYPE slice::as_uint128() const {
static_assert(sizeof(MDBX_U128_TYPE) == 16, "WTF?");
if (size() == 16) {
MDBX_U128_TYPE r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_uint64();
}
#endif /* MDBX_U128_TYPE */
uint64_t slice::as_uint64() const {
static_assert(sizeof(uint64_t) == 8, "WTF?");
if (size() == 8) {
uint64_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_uint32();
}
uint32_t slice::as_uint32() const {
static_assert(sizeof(uint32_t) == 4, "WTF?");
if (size() == 4) {
uint32_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_uint16();
}
uint16_t slice::as_uint16() const {
static_assert(sizeof(uint16_t) == 2, "WTF?");
if (size() == 2) {
uint16_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_uint8();
}
uint8_t slice::as_uint8() const {
static_assert(sizeof(uint8_t) == 1, "WTF?");
if (size() == 1)
return *static_cast<const uint8_t *>(data());
else if (size() == 0)
return 0;
else
MDBX_CXX20_UNLIKELY throw_bad_value_size();
}
#ifdef MDBX_I128_TYPE
MDBX_I128_TYPE slice::as_int128() const {
static_assert(sizeof(MDBX_I128_TYPE) == 16, "WTF?");
if (size() == 16) {
MDBX_I128_TYPE r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_int64();
}
#endif /* MDBX_I128_TYPE */
int64_t slice::as_int64() const {
static_assert(sizeof(int64_t) == 8, "WTF?");
if (size() == 8) {
uint64_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_int32();
}
int32_t slice::as_int32() const {
static_assert(sizeof(int32_t) == 4, "WTF?");
if (size() == 4) {
int32_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_int16();
}
int16_t slice::as_int16() const {
static_assert(sizeof(int16_t) == 2, "WTF?");
if (size() == 2) {
int16_t r;
memcpy(&r, data(), sizeof(r));
return r;
} else
return as_int8();
}
int8_t slice::as_int8() const {
if (size() == 1)
return *static_cast<const int8_t *>(data());
else if (size() == 0)
return 0;
else
MDBX_CXX20_UNLIKELY throw_bad_value_size();
}
//------------------------------------------------------------------------------
char *to_hex::write_bytes(char *__restrict const dest, size_t dest_size) const {

View File

@@ -1876,7 +1876,8 @@ enum MDBX_error_t {
MDBX_BAD_RSLOT = -30783,
/** Transaction is not valid for requested operation,
* e.g. had errored and be must aborted, has a child, or is invalid */
* e.g. had errored and be must aborted, has a child/nested transaction,
* or is invalid */
MDBX_BAD_TXN = -30782,
/** Invalid size or alignment of key or data for target database,
@@ -2699,11 +2700,12 @@ MDBX_DEPRECATED LIBMDBX_INLINE_API(int, mdbx_env_info,
* success. The \ref MDBX_RESULT_TRUE means no data pending for flush
* to disk, and 0 otherwise. Some possible errors are:
*
* \retval MDBX_EACCES the environment is read-only.
* \retval MDBX_BUSY the environment is used by other thread
* \retval MDBX_EACCES The environment is read-only.
* \retval MDBX_BUSY The environment is used by other thread
* and `nonblock=true`.
* \retval MDBX_EINVAL an invalid parameter was specified.
* \retval MDBX_EIO an error occurred during synchronization. */
* \retval MDBX_EINVAL An invalid parameter was specified.
* \retval MDBX_EIO An error occurred during the flushing/writing data
* to a storage medium/disk. */
LIBMDBX_API int mdbx_env_sync_ex(MDBX_env *env, bool force, bool nonblock);
/** \brief The shortcut to calling \ref mdbx_env_sync_ex() with
@@ -2846,9 +2848,9 @@ LIBMDBX_INLINE_API(int, mdbx_env_get_syncperiod,
*
* Only a single thread may call this function. All transactions, databases,
* and cursors must already be closed before calling this function. Attempts
* to use any such handles after calling this function will cause a `SIGSEGV`.
* The environment handle will be freed and must not be used again after this
* call.
* to use any such handles after calling this function is UB and would cause
* a `SIGSEGV`. The environment handle will be freed and must not be used again
* after this call.
*
* \param [in] env An environment handle returned by
* \ref mdbx_env_create().
@@ -2878,7 +2880,8 @@ LIBMDBX_INLINE_API(int, mdbx_env_get_syncperiod,
* is expected, i.e. \ref MDBX_env instance was freed in
* proper manner.
*
* \retval MDBX_EIO An error occurred during synchronization. */
* \retval MDBX_EIO An error occurred during the flushing/writing data
* to a storage medium/disk. */
LIBMDBX_API int mdbx_env_close_ex(MDBX_env *env, bool dont_sync);
/** \brief The shortcut to calling \ref mdbx_env_close_ex() with
@@ -3918,7 +3921,8 @@ LIBMDBX_API int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency);
* by current thread.
* \retval MDBX_EINVAL Transaction handle is NULL.
* \retval MDBX_ENOSPC No more disk space.
* \retval MDBX_EIO A system-level I/O error occurred.
* \retval MDBX_EIO An error occurred during the flushing/writing
* data to a storage medium/disk.
* \retval MDBX_ENOMEM Out of memory. */
LIBMDBX_INLINE_API(int, mdbx_txn_commit, (MDBX_txn * txn)) {
return mdbx_txn_commit_ex(txn, NULL);
@@ -4031,7 +4035,7 @@ LIBMDBX_API int mdbx_txn_renew(MDBX_txn *txn);
/** \brief The fours integers markers (aka "canary") associated with the
* environment.
* \ingroup c_crud
* \see mdbx_canary_set()
* \see mdbx_canary_put()
* \see mdbx_canary_get()
*
* The `x`, `y` and `z` values could be set by \ref mdbx_canary_put(), while the
@@ -4069,10 +4073,10 @@ LIBMDBX_API int mdbx_canary_put(MDBX_txn *txn, const MDBX_canary *canary);
/** \brief Returns fours integers markers (aka "canary") associated with the
* environment.
* \ingroup c_crud
* \see mdbx_canary_set()
* \see mdbx_canary_put()
*
* \param [in] txn A transaction handle returned by \ref mdbx_txn_begin().
* \param [in] canary The address of an MDBX_canary structure where the
* \param [in] canary The address of an \ref MDBX_canary structure where the
* information will be copied.
*
* \returns A non-zero error value on failure and 0 on success. */
@@ -4084,9 +4088,9 @@ LIBMDBX_API int mdbx_canary_get(const MDBX_txn *txn, MDBX_canary *canary);
* \see mdbx_get_datacmp \see mdbx_dcmp()
*
* \anchor avoid_custom_comparators
* It is recommend not using custom comparison functions, but instead
* converting the keys to one of the forms that are suitable for built-in
* comparators (for instance take look to the \ref value2key).
* \deprecated It is recommend not using custom comparison functions, but
* instead converting the keys to one of the forms that are suitable for
* built-in comparators (for instance take look to the \ref value2key).
* The reasons to not using custom comparators are:
* - The order of records could not be validated without your code.
* So `mdbx_chk` utility will reports "wrong order" errors
@@ -4316,7 +4320,7 @@ LIBMDBX_API int mdbx_dbi_dupsort_depthmask(MDBX_txn *txn, MDBX_dbi dbi,
enum MDBX_dbi_state_t {
/** DB was written in this txn */
MDBX_DBI_DIRTY = 0x01,
/** Named-DB record is older than txnID */
/** Cached Named-DB record is older than txnID */
MDBX_DBI_STALE = 0x02,
/** Named-DB handle opened in this txn */
MDBX_DBI_FRESH = 0x04,
@@ -4398,9 +4402,14 @@ LIBMDBX_API int mdbx_drop(MDBX_txn *txn, MDBX_dbi dbi, bool del);
* items requires the use of \ref mdbx_cursor_get().
*
* \note The memory pointed to by the returned values is owned by the
* database. The caller need not dispose of the memory, and may not
* modify it in any way. For values returned in a read-only transaction
* any modification attempts will cause a `SIGSEGV`.
* database. The caller MUST not dispose of the memory, and MUST not modify it
* in any way regardless in a read-only nor read-write transactions!
* For case a database opened without the \ref MDBX_WRITEMAP modification
* attempts likely will cause a `SIGSEGV`. However, when a database opened with
* the \ref MDBX_WRITEMAP or in case values returned inside read-write
* transaction are located on a "dirty" (modified and pending to commit) pages,
* such modification will silently accepted and likely will lead to DB and/or
* data corruption.
*
* \note Values returned from the database are valid only until a
* subsequent update operation, or the end of the transaction.
@@ -4650,7 +4659,7 @@ LIBMDBX_API int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi,
LIBMDBX_API int mdbx_del(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
const MDBX_val *data);
/** \brief Create a cursor handle but not bind it to transaction nor DBI handle.
/** \brief Create a cursor handle but not bind it to transaction nor DBI-handle.
* \ingroup c_cursors
*
* A cursor cannot be used when its database handle is closed. Nor when its
@@ -4674,7 +4683,7 @@ LIBMDBX_API int mdbx_del(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
* \returns Created cursor handle or NULL in case out of memory. */
LIBMDBX_API MDBX_cursor *mdbx_cursor_create(void *context);
/** \brief Set application information associated with the \ref MDBX_cursor.
/** \brief Set application information associated with the cursor.
* \ingroup c_cursors
* \see mdbx_cursor_get_userctx()
*
@@ -4697,11 +4706,11 @@ LIBMDBX_API int mdbx_cursor_set_userctx(MDBX_cursor *cursor, void *ctx);
MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *
mdbx_cursor_get_userctx(const MDBX_cursor *cursor);
/** \brief Bind cursor to specified transaction and DBI handle.
/** \brief Bind cursor to specified transaction and DBI-handle.
* \ingroup c_cursors
*
* Using of the `mdbx_cursor_bind()` is equivalent to calling
* \ref mdbx_cursor_renew() but with specifying an arbitrary dbi handle.
* \ref mdbx_cursor_renew() but with specifying an arbitrary DBI-handle.
*
* A cursor may be associated with a new transaction, and referencing a new or
* the same database handle as it was created with. This may be done whether the
@@ -4715,7 +4724,7 @@ mdbx_cursor_get_userctx(const MDBX_cursor *cursor);
*
* \param [in] txn A transaction handle returned by \ref mdbx_txn_begin().
* \param [in] dbi A database handle returned by \ref mdbx_dbi_open().
* \param [out] cursor A cursor handle returned by \ref mdbx_cursor_create().
* \param [in] cursor A cursor handle returned by \ref mdbx_cursor_create().
*
* \returns A non-zero error value on failure and 0 on success,
* some possible errors are:
@@ -4774,15 +4783,14 @@ LIBMDBX_API int mdbx_cursor_open(MDBX_txn *txn, MDBX_dbi dbi,
* or \ref mdbx_cursor_create(). */
LIBMDBX_API void mdbx_cursor_close(MDBX_cursor *cursor);
/** \brief Renew a cursor handle.
/** \brief Renew a cursor handle for use within the given transaction.
* \ingroup c_cursors
*
* The cursor may be associated with a new transaction, and referencing a new or
* the same database handle as it was created with. This may be done whether the
* previous transaction is live or dead.
* A cursor may be associated with a new transaction whether the previous
* transaction is running or finished.
*
* Using of the `mdbx_cursor_renew()` is equivalent to calling
* \ref mdbx_cursor_bind() with the DBI handle that previously
* \ref mdbx_cursor_bind() with the DBI-handle that previously
* the cursor was used with.
*
* \note In contrast to LMDB, the MDBX allow any cursor to be re-used by using
@@ -4796,7 +4804,9 @@ LIBMDBX_API void mdbx_cursor_close(MDBX_cursor *cursor);
* some possible errors are:
* \retval MDBX_THREAD_MISMATCH Given transaction is not owned
* by current thread.
* \retval MDBX_EINVAL An invalid parameter was specified. */
* \retval MDBX_EINVAL An invalid parameter was specified.
* \retval MDBX_BAD_DBI The cursor was not bound to a DBI-handle
* or such a handle became invalid. */
LIBMDBX_API int mdbx_cursor_renew(MDBX_txn *txn, MDBX_cursor *cursor);
/** \brief Return the cursor's transaction handle.
@@ -4834,6 +4844,16 @@ LIBMDBX_API int mdbx_cursor_copy(const MDBX_cursor *src, MDBX_cursor *dest);
* to which data refers.
* \see mdbx_get()
*
* \note The memory pointed to by the returned values is owned by the
* database. The caller MUST not dispose of the memory, and MUST not modify it
* in any way regardless in a read-only nor read-write transactions!
* For case a database opened without the \ref MDBX_WRITEMAP modification
* attempts likely will cause a `SIGSEGV`. However, when a database opened with
* the \ref MDBX_WRITEMAP or in case values returned inside read-write
* transaction are located on a "dirty" (modified and pending to commit) pages,
* such modification will silently accepted and likely will lead to DB and/or
* data corruption.
*
* \param [in] cursor A cursor handle returned by \ref mdbx_cursor_open().
* \param [in,out] key The key for a retrieved item.
* \param [in,out] data The data of a retrieved item.
@@ -4860,6 +4880,16 @@ LIBMDBX_API int mdbx_cursor_get(MDBX_cursor *cursor, MDBX_val *key,
* array to which `pairs` refers.
* \see mdbx_cursor_get()
*
* \note The memory pointed to by the returned values is owned by the
* database. The caller MUST not dispose of the memory, and MUST not modify it
* in any way regardless in a read-only nor read-write transactions!
* For case a database opened without the \ref MDBX_WRITEMAP modification
* attempts likely will cause a `SIGSEGV`. However, when a database opened with
* the \ref MDBX_WRITEMAP or in case values returned inside read-write
* transaction are located on a "dirty" (modified and pending to commit) pages,
* such modification will silently accepted and likely will lead to DB and/or
* data corruption.
*
* \param [in] cursor A cursor handle returned by \ref mdbx_cursor_open().
* \param [out] count The number of key and value item returned, on success
* it always be the even because the key-value

View File

@@ -80,7 +80,8 @@
#if defined(__cpp_lib_filesystem) && __cpp_lib_filesystem >= 201703L
#include <filesystem>
#elif __has_include(<experimental/filesystem>)
#elif defined(__cpp_lib_string_view) && __cpp_lib_string_view >= 201606L && \
__has_include(<experimental/filesystem>)
#include <experimental/filesystem>
#endif
@@ -368,6 +369,7 @@ using string = ::std::basic_string<char, ::std::char_traits<char>, ALLOCATOR>;
using filehandle = ::mdbx_filehandle_t;
#if defined(DOXYGEN) || \
(defined(__cpp_lib_filesystem) && __cpp_lib_filesystem >= 201703L && \
defined(__cpp_lib_string_view) && __cpp_lib_string_view >= 201606L && \
(!defined(__MAC_OS_X_VERSION_MIN_REQUIRED) || \
__MAC_OS_X_VERSION_MIN_REQUIRED >= 101500) && \
(!defined(__IPHONE_OS_VERSION_MIN_REQUIRED) || \
@@ -394,6 +396,16 @@ using path = ::std::wstring;
using path = ::std::string;
#endif /* mdbx::path */
#if defined(__SIZEOF_INT128__) || \
(defined(_INTEGRAL_MAX_BITS) && _INTEGRAL_MAX_BITS >= 128)
#ifndef MDBX_U128_TYPE
#define MDBX_U128_TYPE __uint128_t
#endif /* MDBX_U128_TYPE */
#ifndef MDBX_I128_TYPE
#define MDBX_I128_TYPE __int128_t
#endif /* MDBX_I128_TYPE */
#endif /* __SIZEOF_INT128__ || _INTEGRAL_MAX_BITS >= 128 */
#if __cplusplus >= 201103L || defined(DOXYGEN)
/// \brief Duration in 1/65536 units of second.
using duration = ::std::chrono::duration<unsigned, ::std::ratio<1, 65536>>;
@@ -552,6 +564,7 @@ MDBX_DECLARE_EXCEPTION(transaction_overlapping);
[[noreturn]] LIBMDBX_API void throw_max_length_exceeded();
[[noreturn]] LIBMDBX_API void throw_out_range();
[[noreturn]] LIBMDBX_API void throw_allocators_mismatch();
[[noreturn]] LIBMDBX_API void throw_bad_value_size();
static MDBX_CXX14_CONSTEXPR size_t check_length(size_t bytes);
static MDBX_CXX14_CONSTEXPR size_t check_length(size_t headroom,
size_t payload);
@@ -1029,6 +1042,35 @@ struct LIBMDBX_API_TYPE slice : public ::MDBX_val {
return slice(size_t(-1));
}
template <typename POD> MDBX_CXX14_CONSTEXPR POD as_pod() const {
static_assert(::std::is_standard_layout<POD>::value &&
!::std::is_pointer<POD>::value,
"Must be a standard layout type!");
if (MDBX_LIKELY(size() == sizeof(POD)))
MDBX_CXX20_LIKELY {
POD r;
memcpy(&r, data(), sizeof(r));
return r;
}
throw_bad_value_size();
}
#ifdef MDBX_U128_TYPE
MDBX_U128_TYPE as_uint128() const;
#endif /* MDBX_U128_TYPE */
uint64_t as_uint64() const;
uint32_t as_uint32() const;
uint16_t as_uint16() const;
uint8_t as_uint8() const;
#ifdef MDBX_I128_TYPE
MDBX_I128_TYPE as_int128() const;
#endif /* MDBX_I128_TYPE */
int64_t as_int64() const;
int32_t as_int32() const;
int16_t as_int16() const;
int8_t as_int8() const;
protected:
MDBX_CXX11_CONSTEXPR slice(size_t invalid_length) noexcept
: ::MDBX_val({nullptr, invalid_length}) {}
@@ -2292,6 +2334,10 @@ public:
return buffer(::mdbx::slice::wrap(pod), make_reference, allocator);
}
template <typename POD> MDBX_CXX14_CONSTEXPR POD as_pod() const {
return slice_.as_pod<POD>();
}
/// \brief Reserves storage space.
void reserve(size_t wanna_headroom, size_t wanna_tailroom) {
wanna_headroom = ::std::min(::std::max(headroom(), wanna_headroom),
@@ -3000,7 +3046,11 @@ public:
//----------------------------------------------------------------------------
/// Database geometry for size management.
/// \brief Database geometry for size management.
/// \see env_managed::create_parameters
/// \see env_managed::env_managed(const ::std::string &pathname, const
/// create_parameters &, const operate_parameters &, bool accede)
struct LIBMDBX_API_TYPE geometry {
enum : int64_t {
default_value = -1, ///< Means "keep current or use default"
@@ -3659,6 +3709,8 @@ public:
bool accede = true);
/// \brief Additional parameters for creating a new database.
/// \see env_managed(const ::std::string &pathname, const create_parameters &,
/// const operate_parameters &, bool accede)
struct create_parameters {
env::geometry geometry;
mdbx_mode_t file_mode_bits{0640};
@@ -3969,10 +4021,20 @@ public:
size_t values_count, put_mode mode,
bool allow_partial = false);
template <typename VALUE>
size_t put_multiple(map_handle map, const slice &key,
const VALUE *values_array, size_t values_count,
put_mode mode, bool allow_partial = false) {
static_assert(::std::is_standard_layout<VALUE>::value &&
!::std::is_pointer<VALUE>::value &&
!::std::is_array<VALUE>::value,
"Must be a standard layout type!");
return put_multiple(map, key, sizeof(VALUE), values_array, values_count,
mode, allow_partial);
}
template <typename VALUE>
void put_multiple(map_handle map, const slice &key,
const ::std::vector<VALUE> &vector, put_mode mode) {
put_multiple(map, key, sizeof(VALUE), vector.data(), vector.size(), mode,
false);
put_multiple(map, key, vector.data(), vector.size(), mode);
}
inline ptrdiff_t estimate(map_handle map, pair from, pair to) const;
@@ -5134,7 +5196,7 @@ inline filehandle env::get_filehandle() const {
}
inline MDBX_env_flags_t env::get_flags() const {
unsigned bits;
unsigned bits = 0;
error::success_or_throw(::mdbx_env_get_flags(handle_, &bits));
return MDBX_env_flags_t(bits);
}

View File

@@ -34,7 +34,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3756,6 +3756,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -34,7 +34,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3756,6 +3756,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -36,7 +36,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3758,6 +3758,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -34,7 +34,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3756,6 +3756,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -34,7 +34,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3756,6 +3756,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -34,7 +34,7 @@
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>. */
#define MDBX_BUILD_SOURCERY a0e7c54f688eecaf45ddd7493b737f88a97e4e8b0fdaa55c9d3b00d69e0c8548_v0_12_6_0_gc019631a
#define MDBX_BUILD_SOURCERY 30c8f70db1f021dc2bfb201ba04efdcc34fc7495127f517f9624f18c0100b8ab_v0_12_8_0_g02c7cf2a
#ifdef MDBX_CONFIG_H
#include MDBX_CONFIG_H
#endif
@@ -3756,6 +3756,7 @@ struct MDBX_env {
int me_valgrind_handle;
#endif
#if defined(MDBX_USE_VALGRIND) || defined(__SANITIZE_ADDRESS__)
MDBX_atomic_uint32_t me_ignore_EDEADLK;
pgno_t me_poison_edge;
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */

View File

@@ -33,4 +33,14 @@ impl<T: Clone> EventListeners<T> {
pub fn push_listener(&mut self, listener: mpsc::UnboundedSender<T>) {
self.listeners.push(listener);
}
/// Returns the number of registered listeners.
pub fn len(&self) -> usize {
self.listeners.len()
}
/// Returns true if there are no registered listeners.
pub fn is_empty(&self) -> bool {
self.listeners.is_empty()
}
}

View File

@@ -0,0 +1,16 @@
[package]
name = "beacon-api-sse"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth.workspace = true
eyre.workspace = true
clap.workspace = true
tracing.workspace = true
futures-util.workspace = true
tokio = { workspace = true, features = ["time"] }
mev-share-sse = "0.1.5"

View File

@@ -0,0 +1,113 @@
//! Example of how to subscribe to beacon chain events via SSE.
//!
//! See also [ethereum-beacon-API eventstream](https://ethereum.github.io/beacon-APIs/#/Events/eventstream)
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p beacon-api-sse -- node
//! ```
//!
//! This launches a regular reth instance and subscribes to payload attributes event stream.
//!
//! **NOTE**: This expects that the CL client is running an http server on `localhost:5052` and is
//! configured to emit payload attributes events.
//!
//! See lighthouse beacon Node API: <https://lighthouse-book.sigmaprime.io/api-bn.html#beacon-node-api>
use clap::Parser;
use futures_util::stream::StreamExt;
use mev_share_sse::{client::EventStream, EventClient};
use reth::{
cli::{
components::RethNodeComponents,
ext::{RethCliExt, RethNodeCommandConfig},
Cli,
},
rpc::types::engine::beacon_api::events::PayloadAttributesEvent,
tasks::TaskSpawner,
};
use std::net::{IpAddr, Ipv4Addr};
use tracing::{info, warn};
fn main() {
Cli::<BeaconEventsExt>::parse().run().unwrap();
}
/// The type that tells the reth CLI what extensions to use
#[derive(Debug, Default)]
#[non_exhaustive]
struct BeaconEventsExt;
impl RethCliExt for BeaconEventsExt {
/// This tells the reth CLI to install additional CLI arguments
type Node = BeaconEventsConfig;
}
/// Our custom cli args extension that adds one flag to reth default CLI.
#[derive(Debug, Clone, clap::Parser)]
struct BeaconEventsConfig {
/// Beacon Node http server address
#[arg(long = "cl.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
pub cl_addr: IpAddr,
/// Beacon Node http server port to listen on
#[arg(long = "cl.port", default_value_t = 5052)]
pub cl_port: u16,
}
impl BeaconEventsConfig {
/// Returns the http url of the beacon node
pub fn http_base_url(&self) -> String {
format!("http://{}:{}", self.cl_addr, self.cl_port)
}
/// Returns the URL to the events endpoint
pub fn events_url(&self) -> String {
format!("{}/eth/v1/events", self.http_base_url())
}
/// Service that subscribes to beacon chain payload attributes events
async fn run(self) {
let client = EventClient::default();
let mut subscription = self.new_payload_attributes_subscription(&client).await;
while let Some(event) = subscription.next().await {
info!("Received payload attributes: {:?}", event);
}
}
// It can take a bit until the CL endpoint is live so we retry a few times
async fn new_payload_attributes_subscription(
&self,
client: &EventClient,
) -> EventStream<PayloadAttributesEvent> {
let payloads_url = format!("{}?topics=payload_attributes", self.events_url());
loop {
match client.subscribe(&payloads_url).await {
Ok(subscription) => return subscription,
Err(err) => {
warn!("Failed to subscribe to payload attributes events: {:?}\nRetrying in 5 seconds...", err);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
}
impl RethNodeCommandConfig for BeaconEventsConfig {
fn on_node_started<Reth: RethNodeComponents>(&mut self, components: &Reth) -> eyre::Result<()> {
components.task_executor().spawn(Box::pin(self.clone().run()));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_config() {
let args = BeaconEventsConfig::try_parse_from(["reth"]);
assert!(args.is_ok());
}
}