refactor(optimism): Extract responsibility to connect to a flashblock websocket stream (#18158)

This commit is contained in:
Roman Hodulák
2025-09-01 11:22:04 +02:00
committed by GitHub
parent 61b8015c84
commit e9a57a72c8
4 changed files with 84 additions and 27 deletions

View File

@@ -1,4 +1,4 @@
use crate::{ExecutionPayloadBaseV1, FlashBlockService, FlashBlockWsStream};
use crate::{ExecutionPayloadBaseV1, FlashBlockService, WsFlashBlockStream};
use futures_util::StreamExt;
use reth_chain_state::CanonStateSubscriptions;
use reth_evm::ConfigureEvm;
@@ -39,7 +39,7 @@ where
> + Unpin
+ 'static,
{
let stream = FlashBlockWsStream::new(ws_url);
let stream = WsFlashBlockStream::new(ws_url);
let mut service = FlashBlockService::new(stream, evm_config, provider);
let (tx, rx) = watch::channel(None);

View File

@@ -5,7 +5,7 @@ pub use payload::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata,
};
pub use service::FlashBlockService;
pub use ws::FlashBlockWsStream;
pub use ws::{WsConnect, WsFlashBlockStream};
mod app;
mod payload;

View File

@@ -1,4 +1,4 @@
pub use stream::FlashBlockWsStream;
pub use stream::{WsConnect, WsFlashBlockStream};
mod decoding;
mod stream;

View File

@@ -10,7 +10,7 @@ use std::{
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async,
tungstenite::{handshake::client::Response, Error, Message},
tungstenite::{Error, Message},
MaybeTlsStream, WebSocketStream,
};
use url::Url;
@@ -21,26 +21,45 @@ use url::Url;
///
/// If the connection fails, the error is returned and connection retried. The number of retries is
/// unbounded.
pub struct FlashBlockWsStream {
pub struct WsFlashBlockStream<Stream, Connector> {
ws_url: Url,
state: State,
connect: ConnectFuture,
stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
connector: Connector,
connect: ConnectFuture<Stream>,
stream: Option<Stream>,
}
impl FlashBlockWsStream {
impl WsFlashBlockStream<WssStream, WsConnector> {
/// Creates a new websocket stream over `ws_url`.
pub fn new(ws_url: Url) -> Self {
Self {
ws_url,
state: State::default(),
connect: Box::pin(async move { Err(Error::ConnectionClosed) }),
connector: WsConnector,
connect: Box::pin(async move { Err(Error::ConnectionClosed)? }),
stream: None,
}
}
}
impl Stream for FlashBlockWsStream {
impl<S, C> WsFlashBlockStream<S, C> {
/// Creates a new websocket stream over `ws_url`.
pub fn with_connector(ws_url: Url, connector: C) -> Self {
Self {
ws_url,
state: State::default(),
connector,
connect: Box::pin(async move { Err(Error::ConnectionClosed)? }),
stream: None,
}
}
}
impl<S, C> Stream for WsFlashBlockStream<S, C>
where
S: Stream<Item = Result<Message, Error>> + Unpin,
C: WsConnect<Stream = S> + Clone + Send + Sync + 'static + Unpin,
{
type Item = eyre::Result<FlashBlock>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -50,11 +69,11 @@ impl Stream for FlashBlockWsStream {
if self.state == State::Connect {
match ready!(self.connect.poll_unpin(cx)) {
Ok((stream, _)) => self.stream(stream),
Ok(stream) => self.stream(stream),
Err(err) => {
self.state = State::Initial;
return Poll::Ready(Some(Err(err.into())))
return Poll::Ready(Some(Err(err)));
}
}
}
@@ -73,28 +92,32 @@ impl Stream for FlashBlockWsStream {
}
}
impl FlashBlockWsStream {
impl<S, C> WsFlashBlockStream<S, C>
where
C: WsConnect<Stream = S> + Clone + Send + Sync + 'static,
{
fn connect(&mut self) {
let ws_url = self.ws_url.clone();
let connector = self.connector.clone();
Pin::new(&mut self.connect)
.set(Box::pin(async move { connect_async(ws_url.as_str()).await }));
Pin::new(&mut self.connect).set(Box::pin(async move { connector.connect(ws_url).await }));
self.state = State::Connect;
}
fn stream(&mut self, stream: WebSocketStream<MaybeTlsStream<TcpStream>>) {
self.stream.replace(stream.split().1);
fn stream(&mut self, stream: S) {
self.stream.replace(stream);
self.state = State::Stream;
}
}
impl Debug for FlashBlockWsStream {
impl<S: Debug, C: Debug> Debug for WsFlashBlockStream<S, C> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlashBlockStream")
.field("ws_url", &self.ws_url)
.field("state", &self.state)
.field("connector", &self.connector)
.field("connect", &"Pin<Box<dyn Future<..>>>")
.field("stream", &self.stream)
.finish()
@@ -109,11 +132,45 @@ enum State {
Stream,
}
type ConnectFuture = Pin<
Box<
dyn Future<Output = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>>
+ Send
+ Sync
+ 'static,
>,
>;
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
type WssStream = SplitStream<WsStream>;
type ConnectFuture<Stream> =
Pin<Box<dyn Future<Output = eyre::Result<Stream>> + Send + Sync + 'static>>;
/// The `WsConnect` trait allows for connecting to a websocket.
///
/// Implementors of the `WsConnect` trait are called 'connectors'.
///
/// Connectors are defined by one method, [`connect()`]. A call to [`connect()`] attempts to
/// establish a secure websocket connection and return an asynchronous stream of [`Message`]s
/// wrapped in a [`Result`].
///
/// [`connect()`]: Self::connect
pub trait WsConnect {
/// An associated `Stream` of [`Message`]s wrapped in a [`Result`] that this connection returns.
type Stream;
/// Asynchronously connects to a websocket hosted on `ws_url`.
///
/// See the [`WsConnect`] documentation for details.
fn connect(
&self,
ws_url: Url,
) -> impl Future<Output = eyre::Result<Self::Stream>> + Send + Sync;
}
/// Establishes a secure websocket subscription.
///
/// See the [`WsConnect`] documentation for details.
#[derive(Debug, Clone)]
pub struct WsConnector;
impl WsConnect for WsConnector {
type Stream = WssStream;
async fn connect(&self, ws_url: Url) -> eyre::Result<WssStream> {
let (stream, _response) = connect_async(ws_url.as_str()).await?;
Ok(stream.split().1)
}
}