diff --git a/Cargo.lock b/Cargo.lock index 4e8762e071..bda76e8ca3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7986,6 +7986,7 @@ dependencies = [ "assert_matches", "auto_impl", "dashmap", + "derive_more", "itertools 0.13.0", "metrics", "parking_lot 0.12.3", diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 6cf456665b..bca77d0c54 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -48,6 +48,7 @@ metrics.workspace = true # misc auto_impl.workspace = true +derive_more.workspace = true itertools.workspace = true pin-project.workspace = true parking_lot.workspace = true diff --git a/crates/storage/provider/src/traits/chain.rs b/crates/storage/provider/src/traits/chain.rs index 98957fcfdc..878e67a9f2 100644 --- a/crates/storage/provider/src/traits/chain.rs +++ b/crates/storage/provider/src/traits/chain.rs @@ -2,7 +2,8 @@ use crate::{BlockReceipts, Chain}; use auto_impl::auto_impl; -use reth_primitives::SealedBlockWithSenders; +use derive_more::{Deref, DerefMut}; +use reth_primitives::{SealedBlockWithSenders, SealedHeader}; use std::{ pin::Pin, sync::Arc, @@ -34,7 +35,7 @@ pub trait CanonStateSubscriptions: Send + Sync { } } -/// A Stream of [CanonStateNotification]. +/// A Stream of [`CanonStateNotification`]. #[derive(Debug)] #[pin_project::pin_project] pub struct CanonStateNotificationStream { @@ -139,3 +140,44 @@ impl CanonStateNotification { receipts } } + +/// Wrapper around a broadcast receiver that receives fork choice notifications. +#[derive(Debug, Deref, DerefMut)] +pub struct ForkChoiceNotifications(broadcast::Receiver); + +/// A trait that allows to register to fork choice related events +/// and get notified when a new fork choice is available. +pub trait ForkChoiceSubscriptions: Send + Sync { + /// Get notified when a new head of the chain is selected. + fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications; + + /// Convenience method to get a stream of the new head of the chain. + fn fork_choice_stream(&self) -> ForkChoiceStream { + ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) } + } +} + +/// A stream of the fork choices in the form of [`SealedHeader`]. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct ForkChoiceStream { + #[pin] + st: BroadcastStream, +} + +impl Stream for ForkChoiceStream { + type Item = SealedHeader; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(Ok(notification)) => Poll::Ready(Some(notification)), + Some(Err(err)) => { + debug!(%err, "finalized header notification stream lagging behind"); + continue + } + None => Poll::Ready(None), + }; + } + } +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 16071edfff..466a9e2908 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -21,7 +21,8 @@ pub use state::StateWriter; mod chain; pub use chain::{ CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, - CanonStateNotifications, CanonStateSubscriptions, + CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream, + ForkChoiceSubscriptions, }; mod spec;