From f59a82e4c627cd0fa93b40321f29b7f98799ae92 Mon Sep 17 00:00:00 2001 From: Shane K Moore <41407272+shane-moore@users.noreply.github.com> Date: Thu, 19 Jun 2025 11:46:34 -0700 Subject: [PATCH] chore: add node synced helper (#16928) --- crates/rpc/rpc/src/eth/helpers/mod.rs | 3 + .../rpc/rpc/src/eth/helpers/sync_listener.rs | 133 ++++++++++++++++++ crates/rpc/rpc/src/eth/mod.rs | 2 +- crates/rpc/rpc/src/lib.rs | 2 +- 4 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 crates/rpc/rpc/src/eth/helpers/sync_listener.rs diff --git a/crates/rpc/rpc/src/eth/helpers/mod.rs b/crates/rpc/rpc/src/eth/helpers/mod.rs index 03e0443a15..15fcf612d9 100644 --- a/crates/rpc/rpc/src/eth/helpers/mod.rs +++ b/crates/rpc/rpc/src/eth/helpers/mod.rs @@ -2,6 +2,7 @@ //! files. pub mod signer; +pub mod sync_listener; pub mod types; mod block; @@ -13,3 +14,5 @@ mod spec; mod state; mod trace; mod transaction; + +pub use sync_listener::SyncListener; diff --git a/crates/rpc/rpc/src/eth/helpers/sync_listener.rs b/crates/rpc/rpc/src/eth/helpers/sync_listener.rs new file mode 100644 index 0000000000..13c8de19b0 --- /dev/null +++ b/crates/rpc/rpc/src/eth/helpers/sync_listener.rs @@ -0,0 +1,133 @@ +//! A utility Future to asynchronously wait until a node has finished syncing. + +use futures::Stream; +use pin_project::pin_project; +use reth_network_api::NetworkInfo; +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; + +/// This future resolves once the node is no longer syncing: [`NetworkInfo::is_syncing`]. +#[must_use = "futures do nothing unless polled"] +#[pin_project] +#[derive(Debug)] +pub struct SyncListener { + #[pin] + tick: St, + network_info: N, +} + +impl SyncListener { + /// Create a new [`SyncListener`] using the given tick stream. + pub const fn new(network_info: N, tick: St) -> Self { + Self { tick, network_info } + } +} + +impl Future for SyncListener +where + N: NetworkInfo, + St: Stream + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if !this.network_info.is_syncing() { + return Poll::Ready(()); + } + + loop { + let tick_event = ready!(this.tick.as_mut().poll_next(cx)); + + match tick_event { + Some(_) => { + if !this.network_info.is_syncing() { + return Poll::Ready(()); + } + } + None => return Poll::Ready(()), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_rpc_types_admin::EthProtocolInfo; + use futures::stream; + use reth_network_api::{NetworkError, NetworkStatus}; + use std::{ + net::{IpAddr, SocketAddr}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }; + + #[derive(Clone)] + struct TestNetwork { + syncing: Arc, + } + + impl NetworkInfo for TestNetwork { + fn local_addr(&self) -> SocketAddr { + (IpAddr::from([0, 0, 0, 0]), 0).into() + } + + async fn network_status(&self) -> Result { + #[allow(deprecated)] + Ok(NetworkStatus { + client_version: "test".to_string(), + protocol_version: 5, + eth_protocol_info: EthProtocolInfo { + network: 1, + difficulty: None, + genesis: Default::default(), + config: Default::default(), + head: Default::default(), + }, + }) + } + + fn chain_id(&self) -> u64 { + 1 + } + + fn is_syncing(&self) -> bool { + self.syncing.load(Ordering::SeqCst) + } + + fn is_initially_syncing(&self) -> bool { + self.is_syncing() + } + } + + #[tokio::test] + async fn completes_immediately_if_not_syncing() { + let network = TestNetwork { syncing: Arc::new(AtomicBool::new(false)) }; + let fut = SyncListener::new(network, stream::pending::<()>()); + fut.await; + } + + #[tokio::test] + async fn resolves_when_syncing_stops() { + use tokio::sync::mpsc::unbounded_channel; + use tokio_stream::wrappers::UnboundedReceiverStream; + + let syncing = Arc::new(AtomicBool::new(true)); + let network = TestNetwork { syncing: syncing.clone() }; + let (tx, rx) = unbounded_channel(); + let listener = SyncListener::new(network, UnboundedReceiverStream::new(rx)); + let handle = tokio::spawn(listener); + + syncing.store(false, Ordering::Relaxed); + let _ = tx.send(()); + + handle.await.unwrap(); + } +} diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index b4dca3b9f2..af8619de86 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -15,6 +15,6 @@ pub use core::{EthApi, EthApiFor}; pub use filter::EthFilter; pub use pubsub::EthPubSub; -pub use helpers::signer::DevSigner; +pub use helpers::{signer::DevSigner, sync_listener::SyncListener}; pub use reth_rpc_eth_api::{EthApiServer, EthApiTypes, FullEthApiServer, RpcNodeCore}; diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index bac57b6303..690fb33e87 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -49,7 +49,7 @@ mod web3; pub use admin::AdminApi; pub use debug::DebugApi; pub use engine::{EngineApi, EngineEthApi}; -pub use eth::{EthApi, EthApiBuilder, EthBundle, EthFilter, EthPubSub}; +pub use eth::{helpers::SyncListener, EthApi, EthApiBuilder, EthBundle, EthFilter, EthPubSub}; pub use miner::MinerApi; pub use net::NetApi; pub use otterscan::OtterscanApi;