refactor(examples): replace mev-share-sse with reqwest bytes_stream in beacon-api-sse (#23458)

Co-authored-by: Matthias Seitz <19890894+mattsse@users.noreply.github.com>
This commit is contained in:
Derek Cofausper
2026-04-11 12:33:34 +02:00
committed by GitHub
parent 0c278f5fab
commit 7035bbcf3a
4 changed files with 133 additions and 249 deletions

239
Cargo.lock generated
View File

@@ -1393,17 +1393,6 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.41"
@@ -1416,20 +1405,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-sse"
version = "5.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e6fa871e4334a622afd6bb2f611635e8083a6f5e2936c0f90f37c7ef9856298"
dependencies = [
"async-channel",
"futures-lite 1.13.0",
"http-types",
"log",
"memchr",
"pin-project-lite",
]
[[package]]
name = "async-stream"
version = "0.3.6"
@@ -1544,7 +1519,7 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef"
dependencies = [
"fastrand 2.4.1",
"fastrand",
"tokio",
]
@@ -1585,12 +1560,6 @@ dependencies = [
"match-lookup",
]
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.7"
@@ -1791,7 +1760,7 @@ dependencies = [
"float16",
"futures-channel",
"futures-concurrency",
"futures-lite 2.6.1",
"futures-lite",
"hashbrown 0.16.1",
"icu_normalizer",
"indexmap 2.13.1",
@@ -2487,15 +2456,6 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.15.11"
@@ -3515,12 +3475,6 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "example-beacon-api-sidecar-fetcher"
version = "0.1.0"
@@ -3544,10 +3498,12 @@ name = "example-beacon-api-sse"
version = "0.0.0"
dependencies = [
"alloy-rpc-types-beacon",
"bytes",
"clap",
"futures-util",
"mev-share-sse",
"reqwest 0.13.2",
"reth-ethereum",
"serde_json",
"tokio",
"tracing",
]
@@ -3886,15 +3842,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55"
[[package]]
name = "fastrand"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
dependencies = [
"instant",
]
[[package]]
name = "fastrand"
version = "2.4.1"
@@ -4117,7 +4064,7 @@ checksum = "175cd8cca9e1d45b87f18ffa75088f2099e3c4fe5e2f83e42de112560bea8ea6"
dependencies = [
"fixedbitset",
"futures-core",
"futures-lite 2.6.1",
"futures-lite",
"pin-project",
"smallvec",
]
@@ -4145,28 +4092,13 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand 1.9.0",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-lite"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
"fastrand 2.4.1",
"fastrand",
"futures-core",
"futures-io",
"parking",
@@ -4255,17 +4187,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "getrandom"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.17"
@@ -4275,7 +4196,7 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi 0.11.1+wasi-snapshot-preview1",
"wasi",
"wasm-bindgen",
]
@@ -4649,26 +4570,6 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c"
[[package]]
name = "http-types"
version = "2.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel",
"base64 0.13.1",
"futures-lite 1.13.0",
"infer",
"pin-project-lite",
"rand 0.7.3",
"serde",
"serde_json",
"serde_qs",
"serde_urlencoded",
"url",
]
[[package]]
name = "httparse"
version = "1.10.1"
@@ -5012,12 +4913,6 @@ dependencies = [
"rustversion",
]
[[package]]
name = "infer"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
[[package]]
name = "inotify"
version = "0.11.1"
@@ -5073,15 +4968,6 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
]
[[package]]
name = "interprocess"
version = "2.4.0"
@@ -5900,26 +5786,6 @@ dependencies = [
"sketches-ddsketch",
]
[[package]]
name = "mev-share-sse"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd9e517b6c1d1143b35b716ec1107a493b2ce1143a35cbb9788e81f69c6f574c"
dependencies = [
"alloy-rpc-types-mev",
"async-sse",
"bytes",
"futures-util",
"http-types",
"pin-project-lite",
"reqwest 0.12.28",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tracing",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -5960,7 +5826,7 @@ checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
dependencies = [
"libc",
"log",
"wasi 0.11.1+wasi-snapshot-preview1",
"wasi",
"windows-sys 0.61.2",
]
@@ -6571,7 +6437,7 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737"
dependencies = [
"fastrand 2.4.1",
"fastrand",
"phf_shared",
]
@@ -6934,7 +6800,7 @@ dependencies = [
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.1+wasi-snapshot-preview1",
"wasi",
"web-sys",
"winapi",
]
@@ -7037,19 +6903,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom 0.1.16",
"libc",
"rand_chacha 0.2.2",
"rand_core 0.5.1",
"rand_hc",
]
[[package]]
name = "rand"
version = "0.8.5"
@@ -7073,16 +6926,6 @@ dependencies = [
"serde",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core 0.5.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@@ -7103,15 +6946,6 @@ dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom 0.1.16",
]
[[package]]
name = "rand_core"
version = "0.6.4"
@@ -7131,15 +6965,6 @@ dependencies = [
"serde",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "rand_xorshift"
version = "0.4.0"
@@ -7379,14 +7204,12 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tokio-util",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams 0.4.2",
"web-sys",
]
@@ -7428,7 +7251,7 @@ dependencies = [
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams 0.5.0",
"wasm-streams",
"web-sys",
]
@@ -11375,17 +11198,6 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_qs"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6"
dependencies = [
"percent-encoding",
"serde",
"thiserror 1.0.69",
]
[[package]]
name = "serde_spanned"
version = "1.1.1"
@@ -11859,7 +11671,7 @@ version = "3.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand 2.4.1",
"fastrand",
"getrandom 0.4.2",
"once_cell",
"rustix",
@@ -12873,12 +12685,6 @@ dependencies = [
"libc",
]
[[package]]
name = "waker-fn"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7"
[[package]]
name = "walkdir"
version = "2.5.0"
@@ -12898,12 +12704,6 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -13005,19 +12805,6 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wasm-streams"
version = "0.5.0"

View File

@@ -671,7 +671,6 @@ indexmap = "2"
interprocess = "2.2.0"
lz4_flex = { version = "0.12", default-features = false }
memmap2 = "0.9.4"
mev-share-sse = { version = "0.5.0", default-features = false }
num-traits = "0.2.15"
page_size = "0.6.0"
plain_hasher = "0.2"

View File

@@ -10,8 +10,10 @@ reth-ethereum = { workspace = true, features = ["node", "cli"] }
alloy-rpc-types-beacon.workspace = true
bytes.workspace = true
clap.workspace = true
futures-util.workspace = true
mev-share-sse.workspace = true
reqwest.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["time"] }
tracing.workspace = true

View File

@@ -18,9 +18,9 @@
#![warn(unused_crate_dependencies)]
use alloy_rpc_types_beacon::events::PayloadAttributesEvent;
use bytes::BytesMut;
use clap::Parser;
use futures_util::stream::StreamExt;
use mev_share_sse::{client::EventStream, EventClient};
use reth_ethereum::{
cli::{chainspec::EthereumChainSpecParser, interface::Cli},
node::EthereumNode,
@@ -64,36 +64,102 @@ impl BeaconEventsConfig {
/// Service that subscribes to beacon chain payload attributes events
async fn run(self) {
let client = EventClient::default();
let mut subscription = self.new_payload_attributes_subscription(&client).await;
while let Some(event) = subscription.next().await {
info!("Received payload attributes: {:?}", event);
}
}
// It can take a bit until the CL endpoint is live so we retry a few times
async fn new_payload_attributes_subscription(
&self,
client: &EventClient,
) -> EventStream<PayloadAttributesEvent> {
let client = reqwest::Client::new();
let payloads_url = format!("{}?topics=payload_attributes", self.events_url());
loop {
match client.subscribe(&payloads_url).await {
Ok(subscription) => return subscription,
let response = match client.get(&payloads_url).send().await {
Ok(resp) => match resp.error_for_status() {
Ok(resp) => resp,
Err(err) => {
warn!(?err, "Beacon SSE endpoint returned error status, retrying in 5s");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
},
Err(err) => {
warn!(
"Failed to subscribe to payload attributes events: {:?}\nRetrying in 5 seconds...",
err
);
warn!(?err, "Failed to connect to beacon SSE endpoint, retrying in 5s");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
continue;
}
};
let mut stream = response.bytes_stream();
let mut buf = BytesMut::new();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
buf.extend_from_slice(&bytes);
while let Some((pos, delim_len)) = find_event_boundary(&buf) {
let event_bytes = buf.split_to(pos);
let _ = buf.split_to(delim_len);
let event_str = String::from_utf8_lossy(&event_bytes);
if let Some(data) = extract_sse_data(&event_str) {
match serde_json::from_str::<PayloadAttributesEvent>(&data) {
Ok(event) => {
info!("Received payload attributes: {:?}", event);
}
Err(err) => {
warn!(?err, "Failed to deserialize payload attributes");
}
}
}
}
}
Err(err) => {
warn!(?err, "SSE stream error, reconnecting in 5s");
break;
}
}
}
warn!("SSE stream ended, reconnecting in 5s");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
/// Finds the position and length of the first double-newline SSE event delimiter in the buffer.
fn find_event_boundary(buf: &[u8]) -> Option<(usize, usize)> {
for i in 0..buf.len().saturating_sub(1) {
if buf[i] == b'\n' && buf[i + 1] == b'\n' {
return Some((i, 2));
}
if i + 3 < buf.len() &&
buf[i] == b'\r' &&
buf[i + 1] == b'\n' &&
buf[i + 2] == b'\r' &&
buf[i + 3] == b'\n'
{
return Some((i, 4));
}
}
None
}
/// Extracts the `data:` field value from an SSE event string.
///
/// Multiple `data:` lines are joined with newlines per the SSE spec.
fn extract_sse_data(event: &str) -> Option<String> {
let mut out = String::new();
for line in event.lines() {
if let Some(data) = line.strip_prefix("data:") {
if !out.is_empty() {
out.push('\n');
}
out.push_str(data.trim_start());
}
}
if out.is_empty() {
None
} else {
Some(out)
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -103,4 +169,34 @@ mod tests {
let args = BeaconEventsConfig::try_parse_from(["reth"]);
assert!(args.is_ok());
}
#[test]
fn find_boundary_lf() {
let buf = b"data: hello\n\ndata: world\n\n";
let (pos, len) = find_event_boundary(buf).unwrap();
assert_eq!(pos, 11);
assert_eq!(len, 2);
}
#[test]
fn find_boundary_crlf() {
let buf = b"data: hello\r\n\r\ndata: world\r\n\r\n";
let (pos, len) = find_event_boundary(buf).unwrap();
assert_eq!(pos, 11);
assert_eq!(len, 4);
}
#[test]
fn extract_single_data_line() {
let event = "event: payload_attributes\ndata: {\"key\":\"value\"}";
let data = extract_sse_data(event).unwrap();
assert_eq!(data, "{\"key\":\"value\"}");
}
#[test]
fn extract_multiple_data_lines() {
let event = "data: line1\ndata: line2";
let data = extract_sse_data(event).unwrap();
assert_eq!(data, "line1\nline2");
}
}