diff --git a/Cargo.lock b/Cargo.lock index 0c047823b4..fbe3c9c1a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1393,17 +1393,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "async-compression" version = "0.4.41" @@ -1416,20 +1405,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "async-sse" -version = "5.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e6fa871e4334a622afd6bb2f611635e8083a6f5e2936c0f90f37c7ef9856298" -dependencies = [ - "async-channel", - "futures-lite 1.13.0", - "http-types", - "log", - "memchr", - "pin-project-lite", -] - [[package]] name = "async-stream" version = "0.3.6" @@ -1544,7 +1519,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" dependencies = [ - "fastrand 2.4.1", + "fastrand", "tokio", ] @@ -1585,12 +1560,6 @@ dependencies = [ "match-lookup", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.21.7" @@ -1791,7 +1760,7 @@ dependencies = [ "float16", "futures-channel", "futures-concurrency", - "futures-lite 2.6.1", + "futures-lite", "hashbrown 0.16.1", "icu_normalizer", "indexmap 2.13.1", @@ -2487,15 +2456,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console" version = "0.15.11" @@ -3515,12 +3475,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "example-beacon-api-sidecar-fetcher" version = "0.1.0" @@ -3544,10 +3498,12 @@ name = "example-beacon-api-sse" version = "0.0.0" dependencies = [ "alloy-rpc-types-beacon", + "bytes", "clap", "futures-util", - "mev-share-sse", + "reqwest 0.13.2", "reth-ethereum", + "serde_json", "tokio", "tracing", ] @@ -3886,15 +3842,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.4.1" @@ -4117,7 +4064,7 @@ checksum = "175cd8cca9e1d45b87f18ffa75088f2099e3c4fe5e2f83e42de112560bea8ea6" dependencies = [ "fixedbitset", "futures-core", - "futures-lite 2.6.1", + "futures-lite", "pin-project", "smallvec", ] @@ -4145,28 +4092,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-lite" version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" dependencies = [ - "fastrand 2.4.1", + "fastrand", "futures-core", "futures-io", "parking", @@ -4255,17 +4187,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.17" @@ -4275,7 +4196,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -4649,26 +4570,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" -[[package]] -name = "http-types" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" -dependencies = [ - "anyhow", - "async-channel", - "base64 0.13.1", - "futures-lite 1.13.0", - "infer", - "pin-project-lite", - "rand 0.7.3", - "serde", - "serde_json", - "serde_qs", - "serde_urlencoded", - "url", -] - [[package]] name = "httparse" version = "1.10.1" @@ -5012,12 +4913,6 @@ dependencies = [ "rustversion", ] -[[package]] -name = "infer" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" - [[package]] name = "inotify" version = "0.11.1" @@ -5073,15 +4968,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "interprocess" version = "2.4.0" @@ -5900,26 +5786,6 @@ dependencies = [ "sketches-ddsketch", ] -[[package]] -name = "mev-share-sse" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd9e517b6c1d1143b35b716ec1107a493b2ce1143a35cbb9788e81f69c6f574c" -dependencies = [ - "alloy-rpc-types-mev", - "async-sse", - "bytes", - "futures-util", - "http-types", - "pin-project-lite", - "reqwest 0.12.28", - "serde", - "serde_json", - "thiserror 2.0.18", - "tokio", - "tracing", -] - [[package]] name = "mime" version = "0.3.17" @@ -5960,7 +5826,7 @@ checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "log", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "windows-sys 0.61.2", ] @@ -6571,7 +6437,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737" dependencies = [ - "fastrand 2.4.1", + "fastrand", "phf_shared", ] @@ -6934,7 +6800,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "web-sys", "winapi", ] @@ -7037,19 +6903,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - [[package]] name = "rand" version = "0.8.5" @@ -7073,16 +6926,6 @@ dependencies = [ "serde", ] -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", -] - [[package]] name = "rand_chacha" version = "0.3.1" @@ -7103,15 +6946,6 @@ dependencies = [ "rand_core 0.9.5", ] -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", -] - [[package]] name = "rand_core" version = "0.6.4" @@ -7131,15 +6965,6 @@ dependencies = [ "serde", ] -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] - [[package]] name = "rand_xorshift" version = "0.4.0" @@ -7379,14 +7204,12 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams 0.4.2", "web-sys", ] @@ -7428,7 +7251,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams 0.5.0", + "wasm-streams", "web-sys", ] @@ -11375,17 +11198,6 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_qs" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" -dependencies = [ - "percent-encoding", - "serde", - "thiserror 1.0.69", -] - [[package]] name = "serde_spanned" version = "1.1.1" @@ -11859,7 +11671,7 @@ version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ - "fastrand 2.4.1", + "fastrand", "getrandom 0.4.2", "once_cell", "rustix", @@ -12873,12 +12685,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "walkdir" version = "2.5.0" @@ -12898,12 +12704,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -13005,19 +12805,6 @@ dependencies = [ "wasmparser", ] -[[package]] -name = "wasm-streams" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" -dependencies = [ - "futures-util", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "wasm-streams" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 233c42b7e2..d7c9ccc52d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -671,7 +671,6 @@ indexmap = "2" interprocess = "2.2.0" lz4_flex = { version = "0.12", default-features = false } memmap2 = "0.9.4" -mev-share-sse = { version = "0.5.0", default-features = false } num-traits = "0.2.15" page_size = "0.6.0" plain_hasher = "0.2" diff --git a/examples/beacon-api-sse/Cargo.toml b/examples/beacon-api-sse/Cargo.toml index b0a8984122..1f30eeebc8 100644 --- a/examples/beacon-api-sse/Cargo.toml +++ b/examples/beacon-api-sse/Cargo.toml @@ -10,8 +10,10 @@ reth-ethereum = { workspace = true, features = ["node", "cli"] } alloy-rpc-types-beacon.workspace = true +bytes.workspace = true clap.workspace = true futures-util.workspace = true -mev-share-sse.workspace = true +reqwest.workspace = true +serde_json.workspace = true tokio = { workspace = true, features = ["time"] } tracing.workspace = true diff --git a/examples/beacon-api-sse/src/main.rs b/examples/beacon-api-sse/src/main.rs index f339053703..3a218f7085 100644 --- a/examples/beacon-api-sse/src/main.rs +++ b/examples/beacon-api-sse/src/main.rs @@ -18,9 +18,9 @@ #![warn(unused_crate_dependencies)] use alloy_rpc_types_beacon::events::PayloadAttributesEvent; +use bytes::BytesMut; use clap::Parser; use futures_util::stream::StreamExt; -use mev_share_sse::{client::EventStream, EventClient}; use reth_ethereum::{ cli::{chainspec::EthereumChainSpecParser, interface::Cli}, node::EthereumNode, @@ -64,36 +64,102 @@ impl BeaconEventsConfig { /// Service that subscribes to beacon chain payload attributes events async fn run(self) { - let client = EventClient::default(); - - let mut subscription = self.new_payload_attributes_subscription(&client).await; - - while let Some(event) = subscription.next().await { - info!("Received payload attributes: {:?}", event); - } - } - - // It can take a bit until the CL endpoint is live so we retry a few times - async fn new_payload_attributes_subscription( - &self, - client: &EventClient, - ) -> EventStream { + let client = reqwest::Client::new(); let payloads_url = format!("{}?topics=payload_attributes", self.events_url()); + loop { - match client.subscribe(&payloads_url).await { - Ok(subscription) => return subscription, + let response = match client.get(&payloads_url).send().await { + Ok(resp) => match resp.error_for_status() { + Ok(resp) => resp, + Err(err) => { + warn!(?err, "Beacon SSE endpoint returned error status, retrying in 5s"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } + }, Err(err) => { - warn!( - "Failed to subscribe to payload attributes events: {:?}\nRetrying in 5 seconds...", - err - ); + warn!(?err, "Failed to connect to beacon SSE endpoint, retrying in 5s"); tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } + }; + + let mut stream = response.bytes_stream(); + let mut buf = BytesMut::new(); + + while let Some(chunk) = stream.next().await { + match chunk { + Ok(bytes) => { + buf.extend_from_slice(&bytes); + + while let Some((pos, delim_len)) = find_event_boundary(&buf) { + let event_bytes = buf.split_to(pos); + let _ = buf.split_to(delim_len); + + let event_str = String::from_utf8_lossy(&event_bytes); + if let Some(data) = extract_sse_data(&event_str) { + match serde_json::from_str::(&data) { + Ok(event) => { + info!("Received payload attributes: {:?}", event); + } + Err(err) => { + warn!(?err, "Failed to deserialize payload attributes"); + } + } + } + } + } + Err(err) => { + warn!(?err, "SSE stream error, reconnecting in 5s"); + break; + } } } + + warn!("SSE stream ended, reconnecting in 5s"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } +/// Finds the position and length of the first double-newline SSE event delimiter in the buffer. +fn find_event_boundary(buf: &[u8]) -> Option<(usize, usize)> { + for i in 0..buf.len().saturating_sub(1) { + if buf[i] == b'\n' && buf[i + 1] == b'\n' { + return Some((i, 2)); + } + if i + 3 < buf.len() && + buf[i] == b'\r' && + buf[i + 1] == b'\n' && + buf[i + 2] == b'\r' && + buf[i + 3] == b'\n' + { + return Some((i, 4)); + } + } + None +} + +/// Extracts the `data:` field value from an SSE event string. +/// +/// Multiple `data:` lines are joined with newlines per the SSE spec. +fn extract_sse_data(event: &str) -> Option { + let mut out = String::new(); + for line in event.lines() { + if let Some(data) = line.strip_prefix("data:") { + if !out.is_empty() { + out.push('\n'); + } + out.push_str(data.trim_start()); + } + } + if out.is_empty() { + None + } else { + Some(out) + } +} + #[cfg(test)] mod tests { use super::*; @@ -103,4 +169,34 @@ mod tests { let args = BeaconEventsConfig::try_parse_from(["reth"]); assert!(args.is_ok()); } + + #[test] + fn find_boundary_lf() { + let buf = b"data: hello\n\ndata: world\n\n"; + let (pos, len) = find_event_boundary(buf).unwrap(); + assert_eq!(pos, 11); + assert_eq!(len, 2); + } + + #[test] + fn find_boundary_crlf() { + let buf = b"data: hello\r\n\r\ndata: world\r\n\r\n"; + let (pos, len) = find_event_boundary(buf).unwrap(); + assert_eq!(pos, 11); + assert_eq!(len, 4); + } + + #[test] + fn extract_single_data_line() { + let event = "event: payload_attributes\ndata: {\"key\":\"value\"}"; + let data = extract_sse_data(event).unwrap(); + assert_eq!(data, "{\"key\":\"value\"}"); + } + + #[test] + fn extract_multiple_data_lines() { + let event = "data: line1\ndata: line2"; + let data = extract_sse_data(event).unwrap(); + assert_eq!(data, "line1\nline2"); + } }