From cdf5d8e9ee3d9004716211f3b0a4caff8a173250 Mon Sep 17 00:00:00 2001 From: Jennifer Date: Sat, 10 Aug 2024 15:02:49 +0100 Subject: [PATCH] Add generic BlockStateNotificationStream type for pending, safe, finalized watchers (#10249) --- crates/chain-state/src/lib.rs | 6 ++-- crates/chain-state/src/notifications.rs | 42 +++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/crates/chain-state/src/lib.rs b/crates/chain-state/src/lib.rs index 50a1031110..48d996881c 100644 --- a/crates/chain-state/src/lib.rs +++ b/crates/chain-state/src/lib.rs @@ -16,9 +16,9 @@ pub use chain_info::ChainInfoTracker; mod notifications; pub use notifications::{ - CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, - CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream, - ForkChoiceSubscriptions, + BlockStateNotificationStream, CanonStateNotification, CanonStateNotificationSender, + CanonStateNotificationStream, CanonStateNotifications, CanonStateSubscriptions, + ForkChoiceNotifications, ForkChoiceStream, ForkChoiceSubscriptions, }; mod memory_overlay; diff --git a/crates/chain-state/src/notifications.rs b/crates/chain-state/src/notifications.rs index 66a1342e7c..780f036ebf 100644 --- a/crates/chain-state/src/notifications.rs +++ b/crates/chain-state/src/notifications.rs @@ -9,8 +9,11 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::broadcast; -use tokio_stream::{wrappers::BroadcastStream, Stream}; +use tokio::sync::{broadcast, watch}; +use tokio_stream::{ + wrappers::{BroadcastStream, WatchStream}, + Stream, +}; use tracing::debug; /// Type alias for a receiver that receives [`CanonStateNotification`] @@ -175,3 +178,38 @@ impl Stream for ForkChoiceStream { } } } + +/// A stream for block state watch channels (pending, safe or finalized watchers) +#[derive(Debug)] +#[pin_project::pin_project] +pub struct BlockStateNotificationStream { + #[pin] + st: WatchStream>, +} + +impl BlockStateNotificationStream { + /// Creates a new `BlockStateNotificationStream` + pub fn new(rx: watch::Receiver>) -> Self { + Self { st: WatchStream::new(rx) } + } +} + +impl Stream for BlockStateNotificationStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(notification) => { + if notification.is_some() { + return Poll::Ready(notification); + } else { + // skip None values + continue; + } + } + None => Poll::Ready(None), + } + } + } +}