Add generic BlockStateNotificationStream type for pending, safe, finalized watchers (#10249)

This commit is contained in:
Jennifer
2024-08-10 15:02:49 +01:00
committed by GitHub
parent faa38cec4d
commit cdf5d8e9ee
2 changed files with 43 additions and 5 deletions

View File

@@ -16,9 +16,9 @@ pub use chain_info::ChainInfoTracker;
mod notifications;
pub use notifications::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
BlockStateNotificationStream, CanonStateNotification, CanonStateNotificationSender,
CanonStateNotificationStream, CanonStateNotifications, CanonStateSubscriptions,
ForkChoiceNotifications, ForkChoiceStream, ForkChoiceSubscriptions,
};
mod memory_overlay;

View File

@@ -9,8 +9,11 @@ use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, Stream};
use tokio::sync::{broadcast, watch};
use tokio_stream::{
wrappers::{BroadcastStream, WatchStream},
Stream,
};
use tracing::debug;
/// Type alias for a receiver that receives [`CanonStateNotification`]
@@ -175,3 +178,38 @@ impl Stream for ForkChoiceStream {
}
}
}
/// A stream for block state watch channels (pending, safe or finalized watchers)
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BlockStateNotificationStream<T> {
#[pin]
st: WatchStream<Option<T>>,
}
impl<T: Clone + Sync + Send + 'static> BlockStateNotificationStream<T> {
/// Creates a new `BlockStateNotificationStream`
pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
Self { st: WatchStream::new(rx) }
}
}
impl<T: Clone + Sync + Send + 'static> Stream for BlockStateNotificationStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(notification) => {
if notification.is_some() {
return Poll::Ready(notification);
} else {
// skip None values
continue;
}
}
None => Poll::Ready(None),
}
}
}
}