This commit is contained in:
Alexey Shekhirin
2025-12-15 01:45:09 +00:00
parent 3c9ad31344
commit 6507c93578
10 changed files with 1758 additions and 3 deletions

471
Cargo.lock generated
View File

@@ -1375,7 +1375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener 2.5.3",
"futures-core",
]
@@ -1450,6 +1450,15 @@ dependencies = [
"rustc_version 0.4.1",
]
[[package]]
name = "atoi"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
dependencies = [
"num-traits",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@@ -2608,6 +2617,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -2911,6 +2929,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb"
dependencies = [
"const-oid",
"pem-rfc7468",
"zeroize",
]
@@ -3101,7 +3120,7 @@ dependencies = [
"enr",
"fnv",
"futures",
"hashlink",
"hashlink 0.9.1",
"hex",
"hkdf",
"lazy_static",
@@ -3157,6 +3176,12 @@ dependencies = [
"litrs",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "dunce"
version = "1.0.5"
@@ -3414,6 +3439,17 @@ dependencies = [
"version_check",
]
[[package]]
name = "etcetera"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943"
dependencies = [
"cfg-if",
"home",
"windows-sys 0.48.0",
]
[[package]]
name = "ethereum_hashing"
version = "0.7.0"
@@ -3471,6 +3507,17 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event-listener"
version = "5.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "example-beacon-api-sidecar-fetcher"
version = "0.1.0"
@@ -4023,6 +4070,17 @@ dependencies = [
"rustc_version 0.2.3",
]
[[package]]
name = "flume"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"futures-core",
"futures-sink",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -4090,7 +4148,7 @@ dependencies = [
"diatomic-waker",
"futures-core",
"pin-project-lite",
"spin",
"spin 0.10.0",
]
[[package]]
@@ -4135,6 +4193,17 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-intrusive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f"
dependencies = [
"futures-core",
"lock_api",
"parking_lot",
]
[[package]]
name = "futures-io"
version = "0.3.31"
@@ -4475,6 +4544,15 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hashlink"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
@@ -4578,6 +4656,15 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "home"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "http"
version = "1.4.0"
@@ -5089,6 +5176,15 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "ipnetwork"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e"
dependencies = [
"serde",
]
[[package]]
name = "iri-string"
version = "0.7.9"
@@ -5437,6 +5533,9 @@ name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin 0.9.8",
]
[[package]]
name = "libc"
@@ -5529,6 +5628,17 @@ dependencies = [
"zstd-sys",
]
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "libz-sys"
version = "1.1.23"
@@ -5699,6 +5809,28 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "maxminddb"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6087e5d8ea14861bb7c7f573afbc7be3798d3ef0fae87ec4fd9a4de9a127c3c"
dependencies = [
"ipnetwork",
"log",
"memchr",
"serde",
]
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest 0.10.7",
]
[[package]]
name = "memchr"
version = "2.7.6"
@@ -5752,11 +5884,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034"
dependencies = [
"base64 0.22.1",
"http-body-util",
"hyper",
"hyper-util",
"indexmap 2.12.1",
"ipnet",
"metrics",
"metrics-util",
"quanta",
"thiserror 1.0.69",
"tokio",
"tracing",
]
[[package]]
@@ -6060,6 +6198,22 @@ dependencies = [
"serde",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7"
dependencies = [
"lazy_static",
"libm",
"num-integer",
"num-iter",
"num-traits",
"rand 0.8.5",
"smallvec",
"zeroize",
]
[[package]]
name = "num-complex"
version = "0.4.6"
@@ -6575,6 +6729,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
@@ -6676,6 +6839,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
dependencies = [
"der",
"pkcs8",
"spki",
]
[[package]]
name = "pkcs8"
version = "0.10.2"
@@ -10518,6 +10692,32 @@ dependencies = [
"strum 0.27.2",
]
[[package]]
name = "reth-scraper"
version = "1.9.3"
dependencies = [
"alloy-primitives",
"clap",
"eyre",
"futures",
"humantime",
"maxminddb",
"metrics",
"metrics-exporter-prometheus",
"reth-chainspec",
"reth-discv4",
"reth-ecies",
"reth-ethereum",
"reth-network-peers",
"reth-tracing",
"secp256k1 0.30.0",
"serde",
"serde_json",
"sqlx",
"tokio",
"tracing",
]
[[package]]
name = "reth-stages"
version = "1.9.3"
@@ -11391,6 +11591,26 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746"
[[package]]
name = "rsa"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88"
dependencies = [
"const-oid",
"digest 0.10.7",
"num-bigint-dig",
"num-integer",
"num-traits",
"pkcs1",
"pkcs8",
"rand_core 0.6.4",
"signature",
"spki",
"subtle",
"zeroize",
]
[[package]]
name = "rstest"
version = "0.24.0"
@@ -12230,6 +12450,15 @@ dependencies = [
"sha1",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spin"
version = "0.10.0"
@@ -12246,6 +12475,194 @@ dependencies = [
"der",
]
[[package]]
name = "sqlx"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
dependencies = [
"sqlx-core",
"sqlx-macros",
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
]
[[package]]
name = "sqlx-core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
dependencies = [
"base64 0.22.1",
"bytes",
"crc",
"crossbeam-queue",
"either",
"event-listener 5.4.1",
"futures-core",
"futures-intrusive",
"futures-io",
"futures-util",
"hashbrown 0.15.5",
"hashlink 0.10.0",
"indexmap 2.12.1",
"log",
"memchr",
"once_cell",
"percent-encoding",
"serde",
"serde_json",
"sha2",
"smallvec",
"thiserror 2.0.17",
"tokio",
"tokio-stream",
"tracing",
"url",
]
[[package]]
name = "sqlx-macros"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
dependencies = [
"proc-macro2",
"quote",
"sqlx-core",
"sqlx-macros-core",
"syn 2.0.111",
]
[[package]]
name = "sqlx-macros-core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
"heck",
"hex",
"once_cell",
"proc-macro2",
"quote",
"serde",
"serde_json",
"sha2",
"sqlx-core",
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"syn 2.0.111",
"tokio",
"url",
]
[[package]]
name = "sqlx-mysql"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.10.0",
"byteorder",
"bytes",
"crc",
"digest 0.10.7",
"dotenvy",
"either",
"futures-channel",
"futures-core",
"futures-io",
"futures-util",
"generic-array",
"hex",
"hkdf",
"hmac",
"itoa",
"log",
"md-5",
"memchr",
"once_cell",
"percent-encoding",
"rand 0.8.5",
"rsa",
"serde",
"sha1",
"sha2",
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"tracing",
"whoami",
]
[[package]]
name = "sqlx-postgres"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.10.0",
"byteorder",
"crc",
"dotenvy",
"etcetera",
"futures-channel",
"futures-core",
"futures-util",
"hex",
"hkdf",
"hmac",
"home",
"itoa",
"log",
"md-5",
"memchr",
"once_cell",
"rand 0.8.5",
"serde",
"serde_json",
"sha2",
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"tracing",
"whoami",
]
[[package]]
name = "sqlx-sqlite"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"flume",
"futures-channel",
"futures-core",
"futures-executor",
"futures-intrusive",
"futures-util",
"libsqlite3-sys",
"log",
"percent-encoding",
"serde",
"serde_urlencoded",
"sqlx-core",
"thiserror 2.0.17",
"tracing",
"url",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
@@ -12258,6 +12675,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
"unicode-properties",
]
[[package]]
name = "strsim"
version = "0.11.1"
@@ -13229,12 +13657,33 @@ version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-bidi"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
[[package]]
name = "unicode-ident"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "unicode-normalization"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-properties"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d"
[[package]]
name = "unicode-segmentation"
version = "1.12.0"
@@ -13476,6 +13925,12 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.106"
@@ -13617,6 +14072,16 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "whoami"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d"
dependencies = [
"libredox",
"wasite",
]
[[package]]
name = "widestring"
version = "1.2.1"

View File

@@ -11,6 +11,7 @@ exclude = [".github/"]
members = [
"bin/reth-bench/",
"bin/reth-bench-compare/",
"bin/reth-scraper/",
"bin/reth/",
"crates/storage/rpc-provider/",
"crates/chain-state/",

View File

@@ -0,0 +1,56 @@
[package]
name = "reth-scraper"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "P2P network scraper that discovers Ethereum nodes and collects their metadata"
default-run = "reth-scraper"
[lints]
workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-discv4.workspace = true
reth-ethereum = { workspace = true, features = ["network"] }
reth-ecies.workspace = true
reth-network-peers.workspace = true
reth-tracing.workspace = true
# alloy
alloy-primitives.workspace = true
# crypto
secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] }
# async
futures.workspace = true
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
# cli
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
humantime.workspace = true
tracing.workspace = true
# metrics
metrics.workspace = true
metrics-exporter-prometheus = { workspace = true, features = ["http-listener"] }
# serde
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
# storage
sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio"] }
# geoip
maxminddb = "0.24"
[[bin]]
name = "reth-scraper"
path = "src/main.rs"

View File

@@ -0,0 +1,224 @@
use crate::{geoip::GeoIpResolver, metrics, types::NodeInfo};
use futures::StreamExt;
use reth_chainspec::{ChainSpec, EthChainSpec};
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS};
use reth_ecies::stream::ECIESStream;
use reth_ethereum::{
chainspec::{Chain, EthereumHardfork, Head},
network::{
config::rng_secret_key,
eth_wire::{HelloMessage, P2PStream, UnauthedEthStream, UnauthedP2PStream, UnifiedStatus},
EthNetworkPrimitives,
},
};
use reth_network_peers::{pk2id, NodeRecord};
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::HashSet,
net::IpAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{net::TcpStream, sync::Semaphore};
/// A P2P stream that has completed both the ECIES and `p2p` handshakes.
type AuthedP2PStream = P2PStream<ECIESStream<TcpStream>>;

/// Configuration for a [`Crawler`].
pub struct CrawlerConfig {
    /// Chain spec used for bootnode defaults and the ETH status handshake.
    pub chainspec: Arc<ChainSpec>,
    /// Explicit bootnodes; when empty, the chain spec's defaults are used.
    pub bootnodes: Vec<NodeRecord>,
    /// Maximum number of concurrent handshake tasks.
    pub workers: usize,
}

/// Discovers peers over discv4 and performs RLPx/ETH handshakes against them.
pub struct Crawler {
    config: CrawlerConfig,
    // Shared with spawned handshake tasks for country lookups.
    geoip: Arc<GeoIpResolver>,
    // Node identity used for discovery and ECIES handshakes.
    key: SecretKey,
}
impl Crawler {
    /// Creates a new crawler with a freshly generated node identity.
    pub fn new(config: CrawlerConfig, geoip: GeoIpResolver) -> Self {
        Self { config, geoip: Arc::new(geoip), key: rng_secret_key() }
    }

    /// Returns the secret key this crawler identifies itself with on the network.
    pub const fn secret_key(&self) -> SecretKey {
        self.key
    }

    /// Runs discv4 discovery until the update stream ends or `stop_rx` flips
    /// to `true`.
    ///
    /// Each newly discovered peer (bootnodes and already-seen ids excluded)
    /// is handshaked on a bounded pool of spawned tasks; successful results
    /// are forwarded over `tx`.
    pub async fn run(
        &self,
        tx: tokio::sync::mpsc::Sender<NodeInfo>,
        stop_rx: tokio::sync::watch::Receiver<bool>,
    ) -> eyre::Result<()> {
        let our_enr = NodeRecord::from_secret_key(DEFAULT_DISCOVERY_ADDRESS, &self.key);

        // Fall back to the chain spec's bootnodes when none were provided.
        let bootnodes = if self.config.bootnodes.is_empty() {
            self.config.chainspec.bootnodes().unwrap_or_default()
        } else {
            self.config.bootnodes.clone()
        };

        let mut discv4_cfg = Discv4ConfigBuilder::default();
        discv4_cfg.add_boot_nodes(bootnodes.clone()).lookup_interval(Duration::from_secs(1));
        let discv4 =
            Discv4::spawn(our_enr.udp_addr(), our_enr, self.key, discv4_cfg.build()).await?;
        let mut discv4_stream = discv4.update_stream().await?;

        // Bounds the number of concurrent handshake tasks.
        let semaphore = Arc::new(Semaphore::new(self.config.workers));
        let active_workers = Arc::new(AtomicU64::new(0));
        // Only this loop ever touches the seen-set (spawned tasks don't), so
        // a plain local set suffices — no Arc<Mutex<_>> needed.
        let mut seen_nodes: HashSet<alloy_primitives::B512> = HashSet::new();
        let bootnodes_set: HashSet<_> = bootnodes.into_iter().collect();
        let mut stop_rx = stop_rx;

        loop {
            tokio::select! {
                update = discv4_stream.next() => {
                    let Some(update) = update else { break };
                    let DiscoveryUpdate::Added(peer) = update else { continue };
                    if bootnodes_set.contains(&peer) {
                        continue;
                    }
                    // `insert` returns false when the id was already present.
                    if !seen_nodes.insert(peer.id) {
                        continue;
                    }
                    metrics::inc_discovered();
                    // Back-pressure: wait for a free worker slot before spawning.
                    let permit = semaphore.clone().acquire_owned().await?;
                    let tx = tx.clone();
                    let key = self.key;
                    let geoip = self.geoip.clone();
                    let chainspec = self.config.chainspec.clone();
                    let active = active_workers.clone();
                    active.fetch_add(1, Ordering::Relaxed);
                    metrics::set_active_workers(active.load(Ordering::Relaxed));
                    tokio::spawn(async move {
                        match handshake_peer(peer, key, &chainspec, geoip.as_ref()).await {
                            Ok(info) => {
                                metrics::inc_handshake_success();
                                tracing::info!(
                                    ip = %info.ip,
                                    client = %info.client_version,
                                    country = info.country_code.as_deref().unwrap_or("??"),
                                    "Discovered node"
                                );
                                let _ = tx.send(info).await;
                            }
                            Err(e) => {
                                metrics::inc_handshake_failed();
                                tracing::debug!("Failed handshake with {}: {}", peer.address, e);
                            }
                        }
                        active.fetch_sub(1, Ordering::Relaxed);
                        metrics::set_active_workers(active.load(Ordering::Relaxed));
                        drop(permit);
                    });
                }
                changed = stop_rx.changed() => {
                    // Stop when the flag flips to `true`, or when the sender is
                    // gone (no stop can ever arrive; also avoids busy-looping
                    // on a closed channel).
                    if changed.is_err() || *stop_rx.borrow() {
                        break;
                    }
                }
            }
        }
        Ok(())
    }
}
async fn handshake_peer(
peer: NodeRecord,
key: SecretKey,
chainspec: &ChainSpec,
geoip: &GeoIpResolver,
) -> eyre::Result<NodeInfo> {
let timeout = Duration::from_secs(10);
let (p2p_stream, hello) = tokio::time::timeout(timeout, handshake_p2p(peer, key)).await??;
let eth_status = tokio::time::timeout(timeout, handshake_eth(p2p_stream, chainspec))
.await
.ok()
.and_then(|r| r.ok());
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let ip: IpAddr = peer.address;
let country_code = geoip.lookup(ip);
let capabilities: Vec<String> =
hello.capabilities.iter().map(|c| format!("{}/{}", c.name, c.version)).collect();
let eth_version = eth_status.as_ref().map(|s| s.version as u8);
let chain_id = eth_status.as_ref().map(|s| s.chain.id());
Ok(NodeInfo {
node_id: peer.id,
enode: format!("enode://{}@{}:{}", peer.id, peer.address, peer.tcp_port),
ip,
tcp_port: peer.tcp_port,
udp_port: peer.udp_port,
client_version: hello.client_version,
capabilities,
eth_version,
chain_id,
country_code,
first_seen: now,
last_seen: now,
last_error: None,
last_checked: Some(now),
is_alive: true,
consecutive_failures: 0,
})
}
/// Dials `peer` over TCP and completes the ECIES + `p2p` handshakes,
/// returning the authenticated stream together with the peer's hello message.
async fn handshake_p2p(
    peer: NodeRecord,
    key: SecretKey,
) -> eyre::Result<(AuthedP2PStream, HelloMessage)> {
    let tcp = TcpStream::connect((peer.address, peer.tcp_port)).await?;
    let ecies = ECIESStream::connect(tcp, key, peer.id).await?;
    let hello = HelloMessage::builder(pk2id(&key.public_key(SECP256K1))).build();
    let authed = UnauthedP2PStream::new(ecies).handshake(hello).await?;
    Ok(authed)
}
/// Exchanges the ETH status message over an already-authenticated p2p stream
/// and returns the remote peer's status.
async fn handshake_eth(
    p2p_stream: AuthedP2PStream,
    chainspec: &ChainSpec,
) -> eyre::Result<UnifiedStatus> {
    // Anchor the fork filter at the Shanghai activation timestamp.
    let head = Head {
        timestamp: chainspec.fork(EthereumHardfork::Shanghai).as_timestamp().unwrap_or(0),
        ..Default::default()
    };
    let fork_filter = chainspec.fork_filter(head);

    // Negotiate the highest shared ETH protocol version.
    let eth_version = p2p_stream
        .shared_capabilities()
        .eth_version()
        .map_err(|_| eyre::eyre!("No ETH capability"))?;

    let status = UnifiedStatus {
        version: eth_version,
        ..UnifiedStatus::builder()
            .chain(Chain::from_id(chainspec.chain_id()))
            .genesis(chainspec.genesis_hash())
            .forkid(chainspec.hardfork_fork_id(EthereumHardfork::Shanghai).unwrap())
            .build()
    };

    let (_, their_status) = UnauthedEthStream::new(p2p_stream)
        .handshake::<EthNetworkPrimitives>(status, fork_filter)
        .await?;
    Ok(their_status)
}

View File

@@ -0,0 +1,40 @@
use maxminddb::Reader;
use std::{net::IpAddr, sync::Arc};
/// Resolves IP addresses to ISO country codes via a MaxMind GeoLite2 database.
///
/// Cheap to clone: the underlying reader is shared behind an [`Arc`].
#[derive(Clone)]
pub struct GeoIpResolver {
    // `None` when no database could be located; lookups then return `None`.
    reader: Option<Arc<Reader<Vec<u8>>>>,
}
impl GeoIpResolver {
    /// Opens a GeoLite2 country database.
    ///
    /// When `db_path` is given it must be readable — failing to open an
    /// explicitly requested database is an error rather than a silent
    /// fallback to disabled lookups. With `None`, a few conventional
    /// locations are probed and lookups are disabled (returning `None`)
    /// if none exists.
    pub fn new(db_path: Option<&str>) -> eyre::Result<Self> {
        if let Some(path) = db_path {
            // An explicit path that cannot be opened is a configuration
            // error; do not silently run without GeoIP in that case.
            let reader = Reader::open_readfile(path)
                .map_err(|e| eyre::eyre!("Failed to open GeoIP database at {}: {}", path, e))?;
            tracing::info!("Loaded GeoIP database from {}", path);
            return Ok(Self { reader: Some(Arc::new(reader)) });
        }
        // Probe conventional install locations.
        let candidates = [
            "./GeoLite2-Country.mmdb".to_string(),
            "/usr/share/GeoIP/GeoLite2-Country.mmdb".to_string(),
            format!(
                "{}/.local/share/GeoIP/GeoLite2-Country.mmdb",
                std::env::var("HOME").unwrap_or_default()
            ),
        ];
        for path in candidates {
            if let Ok(reader) = Reader::open_readfile(&path) {
                tracing::info!("Loaded GeoIP database from {}", path);
                return Ok(Self { reader: Some(Arc::new(reader)) });
            }
        }
        tracing::warn!("No GeoIP database found, country lookups will be disabled");
        Ok(Self { reader: None })
    }

    /// Returns the ISO 3166-1 alpha-2 country code for `ip`, if the database
    /// is loaded and contains a record for the address.
    pub fn lookup(&self, ip: IpAddr) -> Option<String> {
        let reader = self.reader.as_ref()?;
        let result: maxminddb::geoip2::Country<'_> = reader.lookup(ip).ok()?;
        result.country?.iso_code.map(|s| s.to_string())
    }
}

View File

@@ -0,0 +1,378 @@
//! P2P network scraper that discovers Ethereum nodes and collects their metadata.
#![warn(unused_crate_dependencies)]
#![allow(unreachable_pub)]
mod crawler;
mod geoip;
mod metrics;
mod rechecker;
mod storage;
mod types;
use clap::{Parser, Subcommand};
use crawler::{Crawler, CrawlerConfig};
use geoip::GeoIpResolver;
use metrics_exporter_prometheus::PrometheusBuilder;
use rechecker::{RecheckConfig, Rechecker};
use reth_chainspec::ChainSpec;
use reth_ethereum::chainspec::{HOLESKY, MAINNET, SEPOLIA};
use reth_network_peers::NodeRecord;
use reth_tracing::{tracing::info, RethTracer, Tracer};
use std::{sync::Arc, time::Duration};
use storage::Storage;
// Top-level CLI definition. Field-level `//` comments are intentional so the
// generated --help output stays unchanged.
#[derive(Parser)]
#[command(name = "reth-scraper")]
#[command(about = "P2P network scraper that discovers Ethereum nodes and collects their metadata")]
struct Cli {
    // Path of the SQLite database file used for node storage.
    #[arg(long, default_value = "nodes.db")]
    db: String,
    /// Chain to crawl: mainnet, sepolia, holesky
    #[arg(long, default_value = "mainnet")]
    chain: String,
    /// Custom bootnodes (comma-separated enode URLs)
    #[arg(long, value_delimiter = ',')]
    bootnodes: Option<Vec<String>>,
    // Maximum number of concurrent handshake worker tasks.
    #[arg(long, default_value = "50")]
    workers: usize,
    // TCP port for the Prometheus metrics HTTP listener.
    #[arg(long, default_value = "9001")]
    metrics_port: u16,
    #[command(subcommand)]
    command: Commands,
}
// Scraper subcommands. New `//` comments (not `///`) keep --help unchanged.
#[derive(Subcommand)]
enum Commands {
    /// One-shot crawl of the network
    Crawl,
    /// Continuous crawling mode with discovery and rechecker
    Run {
        /// How long to run discovery before starting next cycle
        #[arg(long, default_value = "5m")]
        interval: humantime::Duration,
        /// How often to recheck existing nodes for liveness
        #[arg(long, default_value = "30s")]
        recheck_interval: humantime::Duration,
        /// Consider nodes stale if not checked within this duration
        #[arg(long, default_value = "1h")]
        recheck_max_age: humantime::Duration,
        /// Number of nodes to recheck per batch
        #[arg(long, default_value = "100")]
        recheck_batch_size: u32,
        /// Mark node as dead after this many consecutive failures
        #[arg(long, default_value = "3")]
        max_failures: u32,
    },
    /// Show statistics from the database
    Stats {
        // Break counts down by client version string.
        #[arg(long)]
        by_version: bool,
        // Break counts down by resolved country code.
        #[arg(long)]
        by_country: bool,
    },
    /// Export nodes to JSON or CSV
    Export {
        // Output format: "json" (pretty-printed) or "csv".
        #[arg(long, default_value = "json")]
        format: String,
    },
}
/// Maps a chain name or numeric chain id to its bundled [`ChainSpec`].
fn parse_chainspec(chain: &str) -> eyre::Result<Arc<ChainSpec>> {
    // Accept either the human-readable name or the decimal chain id.
    let normalized = chain.to_lowercase();
    match normalized.as_str() {
        "mainnet" | "1" => Ok(MAINNET.clone()),
        "sepolia" | "11155111" => Ok(SEPOLIA.clone()),
        "holesky" | "17000" => Ok(HOLESKY.clone()),
        _ => Err(eyre::eyre!(
            "Unknown chain: {}. Supported: mainnet, sepolia, holesky",
            chain
        )),
    }
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let _ = RethTracer::new().init()?;
let cli = Cli::parse();
let storage = Storage::new(&cli.db).await?;
storage.init_schema().await?;
let chainspec = parse_chainspec(&cli.chain)?;
let bootnodes: Vec<NodeRecord> = cli
.bootnodes
.unwrap_or_default()
.iter()
.filter_map(|s| s.parse().ok())
.collect();
match cli.command {
Commands::Crawl => {
start_metrics_server(cli.metrics_port)?;
run_crawl(&storage, chainspec, bootnodes, cli.workers, None).await?;
}
Commands::Run {
interval,
recheck_interval,
recheck_max_age,
recheck_batch_size,
max_failures,
} => {
start_metrics_server(cli.metrics_port)?;
run_continuous(
&storage,
chainspec,
bootnodes,
cli.workers,
interval.into(),
recheck_interval.into(),
recheck_max_age.into(),
recheck_batch_size,
max_failures,
)
.await?;
}
Commands::Stats { by_version, by_country } => {
let total = storage.count_nodes().await?;
let alive = storage.count_alive_nodes().await?;
println!("Total nodes: {} ({} alive)", total, alive);
if by_version {
println!("\nBy client version (alive only):");
for (version, cnt) in storage.get_stats_by_version().await? {
println!(" {}: {}", version, cnt);
}
}
if by_country {
println!("\nBy country (alive only):");
for (country, cnt) in storage.get_stats_by_country().await? {
println!(" {}: {}", country, cnt);
}
}
}
Commands::Export { format } => {
let nodes = storage.get_all_nodes().await?;
match format.as_str() {
"json" => {
println!("{}", serde_json::to_string_pretty(&nodes)?);
}
"csv" => {
println!("node_id,enode,ip,tcp_port,udp_port,client_version,capabilities,eth_version,chain_id,country_code,first_seen,last_seen,is_alive");
for node in nodes {
println!(
"{:?},{},{},{},{},{},{},{},{},{},{},{},{}",
node.node_id,
node.enode,
node.ip,
node.tcp_port,
node.udp_port,
node.client_version.replace(',', ";"),
node.capabilities_string(),
node.eth_version.map(|v| v.to_string()).unwrap_or_default(),
node.chain_id.map(|v| v.to_string()).unwrap_or_default(),
node.country_code.unwrap_or_default(),
node.first_seen,
node.last_seen,
node.is_alive
);
}
}
_ => {
return Err(eyre::eyre!("Unknown format: {}. Use 'json' or 'csv'", format));
}
}
}
}
Ok(())
}
/// Registers metric descriptions and installs the Prometheus exporter
/// listening on `0.0.0.0:port`.
fn start_metrics_server(port: u16) -> eyre::Result<()> {
    metrics::describe_metrics();
    PrometheusBuilder::new().with_http_listener(([0, 0, 0, 0], port)).install()?;
    info!("Metrics server started on port {}", port);
    Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn run_continuous(
storage: &Storage,
chainspec: Arc<ChainSpec>,
bootnodes: Vec<NodeRecord>,
workers: usize,
crawl_interval: Duration,
recheck_interval: Duration,
recheck_max_age: Duration,
recheck_batch_size: u32,
max_failures: u32,
) -> eyre::Result<()> {
let geoip = GeoIpResolver::new(None)?;
let crawler_config = CrawlerConfig {
chainspec: chainspec.clone(),
bootnodes,
workers,
};
let crawler = Crawler::new(crawler_config, geoip.clone());
let recheck_config = RecheckConfig {
chainspec,
batch_size: recheck_batch_size,
max_age_secs: recheck_max_age.as_secs(),
check_interval: recheck_interval,
max_failures,
workers: workers / 2, // Use half the workers for rechecking
};
let rechecker = Rechecker::new(recheck_config, storage.clone(), geoip, crawler.secret_key());
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
// Storage handler task
let storage_handle = {
let storage = storage.clone();
tokio::spawn(async move {
let mut count = 0u64;
while let Some(node) = rx.recv().await {
if let Err(e) = storage.upsert_node(&node).await {
tracing::error!("Failed to store node: {}", e);
} else {
count += 1;
if count.is_multiple_of(100) && storage.count_nodes().await.is_ok_and(|total| {
metrics::set_unique_nodes(total as u64);
true
}) {}
}
}
count
})
};
// Crawler task - discovers new nodes
let crawler_stop_rx = stop_rx.clone();
let crawler_handle = tokio::spawn(async move { crawler.run(tx, crawler_stop_rx).await });
// Rechecker task - verifies existing nodes are still alive
let rechecker_stop_rx = stop_rx.clone();
let rechecker_handle = tokio::spawn(async move { rechecker.run(rechecker_stop_rx).await });
// Periodic status logging
let status_storage = storage.clone();
let status_stop_rx = stop_rx.clone();
let status_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(crawl_interval);
loop {
tokio::select! {
_ = interval.tick() => {
if let (Ok(total), Ok(alive)) = (
status_storage.count_nodes().await,
status_storage.count_alive_nodes().await,
) {
info!("Status: {} total nodes, {} alive", total, alive);
metrics::set_unique_nodes(total as u64);
metrics::set_alive_nodes(alive as u64);
}
}
_ = async {
let mut rx = status_stop_rx.clone();
let _ = rx.changed().await;
} => {
if *status_stop_rx.borrow() {
break;
}
}
}
}
});
// Wait for Ctrl+C
tokio::signal::ctrl_c().await?;
info!("Received Ctrl+C, shutting down...");
let _ = stop_tx.send(true);
// Wait for all tasks to finish
let _ = crawler_handle.await;
let _ = rechecker_handle.await;
let _ = status_handle.await;
drop(stop_tx);
let count = storage_handle.await?;
let total = storage.count_nodes().await?;
let alive = storage.count_alive_nodes().await?;
info!(
"Shutdown complete. Discovered {} new nodes, {} total ({} alive)",
count, total, alive
);
Ok(())
}
/// Run a one-shot crawl: discover nodes via the crawler, persist them as they
/// arrive, and report totals on shutdown.
///
/// Stops after `duration` when given, otherwise on Ctrl+C.
///
/// # Errors
/// Returns an error if the GeoIP resolver fails to initialize, the storage
/// task panics, or a final node count cannot be read from storage.
async fn run_crawl(
    storage: &Storage,
    chainspec: Arc<ChainSpec>,
    bootnodes: Vec<NodeRecord>,
    workers: usize,
    duration: Option<Duration>,
) -> eyre::Result<()> {
    let geoip = GeoIpResolver::new(None)?;
    let config = CrawlerConfig { chainspec, bootnodes, workers };
    let crawler = Crawler::new(config, geoip);
    let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
    let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
    // Writer task: persist discovered nodes as they arrive; resolves to the
    // number of nodes successfully stored.
    let storage_handle = {
        let storage = storage.clone();
        tokio::spawn(async move {
            let mut count = 0u64;
            while let Some(node) = rx.recv().await {
                if let Err(e) = storage.upsert_node(&node).await {
                    tracing::error!("Failed to store node: {}", e);
                } else {
                    count += 1;
                    // Every 100 stored nodes, refresh the unique-node gauge and
                    // log progress. (Replaces the previous `is_ok_and` side-effect
                    // hack with an empty `if` body.)
                    if count % 100 == 0 {
                        if let Ok(total) = storage.count_nodes().await {
                            metrics::set_unique_nodes(total as u64);
                            info!("Stored {} nodes ({} unique)", count, total);
                        }
                    }
                }
            }
            count
        })
    };
    let crawler_handle = tokio::spawn(async move { crawler.run(tx, stop_rx).await });
    if let Some(dur) = duration {
        tokio::time::sleep(dur).await;
    } else {
        tokio::signal::ctrl_c().await?;
        info!("Received Ctrl+C, shutting down...");
    }
    let _ = stop_tx.send(true);
    // Once the crawler exits it drops `tx`; `rx.recv()` then returns `None`
    // and the storage task drains and finishes.
    let _ = crawler_handle.await;
    drop(stop_tx);
    let count = storage_handle.await?;
    let total = storage.count_nodes().await?;
    info!("Crawl complete. Stored {} new nodes, {} total unique nodes", count, total);
    Ok(())
}
impl Clone for Storage {
    // NOTE(review): `SqlitePool` is itself `Clone`, so `#[derive(Clone)]` on the
    // `Storage` struct would make this impl unnecessary — confirm at the struct
    // definition before changing it there.
    fn clone(&self) -> Self {
        // Cloning shares the underlying connection pool rather than opening a
        // new one.
        Self { pool: self.pool.clone() }
    }
}

View File

@@ -0,0 +1,44 @@
use metrics::{counter, describe_counter, describe_gauge, gauge};
/// Register human-readable descriptions for every scraper metric with the
/// installed metrics recorder. Call once at startup, before the metrics are
/// first emitted.
pub fn describe_metrics() {
    // Monotonic counters.
    describe_counter!("scraper_nodes_discovered", "Total nodes discovered via discv4");
    describe_counter!("scraper_handshakes_success", "Successful P2P handshakes");
    describe_counter!("scraper_handshakes_failed", "Failed P2P handshakes");
    describe_counter!("scraper_recheck_success", "Successful node rechecks");
    describe_counter!("scraper_recheck_failed", "Failed node rechecks");
    // Point-in-time gauges.
    describe_gauge!("scraper_unique_nodes", "Number of unique nodes in database");
    describe_gauge!("scraper_alive_nodes", "Number of alive nodes in database");
    describe_gauge!("scraper_active_workers", "Number of active handshake workers");
}
/// Record one node discovered via discv4.
pub fn inc_discovered() {
    let discovered = counter!("scraper_nodes_discovered");
    discovered.increment(1);
}
/// Record one successful P2P handshake.
pub fn inc_handshake_success() {
    let ok = counter!("scraper_handshakes_success");
    ok.increment(1);
}
/// Record one failed P2P handshake.
pub fn inc_handshake_failed() {
    let failed = counter!("scraper_handshakes_failed");
    failed.increment(1);
}
/// Publish the current number of unique nodes in the database.
pub fn set_unique_nodes(n: u64) {
    let unique = gauge!("scraper_unique_nodes");
    unique.set(n as f64);
}
/// Publish the current number of active handshake workers.
pub fn set_active_workers(n: u64) {
    let active = gauge!("scraper_active_workers");
    active.set(n as f64);
}
/// Record one successful node recheck.
pub fn inc_recheck_success() {
    let ok = counter!("scraper_recheck_success");
    ok.increment(1);
}
/// Record one failed node recheck.
pub fn inc_recheck_failed() {
    let failed = counter!("scraper_recheck_failed");
    failed.increment(1);
}
/// Publish the current number of alive nodes in the database.
pub fn set_alive_nodes(n: u64) {
    let alive = gauge!("scraper_alive_nodes");
    alive.set(n as f64);
}

View File

@@ -0,0 +1,204 @@
use crate::{geoip::GeoIpResolver, metrics, storage::Storage};
use reth_chainspec::ChainSpec;
use reth_ecies::stream::ECIESStream;
use reth_ethereum::{
chainspec::{Chain, EthereumHardfork, Head},
network::{
eth_wire::{HelloMessage, P2PStream, UnauthedEthStream, UnauthedP2PStream, UnifiedStatus},
EthNetworkPrimitives,
},
};
use reth_network_peers::{pk2id, NodeRecord};
use secp256k1::{SecretKey, SECP256K1};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::{net::TcpStream, sync::Semaphore};
/// An ECIES-encrypted TCP stream that has completed the RLPx p2p handshake.
type AuthedP2PStream = P2PStream<ECIESStream<TcpStream>>;
/// Tuning knobs for the periodic node rechecker.
pub struct RecheckConfig {
    /// Chain spec used for the ETH status handshake.
    pub chainspec: Arc<ChainSpec>,
    /// Maximum number of stale nodes fetched per recheck round.
    pub batch_size: u32,
    /// A node is stale once its `last_checked` is older than this many seconds.
    pub max_age_secs: u64,
    /// Delay between recheck rounds.
    pub check_interval: Duration,
    /// Consecutive failures after which a node is marked dead in storage.
    pub max_failures: u32,
    /// Maximum number of concurrent recheck workers (semaphore permits).
    pub workers: usize,
}
/// Periodically re-dials known nodes to verify they are still reachable,
/// updating their liveness state in storage.
pub struct Rechecker {
    config: RecheckConfig,
    storage: Storage,
    // Shared via `Arc` so each spawned worker can resolve a node's country.
    geoip: Arc<GeoIpResolver>,
    // Local node identity used for outbound ECIES/P2P handshakes.
    key: SecretKey,
}
impl Rechecker {
pub fn new(config: RecheckConfig, storage: Storage, geoip: GeoIpResolver, key: SecretKey) -> Self {
Self { config, storage, geoip: Arc::new(geoip), key }
}
pub async fn run(&self, mut stop_rx: tokio::sync::watch::Receiver<bool>) -> eyre::Result<()> {
let semaphore = Arc::new(Semaphore::new(self.config.workers));
let active_workers = Arc::new(AtomicU64::new(0));
loop {
tokio::select! {
_ = tokio::time::sleep(self.config.check_interval) => {
self.recheck_batch(&semaphore, &active_workers).await?;
}
result = stop_rx.changed() => {
if result.is_ok() && *stop_rx.borrow() {
break;
}
}
}
}
Ok(())
}
async fn recheck_batch(
&self,
semaphore: &Arc<Semaphore>,
active_workers: &Arc<AtomicU64>,
) -> eyre::Result<()> {
let stale_nodes = self
.storage
.get_stale_nodes(self.config.batch_size, self.config.max_age_secs)
.await?;
if stale_nodes.is_empty() {
return Ok(());
}
tracing::info!(count = stale_nodes.len(), "Rechecking stale nodes");
let mut handles = Vec::new();
for stale in stale_nodes {
let permit = semaphore.clone().acquire_owned().await?;
let storage = self.storage.clone();
let geoip = self.geoip.clone();
let chainspec = self.config.chainspec.clone();
let key = self.key;
let max_failures = self.config.max_failures;
let active = active_workers.clone();
let node_record = stale.to_node_record();
active.fetch_add(1, Ordering::Relaxed);
let handle = tokio::spawn(async move {
let result = recheck_peer(node_record, key, &chainspec).await;
match result {
Ok(hello) => {
let country = geoip.lookup(stale.ip);
if let Err(e) = storage
.update_node_alive(&stale.node_id, &hello.client_version, country.as_deref())
.await
{
tracing::error!("Failed to update node: {}", e);
} else {
metrics::inc_recheck_success();
tracing::debug!(
ip = %stale.ip,
client = %hello.client_version,
"Node still alive"
);
}
}
Err(e) => {
let error_msg = e.to_string();
if let Err(e) = storage
.increment_failure(&stale.node_id, &error_msg, max_failures)
.await
{
tracing::error!("Failed to update node failure: {}", e);
} else {
metrics::inc_recheck_failed();
tracing::debug!(ip = %stale.ip, error = %error_msg, "Node unreachable");
}
}
}
active.fetch_sub(1, Ordering::Relaxed);
drop(permit);
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
if let Ok(alive) = self.storage.count_alive_nodes().await {
metrics::set_alive_nodes(alive as u64);
}
Ok(())
}
}
/// Dial `peer` and complete the RLPx p2p handshake, returning the remote
/// Hello message. Each handshake step is bounded by a 10-second timeout.
async fn recheck_peer(
    peer: NodeRecord,
    key: SecretKey,
    chainspec: &ChainSpec,
) -> eyre::Result<HelloMessage> {
    let timeout = Duration::from_secs(10);
    let (p2p_stream, hello) = tokio::time::timeout(timeout, handshake_p2p(peer, key)).await??;
    // Also do ETH handshake to verify they're on the right chain
    // NOTE(review): the ETH handshake result is discarded (`let _ =`), so a
    // wrong-chain or failed status exchange still counts as "alive" — confirm
    // this leniency is intended before tightening it.
    let _ = tokio::time::timeout(timeout, handshake_eth(p2p_stream, chainspec)).await;
    Ok(hello)
}
/// Open a TCP connection to `peer`, wrap it in an ECIES session keyed to the
/// peer's id, and complete the RLPx p2p hello exchange. Returns the
/// authenticated stream together with the peer's Hello message.
async fn handshake_p2p(
    peer: NodeRecord,
    key: SecretKey,
) -> eyre::Result<(AuthedP2PStream, HelloMessage)> {
    let tcp = TcpStream::connect((peer.address, peer.tcp_port)).await?;
    let ecies = ECIESStream::connect(tcp, key, peer.id).await?;
    let local_id = pk2id(&key.public_key(SECP256K1));
    let hello = HelloMessage::builder(local_id).build();
    let authed = UnauthedP2PStream::new(ecies).handshake(hello).await?;
    Ok(authed)
}
/// Perform the ETH status handshake on an already-authenticated p2p stream.
///
/// # Errors
/// Returns an error if the peer shares no ETH capability, the chainspec has no
/// Shanghai fork id, or the status exchange itself fails.
async fn handshake_eth(
    p2p_stream: AuthedP2PStream,
    chainspec: &ChainSpec,
) -> eyre::Result<UnifiedStatus> {
    use reth_chainspec::EthChainSpec;
    // Head timestamp is pinned to the Shanghai activation time (0 if unknown)
    // when building the fork filter.
    let fork_filter = chainspec.fork_filter(Head {
        timestamp: chainspec.fork(EthereumHardfork::Shanghai).as_timestamp().unwrap_or(0),
        ..Default::default()
    });
    // Was `.unwrap()`: a chainspec without a Shanghai fork id would panic the
    // calling worker task; report it as a recoverable error instead.
    let fork_id = chainspec
        .hardfork_fork_id(EthereumHardfork::Shanghai)
        .ok_or_else(|| eyre::eyre!("chainspec has no Shanghai fork id"))?;
    let unified_status = UnifiedStatus::builder()
        .chain(Chain::from_id(chainspec.chain_id()))
        .genesis(chainspec.genesis_hash())
        .forkid(fork_id)
        .build();
    // Use the highest eth/N version shared with the peer.
    let eth_version = p2p_stream
        .shared_capabilities()
        .eth_version()
        .map_err(|_| eyre::eyre!("No ETH capability"))?;
    let status = UnifiedStatus { version: eth_version, ..unified_status };
    let eth_unauthed = UnauthedEthStream::new(p2p_stream);
    let (_, their_status) =
        eth_unauthed.handshake::<EthNetworkPrimitives>(status, fork_filter).await?;
    Ok(their_status)
}

View File

@@ -0,0 +1,315 @@
use crate::types::NodeInfo;
use alloy_primitives::B512;
use reth_network_peers::NodeRecord;
use sqlx::{sqlite::SqlitePoolOptions, SqlitePool};
use std::{
net::IpAddr,
time::{SystemTime, UNIX_EPOCH},
};
/// SQLite-backed persistence for discovered nodes.
pub struct Storage {
    // Shared sqlx connection pool; `Clone` impls for `Storage` reuse it.
    pub(crate) pool: SqlitePool,
}
impl Storage {
pub async fn new(path: &str) -> eyre::Result<Self> {
let url = format!("sqlite:{}?mode=rwc", path);
let pool = SqlitePoolOptions::new().max_connections(5).connect(&url).await?;
Ok(Self { pool })
}
pub async fn init_schema(&self) -> eyre::Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS nodes (
node_id TEXT PRIMARY KEY,
enode TEXT NOT NULL,
ip TEXT NOT NULL,
tcp_port INTEGER NOT NULL,
udp_port INTEGER NOT NULL,
client_version TEXT NOT NULL,
capabilities TEXT NOT NULL,
eth_version INTEGER,
chain_id INTEGER,
country_code TEXT,
first_seen INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
last_error TEXT,
last_checked INTEGER,
is_alive INTEGER NOT NULL DEFAULT 1,
consecutive_failures INTEGER NOT NULL DEFAULT 0
)
"#,
)
.execute(&self.pool)
.await?;
// Add columns if they don't exist (for existing databases)
let _ = sqlx::query("ALTER TABLE nodes ADD COLUMN last_checked INTEGER")
.execute(&self.pool)
.await;
let _ =
sqlx::query("ALTER TABLE nodes ADD COLUMN is_alive INTEGER NOT NULL DEFAULT 1")
.execute(&self.pool)
.await;
let _ = sqlx::query(
"ALTER TABLE nodes ADD COLUMN consecutive_failures INTEGER NOT NULL DEFAULT 0",
)
.execute(&self.pool)
.await;
Ok(())
}
pub async fn upsert_node(&self, node: &NodeInfo) -> eyre::Result<()> {
let node_id = format!("{:?}", node.node_id);
let capabilities = node.capabilities_string();
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
sqlx::query(
r#"
INSERT INTO nodes (node_id, enode, ip, tcp_port, udp_port, client_version, capabilities, eth_version, chain_id, country_code, first_seen, last_seen, last_error, last_checked, is_alive, consecutive_failures)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, 1, 0)
ON CONFLICT(node_id) DO UPDATE SET
enode = excluded.enode,
ip = excluded.ip,
tcp_port = excluded.tcp_port,
udp_port = excluded.udp_port,
client_version = excluded.client_version,
capabilities = excluded.capabilities,
eth_version = excluded.eth_version,
chain_id = excluded.chain_id,
country_code = excluded.country_code,
last_seen = excluded.last_seen,
last_error = excluded.last_error,
last_checked = excluded.last_checked,
is_alive = 1,
consecutive_failures = 0
"#,
)
.bind(&node_id)
.bind(&node.enode)
.bind(node.ip.to_string())
.bind(node.tcp_port as i32)
.bind(node.udp_port as i32)
.bind(&node.client_version)
.bind(&capabilities)
.bind(node.eth_version.map(|v| v as i32))
.bind(node.chain_id.map(|v| v as i64))
.bind(&node.country_code)
.bind(now)
.bind(now)
.bind(&node.last_error)
.bind(now)
.execute(&self.pool)
.await?;
Ok(())
}
/// Get nodes that need to be rechecked (oldest `last_checked` first)
pub async fn get_stale_nodes(
&self,
limit: u32,
max_age_secs: u64,
) -> eyre::Result<Vec<StaleNode>> {
let cutoff =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64 - max_age_secs as i64;
let rows: Vec<(String, String, i32, i32)> = sqlx::query_as(
r#"
SELECT node_id, ip, tcp_port, udp_port
FROM nodes
WHERE is_alive = 1 AND (last_checked IS NULL OR last_checked < ?1)
ORDER BY last_checked ASC NULLS FIRST
LIMIT ?2
"#,
)
.bind(cutoff)
.bind(limit as i32)
.fetch_all(&self.pool)
.await?;
rows.into_iter()
.filter_map(|(node_id, ip, tcp_port, udp_port)| {
Some(StaleNode {
node_id: node_id.parse().ok()?,
ip: ip.parse().ok()?,
tcp_port: tcp_port as u16,
udp_port: udp_port as u16,
})
})
.collect::<Vec<_>>()
.pipe(Ok)
}
/// Update a node after successful recheck
pub async fn update_node_alive(
&self,
node_id: &B512,
client_version: &str,
country_code: Option<&str>,
) -> eyre::Result<()> {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
let node_id_str = format!("{:?}", node_id);
sqlx::query(
r#"
UPDATE nodes SET
client_version = ?1,
country_code = ?2,
last_seen = ?3,
last_checked = ?3,
is_alive = 1,
consecutive_failures = 0,
last_error = NULL
WHERE node_id = ?4
"#,
)
.bind(client_version)
.bind(country_code)
.bind(now)
.bind(&node_id_str)
.execute(&self.pool)
.await?;
Ok(())
}
/// Increment failure count for a node, mark dead after threshold
pub async fn increment_failure(
&self,
node_id: &B512,
error: &str,
max_failures: u32,
) -> eyre::Result<()> {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
let node_id_str = format!("{:?}", node_id);
sqlx::query(
r#"
UPDATE nodes SET
last_checked = ?1,
last_error = ?2,
consecutive_failures = consecutive_failures + 1,
is_alive = CASE WHEN consecutive_failures + 1 >= ?3 THEN 0 ELSE 1 END
WHERE node_id = ?4
"#,
)
.bind(now)
.bind(error)
.bind(max_failures as i32)
.bind(&node_id_str)
.execute(&self.pool)
.await?;
Ok(())
}
#[allow(clippy::type_complexity)]
pub async fn get_all_nodes(&self) -> eyre::Result<Vec<NodeInfo>> {
type NodeRow = (
String,
String,
String,
i32,
i32,
String,
String,
Option<i32>,
Option<i64>,
Option<String>,
i64,
i64,
Option<String>,
Option<i64>,
i32,
i32,
);
let rows: Vec<NodeRow> = sqlx::query_as(
"SELECT node_id, enode, ip, tcp_port, udp_port, client_version, capabilities, eth_version, chain_id, country_code, first_seen, last_seen, last_error, last_checked, is_alive, consecutive_failures FROM nodes",
)
.fetch_all(&self.pool)
.await?;
rows.into_iter()
.map(|row| {
Ok(NodeInfo {
node_id: row.0.parse().unwrap_or_default(),
enode: row.1,
ip: row.2.parse()?,
tcp_port: row.3 as u16,
udp_port: row.4 as u16,
client_version: row.5,
capabilities: row.6.split(',').map(String::from).collect(),
eth_version: row.7.map(|v| v as u8),
chain_id: row.8.map(|v| v as u64),
country_code: row.9,
first_seen: row.10 as u64,
last_seen: row.11 as u64,
last_error: row.12,
last_checked: row.13.map(|v| v as u64),
is_alive: row.14 != 0,
consecutive_failures: row.15 as u32,
})
})
.collect()
}
pub async fn get_stats_by_version(&self) -> eyre::Result<Vec<(String, i64)>> {
let rows: Vec<(String, i64)> = sqlx::query_as(
"SELECT client_version, COUNT(*) as count FROM nodes WHERE is_alive = 1 GROUP BY client_version ORDER BY count DESC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_stats_by_country(&self) -> eyre::Result<Vec<(String, i64)>> {
let rows: Vec<(String, i64)> = sqlx::query_as(
"SELECT COALESCE(country_code, 'Unknown') as country, COUNT(*) as count FROM nodes WHERE is_alive = 1 GROUP BY country_code ORDER BY count DESC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn count_nodes(&self) -> eyre::Result<i64> {
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM nodes")
.fetch_one(&self.pool)
.await?;
Ok(count)
}
pub async fn count_alive_nodes(&self) -> eyre::Result<i64> {
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM nodes WHERE is_alive = 1")
.fetch_one(&self.pool)
.await?;
Ok(count)
}
}
/// A node that needs to be rechecked, as loaded from the `nodes` table.
#[derive(Debug, Clone)]
pub struct StaleNode {
    /// Public key / node id parsed from the stored hex string.
    pub node_id: B512,
    pub ip: IpAddr,
    pub tcp_port: u16,
    pub udp_port: u16,
}
impl StaleNode {
    /// Rebuild a dialable `NodeRecord` from this database row.
    pub const fn to_node_record(&self) -> NodeRecord {
        NodeRecord {
            id: self.node_id,
            address: self.ip,
            tcp_port: self.tcp_port,
            udp_port: self.udp_port,
        }
    }
}
/// Minimal "tap"-style helper: feed any value through a closure, enabling
/// postfix chaining like `expr.pipe(Ok)`.
trait Pipe: Sized {
    /// Apply `func` to `self` and return its result.
    fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R {
        func(self)
    }
}
impl<T> Pipe for T {}

View File

@@ -0,0 +1,28 @@
use alloy_primitives::B512;
use std::net::IpAddr;
/// Everything we know about a discovered node, mirroring one row of the
/// `nodes` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeInfo {
    /// Public key / node id.
    pub node_id: B512,
    /// Full enode URL string.
    pub enode: String,
    pub ip: IpAddr,
    pub tcp_port: u16,
    pub udp_port: u16,
    /// Client identifier from the p2p Hello message.
    pub client_version: String,
    /// Advertised capability names (stored comma-joined in the database).
    pub capabilities: Vec<String>,
    /// Negotiated eth/N protocol version, if any.
    pub eth_version: Option<u8>,
    pub chain_id: Option<u64>,
    /// Country resolved via GeoIP, if available.
    pub country_code: Option<String>,
    // Unix timestamps (seconds).
    pub first_seen: u64,
    pub last_seen: u64,
    /// Message from the most recent failed check, if any.
    pub last_error: Option<String>,
    pub last_checked: Option<u64>,
    pub is_alive: bool,
    /// Checks failed in a row; reset to 0 on success.
    pub consecutive_failures: u32,
}
impl NodeInfo {
    /// Render the capability list as a single comma-separated string (the
    /// inverse of splitting on ',' when loading rows from storage).
    pub fn capabilities_string(&self) -> String {
        let parts: Vec<&str> = self.capabilities.iter().map(String::as_str).collect();
        parts.join(",")
    }
}