Files
reth/crates/optimism/rpc/src/sequencer.rs
2025-05-01 06:40:14 +00:00

284 lines
9.4 KiB
Rust

//! Helpers for optimism specific RPC implementations.
use crate::SequencerClientError;
use alloy_json_rpc::{RpcRecv, RpcSend};
use alloy_primitives::{hex, B256};
use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client};
use alloy_rpc_types_eth::erc4337::TransactionConditional;
use alloy_transport_http::Http;
use reth_optimism_txpool::supervisor::metrics::SequencerMetrics;
use std::{str::FromStr, sync::Arc, time::Instant};
use thiserror::Error;
use tracing::warn;
/// Sequencer client error
#[derive(Error, Debug)]
pub enum Error {
/// Invalid scheme
#[error("Invalid scheme of sequencer url: {0}")]
InvalidScheme(String),
/// Invalid header or value provided.
#[error("Invalid header: {0}")]
InvalidHeader(String),
/// Invalid url
#[error("Invalid sequencer url: {0}")]
InvalidUrl(String),
/// Establishing a connection to the sequencer endpoint resulted in an error.
#[error("Failed to connect to sequencer: {0}")]
TransportError(
#[from]
#[source]
alloy_transport::TransportError,
),
/// Reqwest failed to init client
#[error("Failed to init reqwest client for sequencer: {0}")]
ReqwestError(
#[from]
#[source]
reqwest::Error,
),
}
/// A client to interact with a Sequencer
#[derive(Debug, Clone)]
pub struct SequencerClient {
inner: Arc<SequencerClientInner>,
}
impl SequencerClientInner {
/// Creates a new instance with the given endpoint and client.
pub(crate) fn new(sequencer_endpoint: String, client: Client) -> Self {
let metrics = SequencerMetrics::default();
Self { sequencer_endpoint, client, metrics }
}
}
impl SequencerClient {
/// Creates a new [`SequencerClient`] for the given URL.
///
/// If the URL is a websocket endpoint we connect a websocket instance.
pub async fn new(sequencer_endpoint: impl Into<String>) -> Result<Self, Error> {
Self::new_with_headers(sequencer_endpoint, Default::default()).await
}
/// Creates a new `SequencerClient` for the given URL with the given headers
///
/// This expects headers in the form: `header=value`
pub async fn new_with_headers(
sequencer_endpoint: impl Into<String>,
headers: Vec<String>,
) -> Result<Self, Error> {
let sequencer_endpoint = sequencer_endpoint.into();
let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?;
if let BuiltInConnectionString::Http(url) = endpoint {
let mut builder = reqwest::Client::builder()
// we force use tls to prevent native issues
.use_rustls_tls();
if !headers.is_empty() {
let mut header_map = reqwest::header::HeaderMap::new();
for header in headers {
if let Some((key, value)) = header.split_once('=') {
header_map.insert(
key.trim()
.parse::<reqwest::header::HeaderName>()
.map_err(|err| Error::InvalidHeader(err.to_string()))?,
value
.trim()
.parse::<reqwest::header::HeaderValue>()
.map_err(|err| Error::InvalidHeader(err.to_string()))?,
);
}
}
builder = builder.default_headers(header_map);
}
let client = builder.build()?;
Self::with_http_client(url, client)
} else {
let client = ClientBuilder::default().connect_with(endpoint).await?;
let inner = SequencerClientInner::new(sequencer_endpoint, client);
Ok(Self { inner: Arc::new(inner) })
}
}
/// Creates a new [`SequencerClient`] with http transport with the given http client.
pub fn with_http_client(
sequencer_endpoint: impl Into<String>,
client: reqwest::Client,
) -> Result<Self, Error> {
let sequencer_endpoint: String = sequencer_endpoint.into();
let url = sequencer_endpoint
.parse()
.map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?;
let http_client = Http::with_client(client, url);
let is_local = http_client.guess_local();
let client = ClientBuilder::default().transport(http_client, is_local);
let inner = SequencerClientInner::new(sequencer_endpoint, client);
Ok(Self { inner: Arc::new(inner) })
}
/// Returns the network of the client
pub fn endpoint(&self) -> &str {
&self.inner.sequencer_endpoint
}
/// Returns the client
pub fn client(&self) -> &Client {
&self.inner.client
}
/// Returns a reference to the [`SequencerMetrics`] for tracking client metrics.
fn metrics(&self) -> &SequencerMetrics {
&self.inner.metrics
}
/// Sends a [`alloy_rpc_client::RpcCall`] request to the sequencer endpoint.
pub async fn request<Params: RpcSend, Resp: RpcRecv>(
&self,
method: &str,
params: Params,
) -> Result<Resp, SequencerClientError> {
let resp =
self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
|err| {
warn!(
target: "rpc::sequencer",
%err,
"HTTP request to sequencer failed",
);
},
)?;
Ok(resp)
}
/// Forwards a transaction to the sequencer endpoint.
pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result<B256, SequencerClientError> {
let start = Instant::now();
let rlp_hex = hex::encode_prefixed(tx);
let tx_hash =
self.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
warn!(
target: "rpc::eth",
%err,
"Failed to forward transaction to sequencer",
);
})?;
self.metrics().record_forward_latency(start.elapsed());
Ok(tx_hash)
}
/// Forwards a transaction conditional to the sequencer endpoint.
pub async fn forward_raw_transaction_conditional(
&self,
tx: &[u8],
condition: TransactionConditional,
) -> Result<B256, SequencerClientError> {
let start = Instant::now();
let rlp_hex = hex::encode_prefixed(tx);
let tx_hash = self
.request("eth_sendRawTransactionConditional", (rlp_hex, condition))
.await
.inspect_err(|err| {
warn!(
target: "rpc::eth",
%err,
"Failed to forward transaction conditional for sequencer",
);
})?;
self.metrics().record_forward_latency(start.elapsed());
Ok(tx_hash)
}
}
#[derive(Debug)]
struct SequencerClientInner {
/// The endpoint of the sequencer
sequencer_endpoint: String,
/// The client
client: Client,
// Metrics for tracking sequencer forwarding
metrics: SequencerMetrics,
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::U64;
#[tokio::test]
async fn test_http_body_str() {
let client = SequencerClient::new("http://localhost:8545").await.unwrap();
let request = client
.client()
.make_request("eth_getBlockByNumber", (U64::from(10),))
.serialize()
.unwrap()
.take_request();
let body = request.get();
assert_eq!(
body,
r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
);
let condition = TransactionConditional::default();
let request = client
.client()
.make_request(
"eth_sendRawTransactionConditional",
(format!("0x{}", hex::encode("abcd")), condition),
)
.serialize()
.unwrap()
.take_request();
let body = request.get();
assert_eq!(
body,
r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
);
}
#[tokio::test]
#[ignore = "Start if WS is reachable at ws://localhost:8546"]
async fn test_ws_body_str() {
let client = SequencerClient::new("ws://localhost:8546").await.unwrap();
let request = client
.client()
.make_request("eth_getBlockByNumber", (U64::from(10),))
.serialize()
.unwrap()
.take_request();
let body = request.get();
assert_eq!(
body,
r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
);
let condition = TransactionConditional::default();
let request = client
.client()
.make_request(
"eth_sendRawTransactionConditional",
(format!("0x{}", hex::encode("abcd")), condition),
)
.serialize()
.unwrap()
.take_request();
let body = request.get();
assert_eq!(
body,
r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
);
}
}