mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-27 16:18:08 -05:00
feat: subscribe to finalized and safe headers (#9402)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7986,6 +7986,7 @@ dependencies = [
|
||||
"assert_matches",
|
||||
"auto_impl",
|
||||
"dashmap",
|
||||
"derive_more",
|
||||
"itertools 0.13.0",
|
||||
"metrics",
|
||||
"parking_lot 0.12.3",
|
||||
|
||||
@@ -48,6 +48,7 @@ metrics.workspace = true
|
||||
|
||||
# misc
|
||||
auto_impl.workspace = true
|
||||
derive_more.workspace = true
|
||||
itertools.workspace = true
|
||||
pin-project.workspace = true
|
||||
parking_lot.workspace = true
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
use crate::{BlockReceipts, Chain};
|
||||
use auto_impl::auto_impl;
|
||||
use reth_primitives::SealedBlockWithSenders;
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use reth_primitives::{SealedBlockWithSenders, SealedHeader};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
@@ -34,7 +35,7 @@ pub trait CanonStateSubscriptions: Send + Sync {
|
||||
}
|
||||
}
|
||||
|
||||
/// A Stream of [CanonStateNotification].
|
||||
/// A Stream of [`CanonStateNotification`].
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct CanonStateNotificationStream {
|
||||
@@ -139,3 +140,44 @@ impl CanonStateNotification {
|
||||
receipts
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper around a broadcast receiver that receives fork choice notifications.
|
||||
#[derive(Debug, Deref, DerefMut)]
|
||||
pub struct ForkChoiceNotifications(broadcast::Receiver<SealedHeader>);
|
||||
|
||||
/// A trait that allows to register to fork choice related events
|
||||
/// and get notified when a new fork choice is available.
|
||||
pub trait ForkChoiceSubscriptions: Send + Sync {
|
||||
/// Get notified when a new head of the chain is selected.
|
||||
fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications;
|
||||
|
||||
/// Convenience method to get a stream of the new head of the chain.
|
||||
fn fork_choice_stream(&self) -> ForkChoiceStream {
|
||||
ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) }
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream of the fork choices in the form of [`SealedHeader`].
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct ForkChoiceStream {
|
||||
#[pin]
|
||||
st: BroadcastStream<SealedHeader>,
|
||||
}
|
||||
|
||||
impl Stream for ForkChoiceStream {
|
||||
type Item = SealedHeader;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
return match ready!(self.as_mut().project().st.poll_next(cx)) {
|
||||
Some(Ok(notification)) => Poll::Ready(Some(notification)),
|
||||
Some(Err(err)) => {
|
||||
debug!(%err, "finalized header notification stream lagging behind");
|
||||
continue
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,8 @@ pub use state::StateWriter;
|
||||
mod chain;
|
||||
pub use chain::{
|
||||
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
|
||||
CanonStateNotifications, CanonStateSubscriptions,
|
||||
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
|
||||
ForkChoiceSubscriptions,
|
||||
};
|
||||
|
||||
mod spec;
|
||||
|
||||
Reference in New Issue
Block a user