mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
devnet4
...
feat/opera
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a9f0d8d09 | ||
|
|
6d1b20dbcd |
@@ -64,6 +64,10 @@ pub struct DefaultRpcServerArgs {
|
||||
ipcdisable: bool,
|
||||
ipcpath: String,
|
||||
ipc_socket_permissions: Option<String>,
|
||||
operator: bool,
|
||||
operator_addr: IpAddr,
|
||||
operator_port: u16,
|
||||
operator_api: Option<RpcModuleSelection>,
|
||||
auth_addr: IpAddr,
|
||||
auth_port: u16,
|
||||
auth_jwtsecret: Option<PathBuf>,
|
||||
@@ -189,6 +193,30 @@ impl DefaultRpcServerArgs {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default operator enabled state
|
||||
pub const fn with_operator(mut self, v: bool) -> Self {
|
||||
self.operator = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default operator address
|
||||
pub const fn with_operator_addr(mut self, v: IpAddr) -> Self {
|
||||
self.operator_addr = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default operator port
|
||||
pub const fn with_operator_port(mut self, v: u16) -> Self {
|
||||
self.operator_port = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default operator API modules
|
||||
pub fn with_operator_api(mut self, v: Option<RpcModuleSelection>) -> Self {
|
||||
self.operator_api = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default auth server address
|
||||
pub const fn with_auth_addr(mut self, v: IpAddr) -> Self {
|
||||
self.auth_addr = v;
|
||||
@@ -375,6 +403,10 @@ impl Default for DefaultRpcServerArgs {
|
||||
ipcdisable: false,
|
||||
ipcpath: constants::DEFAULT_IPC_ENDPOINT.to_string(),
|
||||
ipc_socket_permissions: None,
|
||||
operator: false,
|
||||
operator_addr: Ipv4Addr::LOCALHOST.into(),
|
||||
operator_port: constants::DEFAULT_OPERATOR_RPC_PORT,
|
||||
operator_api: None,
|
||||
auth_addr: Ipv4Addr::LOCALHOST.into(),
|
||||
auth_port: constants::DEFAULT_AUTH_PORT,
|
||||
auth_jwtsecret: None,
|
||||
@@ -470,6 +502,26 @@ pub struct RpcServerArgs {
|
||||
#[arg(long = "ipc.permissions", default_value = Resettable::from(DefaultRpcServerArgs::get_global().ipc_socket_permissions.as_ref().map(|v| v.to_string().into())))]
|
||||
pub ipc_socket_permissions: Option<String>,
|
||||
|
||||
/// Enable the Operator-RPC server.
|
||||
///
|
||||
/// This starts an additional HTTP-RPC server that can serve a different set of API modules
|
||||
/// on a separate port. Useful for exposing operator-specific APIs without affecting the
|
||||
/// public-facing RPC server.
|
||||
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().operator)]
|
||||
pub operator: bool,
|
||||
|
||||
/// Operator server address to listen on
|
||||
#[arg(long = "operator.addr", default_value_t = DefaultRpcServerArgs::get_global().operator_addr)]
|
||||
pub operator_addr: IpAddr,
|
||||
|
||||
/// Operator server port to listen on
|
||||
#[arg(long = "operator.port", default_value_t = DefaultRpcServerArgs::get_global().operator_port)]
|
||||
pub operator_port: u16,
|
||||
|
||||
/// Rpc Modules to be configured for the operator server
|
||||
#[arg(long = "operator.api", value_parser = RpcModuleSelectionValueParser::default(), default_value = Resettable::from(DefaultRpcServerArgs::get_global().operator_api.as_ref().map(|v| v.to_string().into())))]
|
||||
pub operator_api: Option<RpcModuleSelection>,
|
||||
|
||||
/// Auth server address to listen on
|
||||
#[arg(long = "authrpc.addr", default_value_t = DefaultRpcServerArgs::get_global().auth_addr)]
|
||||
pub auth_addr: IpAddr,
|
||||
@@ -503,8 +555,8 @@ pub struct RpcServerArgs {
|
||||
#[arg(long = "disable-auth-server", alias = "disable-engine-api", default_value_t = DefaultRpcServerArgs::get_global().disable_auth_server)]
|
||||
pub disable_auth_server: bool,
|
||||
|
||||
/// Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api` and
|
||||
/// `--ws.api`.
|
||||
/// Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api`,
|
||||
/// `--ws.api`, and `--operator.api`.
|
||||
///
|
||||
/// This is __not__ used for the authenticated engine-API RPC server, see
|
||||
/// `--authrpc.jwtsecret`.
|
||||
@@ -682,6 +734,18 @@ impl RpcServerArgs {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables the Operator-RPC server.
|
||||
pub const fn with_operator(mut self) -> Self {
|
||||
self.operator = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures modules for the Operator-RPC server.
|
||||
pub fn with_operator_api(mut self, operator_api: RpcModuleSelection) -> Self {
|
||||
self.operator_api = Some(operator_api);
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables the Auth IPC
|
||||
pub const fn with_auth_ipc(mut self) -> Self {
|
||||
self.auth_ipc = true;
|
||||
@@ -699,6 +763,7 @@ impl RpcServerArgs {
|
||||
/// * The `auth_port` is scaled by a factor of `instance * 100`
|
||||
/// * The `http_port` is scaled by a factor of `-instance`
|
||||
/// * The `ws_port` is scaled by a factor of `instance * 2`
|
||||
/// * The `operator_port` is scaled by a factor of `instance * 2 + 1`
|
||||
/// * The `ipcpath` is appended with the instance number: `/tmp/reth.ipc-<instance>`
|
||||
///
|
||||
/// # Panics
|
||||
@@ -718,6 +783,8 @@ impl RpcServerArgs {
|
||||
self.http_port -= instance - 1;
|
||||
// ws port is scaled by a factor of instance * 2
|
||||
self.ws_port += instance * 2 - 2;
|
||||
// operator port is scaled by a factor of instance * 2 + 1
|
||||
self.operator_port += instance * 2 - 1;
|
||||
// append instance file to ipc path
|
||||
self.ipcpath = format!("{}-{}", self.ipcpath, instance);
|
||||
}
|
||||
@@ -753,12 +820,20 @@ impl RpcServerArgs {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the operator port to zero, to allow the OS to assign a random unused port when the rpc
|
||||
/// server binds to a socket.
|
||||
pub const fn with_operator_unused_port(mut self) -> Self {
|
||||
self.operator_port = 0;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure all ports to be set to a random unused port when bound, and set the IPC path to a
|
||||
/// random path.
|
||||
pub fn with_unused_ports(mut self) -> Self {
|
||||
self = self.with_http_unused_port();
|
||||
self = self.with_ws_unused_port();
|
||||
self = self.with_auth_unused_port();
|
||||
self = self.with_operator_unused_port();
|
||||
self = self.with_ipc_random_path();
|
||||
self
|
||||
}
|
||||
@@ -785,6 +860,9 @@ impl RpcServerArgs {
|
||||
if self.ws && self.ws_api.as_ref().is_some_and(|api| api.contains(&ns)) {
|
||||
return true;
|
||||
}
|
||||
if self.operator && self.operator_api.as_ref().is_some_and(|api| api.contains(&ns)) {
|
||||
return true;
|
||||
}
|
||||
// IPC exposes all modules when enabled
|
||||
!self.ipcdisable
|
||||
}
|
||||
@@ -813,6 +891,10 @@ impl Default for RpcServerArgs {
|
||||
ipcdisable,
|
||||
ipcpath,
|
||||
ipc_socket_permissions,
|
||||
operator,
|
||||
operator_addr,
|
||||
operator_port,
|
||||
operator_api,
|
||||
auth_addr,
|
||||
auth_port,
|
||||
auth_jwtsecret,
|
||||
@@ -857,6 +939,10 @@ impl Default for RpcServerArgs {
|
||||
ipcdisable,
|
||||
ipcpath,
|
||||
ipc_socket_permissions,
|
||||
operator,
|
||||
operator_addr,
|
||||
operator_port,
|
||||
operator_api,
|
||||
auth_addr,
|
||||
auth_port,
|
||||
auth_jwtsecret,
|
||||
@@ -1017,6 +1103,10 @@ mod tests {
|
||||
ipcdisable: false,
|
||||
ipcpath: "reth.ipc".to_string(),
|
||||
ipc_socket_permissions: Some("0o666".to_string()),
|
||||
operator: false,
|
||||
operator_addr: "127.0.0.1".parse().unwrap(),
|
||||
operator_port: constants::DEFAULT_OPERATOR_RPC_PORT,
|
||||
operator_api: None,
|
||||
auth_addr: "127.0.0.1".parse().unwrap(),
|
||||
auth_port: 8551,
|
||||
auth_jwtsecret: Some(std::path::PathBuf::from("/tmp/jwt.hex")),
|
||||
|
||||
@@ -163,6 +163,14 @@ impl RethRpcServerConfig for RpcServerArgs {
|
||||
config = config.with_ipc(RpcModuleSelection::default_ipc_modules());
|
||||
}
|
||||
|
||||
if self.operator {
|
||||
config = config.with_operator(
|
||||
self.operator_api
|
||||
.clone()
|
||||
.unwrap_or_else(|| RpcModuleSelection::standard_modules().into()),
|
||||
);
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
||||
@@ -223,6 +231,20 @@ impl RethRpcServerConfig for RpcServerArgs {
|
||||
config.with_ipc(self.ipc_server_builder()).with_ipc_endpoint(self.ipcpath.clone());
|
||||
}
|
||||
|
||||
if self.operator {
|
||||
let socket_address = SocketAddr::new(self.operator_addr, self.operator_port);
|
||||
config = config
|
||||
.with_operator_address(socket_address)
|
||||
.with_operator(self.http_ws_server_builder());
|
||||
}
|
||||
|
||||
if self.operator_api.is_some() && !self.operator {
|
||||
warn!(
|
||||
target: "reth::cli",
|
||||
"The --operator.api flag is set but --operator is not enabled. Operator RPC API will not be exposed."
|
||||
);
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,8 @@ pub enum ServerKind {
|
||||
WsHttp(SocketAddr),
|
||||
/// Auth.
|
||||
Auth(SocketAddr),
|
||||
/// Operator.
|
||||
Operator(SocketAddr),
|
||||
}
|
||||
|
||||
impl ServerKind {
|
||||
@@ -27,6 +29,7 @@ impl ServerKind {
|
||||
Self::WS(_) => "--ws.port",
|
||||
Self::WsHttp(_) => "--ws.port and --http.port",
|
||||
Self::Auth(_) => "--authrpc.port",
|
||||
Self::Operator(_) => "--operator.port",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -38,6 +41,7 @@ impl std::fmt::Display for ServerKind {
|
||||
Self::WS(addr) => write!(f, "{addr} (WS-RPC server)"),
|
||||
Self::WsHttp(addr) => write!(f, "{addr} (WS-HTTP-RPC server)"),
|
||||
Self::Auth(addr) => write!(f, "{addr} (AUTH server)"),
|
||||
Self::Operator(addr) => write!(f, "{addr} (Operator-RPC server)"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,6 +145,7 @@ mod tests {
|
||||
ServerKind::WS(addr),
|
||||
ServerKind::WsHttp(addr),
|
||||
ServerKind::Auth(addr),
|
||||
ServerKind::Operator(addr),
|
||||
];
|
||||
|
||||
for kind in &kinds {
|
||||
|
||||
@@ -397,7 +397,8 @@ where
|
||||
let mut modules = TransportRpcModules::default();
|
||||
|
||||
if !module_config.is_empty() {
|
||||
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
|
||||
let TransportRpcModuleConfig { http, ws, ipc, operator, config } =
|
||||
module_config.clone();
|
||||
|
||||
let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
|
||||
|
||||
@@ -405,6 +406,7 @@ where
|
||||
modules.http = registry.maybe_module(http.as_ref());
|
||||
modules.ws = registry.maybe_module(ws.as_ref());
|
||||
modules.ipc = registry.maybe_module(ipc.as_ref());
|
||||
modules.operator = registry.maybe_module(operator.as_ref());
|
||||
}
|
||||
|
||||
modules
|
||||
@@ -924,11 +926,13 @@ where
|
||||
let http = self.maybe_module(config.http.as_ref());
|
||||
let ws = self.maybe_module(config.ws.as_ref());
|
||||
let ipc = self.maybe_module(config.ipc.as_ref());
|
||||
let operator = self.maybe_module(config.operator.as_ref());
|
||||
|
||||
modules.config = config;
|
||||
modules.http = http;
|
||||
modules.ws = ws;
|
||||
modules.ipc = ipc;
|
||||
modules.operator = operator;
|
||||
modules
|
||||
}
|
||||
|
||||
@@ -1109,6 +1113,10 @@ pub struct RpcServerConfig<RpcMiddleware = Identity> {
|
||||
ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
|
||||
/// The Endpoint where to launch the ipc server
|
||||
ipc_endpoint: Option<String>,
|
||||
/// Configs for JSON-RPC Operator server.
|
||||
operator_server_config: Option<ServerConfigBuilder>,
|
||||
/// Address where to bind the operator server to
|
||||
operator_addr: Option<SocketAddr>,
|
||||
/// JWT secret for authentication
|
||||
jwt_secret: Option<JwtSecret>,
|
||||
/// Configurable RPC middleware
|
||||
@@ -1130,6 +1138,8 @@ impl Default for RpcServerConfig<Identity> {
|
||||
ws_addr: None,
|
||||
ipc_server_config: None,
|
||||
ipc_endpoint: None,
|
||||
operator_server_config: None,
|
||||
operator_addr: None,
|
||||
jwt_secret: None,
|
||||
rpc_middleware: Default::default(),
|
||||
}
|
||||
@@ -1179,6 +1189,16 @@ impl RpcServerConfig {
|
||||
self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures the operator server
|
||||
///
|
||||
/// Note: this always configures an [`EthSubscriptionIdProvider`] [`IdProvider`] for
|
||||
/// convenience. To set a custom [`IdProvider`], please use [`Self::with_id_provider`].
|
||||
pub fn with_operator(mut self, config: ServerConfigBuilder) -> Self {
|
||||
self.operator_server_config =
|
||||
Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
@@ -1194,6 +1214,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
ws_addr: self.ws_addr,
|
||||
ipc_server_config: self.ipc_server_config,
|
||||
ipc_endpoint: self.ipc_endpoint,
|
||||
operator_server_config: self.operator_server_config,
|
||||
operator_addr: self.operator_addr,
|
||||
jwt_secret: self.jwt_secret,
|
||||
rpc_middleware,
|
||||
}
|
||||
@@ -1240,6 +1262,15 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures the [`SocketAddr`] of the operator server
|
||||
///
|
||||
/// Default is [`Ipv4Addr::LOCALHOST`] and
|
||||
/// [`reth_rpc_server_types::constants::DEFAULT_OPERATOR_RPC_PORT`]
|
||||
pub const fn with_operator_address(mut self, addr: SocketAddr) -> Self {
|
||||
self.operator_addr = Some(addr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a custom [`IdProvider`] for all configured transports.
|
||||
///
|
||||
/// By default all transports use [`EthSubscriptionIdProvider`]
|
||||
@@ -1254,7 +1285,10 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
|
||||
}
|
||||
if let Some(ipc) = self.ipc_server_config {
|
||||
self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
|
||||
self.ipc_server_config = Some(ipc.set_id_provider(id_provider.clone()));
|
||||
}
|
||||
if let Some(config) = self.operator_server_config {
|
||||
self.operator_server_config = Some(config.set_id_provider(id_provider));
|
||||
}
|
||||
|
||||
self
|
||||
@@ -1286,7 +1320,12 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
|
||||
}
|
||||
if let Some(ipc_server_config) = self.ipc_server_config {
|
||||
self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
|
||||
self.ipc_server_config =
|
||||
Some(ipc_server_config.custom_tokio_runtime(tokio_runtime.clone()));
|
||||
}
|
||||
if let Some(operator_server_config) = self.operator_server_config {
|
||||
self.operator_server_config =
|
||||
Some(operator_server_config.custom_tokio_runtime(tokio_runtime));
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -1297,7 +1336,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
pub const fn has_server(&self) -> bool {
|
||||
self.http_server_config.is_some() ||
|
||||
self.ws_server_config.is_some() ||
|
||||
self.ipc_server_config.is_some()
|
||||
self.ipc_server_config.is_some() ||
|
||||
self.operator_server_config.is_some()
|
||||
}
|
||||
|
||||
/// Returns the [`SocketAddr`] of the http server
|
||||
@@ -1310,6 +1350,11 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
self.ws_addr
|
||||
}
|
||||
|
||||
/// Returns the [`SocketAddr`] of the operator server
|
||||
pub const fn operator_address(&self) -> Option<SocketAddr> {
|
||||
self.operator_addr
|
||||
}
|
||||
|
||||
/// Returns the endpoint of the ipc server
|
||||
pub fn ipc_endpoint(&self) -> Option<String> {
|
||||
self.ipc_endpoint.clone()
|
||||
@@ -1335,7 +1380,7 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds and starts the configured server(s): http, ws, ipc.
|
||||
/// Builds and starts the configured server(s): http, ws, ipc, operator.
|
||||
///
|
||||
/// If both http and ws are on the same port, they are combined into one server.
|
||||
///
|
||||
@@ -1347,6 +1392,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
let mut http_handle = None;
|
||||
let mut ws_handle = None;
|
||||
let mut ipc_handle = None;
|
||||
let mut operator_handle = None;
|
||||
let mut operator_local_addr = None;
|
||||
|
||||
let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
|
||||
Ipv4Addr::LOCALHOST,
|
||||
@@ -1358,6 +1405,11 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
constants::DEFAULT_WS_RPC_PORT,
|
||||
)));
|
||||
|
||||
let operator_socket_addr = self.operator_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
|
||||
Ipv4Addr::LOCALHOST,
|
||||
constants::DEFAULT_OPERATOR_RPC_PORT,
|
||||
)));
|
||||
|
||||
let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
|
||||
let ipc_path =
|
||||
self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
|
||||
@@ -1369,6 +1421,33 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
|
||||
}
|
||||
|
||||
// Launch the operator server if configured
|
||||
if let Some(config) = self.operator_server_config &&
|
||||
let Some(module) = modules.operator.as_ref()
|
||||
{
|
||||
let server = ServerBuilder::new()
|
||||
.set_config(config.http_only().build())
|
||||
.set_http_middleware(
|
||||
tower::ServiceBuilder::new()
|
||||
.option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
|
||||
)
|
||||
.set_rpc_middleware(
|
||||
RpcServiceBuilder::default()
|
||||
.layer(RpcRequestMetrics::http(module))
|
||||
.layer(self.rpc_middleware.clone()),
|
||||
)
|
||||
.build(operator_socket_addr)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
RpcError::server_error(err, ServerKind::Operator(operator_socket_addr))
|
||||
})?;
|
||||
let addr = server.local_addr().map_err(|err| {
|
||||
RpcError::server_error(err, ServerKind::Operator(operator_socket_addr))
|
||||
})?;
|
||||
operator_local_addr = Some(addr);
|
||||
operator_handle = Some(server.start(module.clone()));
|
||||
}
|
||||
|
||||
// If both are configured on the same port, we combine them into one server.
|
||||
if self.http_addr == self.ws_addr &&
|
||||
self.http_server_config.is_some() &&
|
||||
@@ -1435,6 +1514,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
ws: ws_handle,
|
||||
ipc_endpoint: self.ipc_endpoint.clone(),
|
||||
ipc: ipc_handle,
|
||||
operator_local_addr,
|
||||
operator: operator_handle,
|
||||
jwt_secret: self.jwt_secret,
|
||||
});
|
||||
}
|
||||
@@ -1507,6 +1588,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
|
||||
ws: ws_handle,
|
||||
ipc_endpoint: self.ipc_endpoint.clone(),
|
||||
ipc: ipc_handle,
|
||||
operator_local_addr,
|
||||
operator: operator_handle,
|
||||
jwt_secret: self.jwt_secret,
|
||||
})
|
||||
}
|
||||
@@ -1531,6 +1614,8 @@ pub struct TransportRpcModuleConfig {
|
||||
ws: Option<RpcModuleSelection>,
|
||||
/// ipc module configuration
|
||||
ipc: Option<RpcModuleSelection>,
|
||||
/// operator module configuration
|
||||
operator: Option<RpcModuleSelection>,
|
||||
/// Config for the modules
|
||||
config: Option<RpcModuleConfig>,
|
||||
}
|
||||
@@ -1553,6 +1638,11 @@ impl TransportRpcModuleConfig {
|
||||
Self::default().with_ipc(ipc)
|
||||
}
|
||||
|
||||
/// Creates a new config with only operator set
|
||||
pub fn set_operator(operator: impl Into<RpcModuleSelection>) -> Self {
|
||||
Self::default().with_operator(operator)
|
||||
}
|
||||
|
||||
/// Sets the [`RpcModuleSelection`] for the http transport.
|
||||
pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
|
||||
self.http = Some(http.into());
|
||||
@@ -1571,6 +1661,12 @@ impl TransportRpcModuleConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the [`RpcModuleSelection`] for the operator transport.
|
||||
pub fn with_operator(mut self, operator: impl Into<RpcModuleSelection>) -> Self {
|
||||
self.operator = Some(operator.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a custom [`RpcModuleConfig`] for the configured modules.
|
||||
pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
|
||||
self.config = Some(config);
|
||||
@@ -1592,6 +1688,11 @@ impl TransportRpcModuleConfig {
|
||||
&mut self.ipc
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the operator module configuration.
|
||||
pub const fn operator_mut(&mut self) -> &mut Option<RpcModuleSelection> {
|
||||
&mut self.operator
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the rpc module configuration.
|
||||
pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
|
||||
&mut self.config
|
||||
@@ -1599,7 +1700,7 @@ impl TransportRpcModuleConfig {
|
||||
|
||||
/// Returns true if no transports are configured
|
||||
pub const fn is_empty(&self) -> bool {
|
||||
self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
|
||||
self.http.is_none() && self.ws.is_none() && self.ipc.is_none() && self.operator.is_none()
|
||||
}
|
||||
|
||||
/// Returns the [`RpcModuleSelection`] for the http transport
|
||||
@@ -1617,6 +1718,11 @@ impl TransportRpcModuleConfig {
|
||||
self.ipc.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the [`RpcModuleSelection`] for the operator transport
|
||||
pub const fn operator(&self) -> Option<&RpcModuleSelection> {
|
||||
self.operator.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the [`RpcModuleConfig`] for the configured modules
|
||||
pub const fn config(&self) -> Option<&RpcModuleConfig> {
|
||||
self.config.as_ref()
|
||||
@@ -1624,7 +1730,10 @@ impl TransportRpcModuleConfig {
|
||||
|
||||
/// Returns true if the given module is configured for any transport.
|
||||
pub fn contains_any(&self, module: &RethRpcModule) -> bool {
|
||||
self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
|
||||
self.contains_http(module) ||
|
||||
self.contains_ws(module) ||
|
||||
self.contains_ipc(module) ||
|
||||
self.contains_operator(module)
|
||||
}
|
||||
|
||||
/// Returns true if the given module is configured for the http transport.
|
||||
@@ -1642,6 +1751,11 @@ impl TransportRpcModuleConfig {
|
||||
self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
|
||||
}
|
||||
|
||||
/// Returns true if the given module is configured for the operator transport.
|
||||
pub fn contains_operator(&self, module: &RethRpcModule) -> bool {
|
||||
self.operator.as_ref().is_some_and(|operator| operator.contains(module))
|
||||
}
|
||||
|
||||
/// Ensures that both http and ws are configured and that they are configured to use the same
|
||||
/// port.
|
||||
fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
|
||||
@@ -1677,6 +1791,8 @@ pub struct TransportRpcModules<Context = ()> {
|
||||
ws: Option<RpcModule<Context>>,
|
||||
/// rpcs module for ipc
|
||||
ipc: Option<RpcModule<Context>>,
|
||||
/// rpcs module for operator
|
||||
operator: Option<RpcModule<Context>>,
|
||||
}
|
||||
|
||||
// === impl TransportRpcModules ===
|
||||
@@ -1710,6 +1826,13 @@ impl TransportRpcModules {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the [`RpcModule`] for the operator transport.
|
||||
/// This will overwrite current module, if any.
|
||||
pub fn with_operator(mut self, operator: RpcModule<()>) -> Self {
|
||||
self.operator = Some(operator);
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the [`TransportRpcModuleConfig`] used to configure this instance.
|
||||
pub const fn module_config(&self) -> &TransportRpcModuleConfig {
|
||||
&self.config
|
||||
@@ -1732,7 +1855,10 @@ impl TransportRpcModules {
|
||||
self.merge_ws(other.clone())?;
|
||||
}
|
||||
if self.module_config().contains_ipc(&module) {
|
||||
self.merge_ipc(other)?;
|
||||
self.merge_ipc(other.clone())?;
|
||||
}
|
||||
if self.module_config().contains_operator(&module) {
|
||||
self.merge_operator(other)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1795,6 +1921,21 @@ impl TransportRpcModules {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Merge the given [Methods] in the configured operator methods.
|
||||
///
|
||||
/// Fails if any of the methods in other is present already.
|
||||
///
|
||||
/// Returns [Ok(false)] if no operator transport is configured.
|
||||
pub fn merge_operator(
|
||||
&mut self,
|
||||
other: impl Into<Methods>,
|
||||
) -> Result<bool, RegisterMethodError> {
|
||||
if let Some(ref mut operator) = self.operator {
|
||||
return operator.merge(other.into()).map(|_| true)
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Merge the given [`Methods`] in all configured methods.
|
||||
///
|
||||
/// Fails if any of the methods in other is present already.
|
||||
@@ -1805,7 +1946,8 @@ impl TransportRpcModules {
|
||||
let other = other.into();
|
||||
self.merge_http(other.clone())?;
|
||||
self.merge_ws(other.clone())?;
|
||||
self.merge_ipc(other)?;
|
||||
self.merge_ipc(other.clone())?;
|
||||
self.merge_operator(other)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1838,6 +1980,9 @@ impl TransportRpcModules {
|
||||
if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
|
||||
let _ = methods.merge(m);
|
||||
}
|
||||
if let Some(m) = self.operator_methods(|name| f(name, &methods)) {
|
||||
let _ = methods.merge(m);
|
||||
}
|
||||
methods
|
||||
}
|
||||
|
||||
@@ -1871,6 +2016,16 @@ impl TransportRpcModules {
|
||||
self.ipc.as_ref().map(|module| methods_by(module, filter))
|
||||
}
|
||||
|
||||
/// Returns all [`Methods`] installed for the operator server based in the given closure.
|
||||
///
|
||||
/// Returns `None` if no operator support is configured.
|
||||
pub fn operator_methods<F>(&self, filter: F) -> Option<Methods>
|
||||
where
|
||||
F: FnMut(&str) -> bool,
|
||||
{
|
||||
self.operator.as_ref().map(|module| methods_by(module, filter))
|
||||
}
|
||||
|
||||
/// Removes the method with the given name from the configured http methods.
|
||||
///
|
||||
/// Returns `true` if the method was found and removed, `false` otherwise.
|
||||
@@ -1937,6 +2092,28 @@ impl TransportRpcModules {
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the method with the given name from the configured operator methods.
|
||||
///
|
||||
/// Returns `true` if the method was found and removed, `false` otherwise.
|
||||
///
|
||||
/// Be aware that a subscription consist of two methods, `subscribe` and `unsubscribe` and
|
||||
/// it's the caller responsibility to remove both `subscribe` and `unsubscribe` methods for
|
||||
/// subscriptions.
|
||||
pub fn remove_operator_method(&mut self, method_name: &'static str) -> bool {
|
||||
if let Some(operator_module) = &mut self.operator {
|
||||
operator_module.remove_method(method_name).is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the given methods from the configured operator methods.
|
||||
pub fn remove_operator_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
|
||||
for name in methods {
|
||||
self.remove_operator_method(name);
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the method with the given name from all configured transports.
|
||||
///
|
||||
/// Returns `true` if the method was found and removed, `false` otherwise.
|
||||
@@ -1944,8 +2121,9 @@ impl TransportRpcModules {
|
||||
let http_removed = self.remove_http_method(method_name);
|
||||
let ws_removed = self.remove_ws_method(method_name);
|
||||
let ipc_removed = self.remove_ipc_method(method_name);
|
||||
let operator_removed = self.remove_operator_method(method_name);
|
||||
|
||||
http_removed || ws_removed || ipc_removed
|
||||
http_removed || ws_removed || ipc_removed || operator_removed
|
||||
}
|
||||
|
||||
/// Renames a method in all configured transports by:
|
||||
@@ -1999,6 +2177,21 @@ impl TransportRpcModules {
|
||||
self.merge_ws(other)
|
||||
}
|
||||
|
||||
/// Replace the given [Methods] in the configured operator methods.
|
||||
///
|
||||
/// Fails if any of the methods in other is present already or if the method being removed is
|
||||
/// not present
|
||||
///
|
||||
/// Returns [Ok(false)] if no operator transport is configured.
|
||||
pub fn replace_operator(
|
||||
&mut self,
|
||||
other: impl Into<Methods>,
|
||||
) -> Result<bool, RegisterMethodError> {
|
||||
let other = other.into();
|
||||
self.remove_operator_methods(other.method_names());
|
||||
self.merge_operator(other)
|
||||
}
|
||||
|
||||
/// Replaces the method with the given name from all configured transports.
|
||||
///
|
||||
/// Returns `true` if the method was found and replaced, `false` otherwise
|
||||
@@ -2009,7 +2202,8 @@ impl TransportRpcModules {
|
||||
let other = other.into();
|
||||
self.replace_http(other.clone())?;
|
||||
self.replace_ws(other.clone())?;
|
||||
self.replace_ipc(other)?;
|
||||
self.replace_ipc(other.clone())?;
|
||||
self.replace_operator(other)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -2049,6 +2243,18 @@ impl TransportRpcModules {
|
||||
self.merge_ipc(other)
|
||||
}
|
||||
|
||||
/// Adds or replaces given [`Methods`] in operator module.
|
||||
///
|
||||
/// Returns `true` if the methods were replaced or added, `false` otherwise.
|
||||
pub fn add_or_replace_operator(
|
||||
&mut self,
|
||||
other: impl Into<Methods>,
|
||||
) -> Result<bool, RegisterMethodError> {
|
||||
let other = other.into();
|
||||
self.remove_operator_methods(other.method_names());
|
||||
self.merge_operator(other)
|
||||
}
|
||||
|
||||
/// Adds or replaces given [`Methods`] in all configured network modules.
|
||||
pub fn add_or_replace_configured(
|
||||
&mut self,
|
||||
@@ -2057,7 +2263,8 @@ impl TransportRpcModules {
|
||||
let other = other.into();
|
||||
self.add_or_replace_http(other.clone())?;
|
||||
self.add_or_replace_ws(other.clone())?;
|
||||
self.add_or_replace_ipc(other)?;
|
||||
self.add_or_replace_ipc(other.clone())?;
|
||||
self.add_or_replace_operator(other)?;
|
||||
Ok(())
|
||||
}
|
||||
/// Adds or replaces the given [`Methods`] in the transport modules where the specified
|
||||
@@ -2075,7 +2282,10 @@ impl TransportRpcModules {
|
||||
self.add_or_replace_ws(other.clone())?;
|
||||
}
|
||||
if self.module_config().contains_ipc(&module) {
|
||||
self.add_or_replace_ipc(other)?;
|
||||
self.add_or_replace_ipc(other.clone())?;
|
||||
}
|
||||
if self.module_config().contains_operator(&module) {
|
||||
self.add_or_replace_operator(other)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -2112,6 +2322,8 @@ pub struct RpcServerHandle {
|
||||
ws: Option<ServerHandle>,
|
||||
ipc_endpoint: Option<String>,
|
||||
ipc: Option<jsonrpsee::server::ServerHandle>,
|
||||
operator_local_addr: Option<SocketAddr>,
|
||||
operator: Option<ServerHandle>,
|
||||
jwt_secret: Option<JwtSecret>,
|
||||
}
|
||||
|
||||
@@ -2144,6 +2356,11 @@ impl RpcServerHandle {
|
||||
self.ws_local_addr
|
||||
}
|
||||
|
||||
/// Returns the [`SocketAddr`] of the operator server if started.
|
||||
pub const fn operator_local_addr(&self) -> Option<SocketAddr> {
|
||||
self.operator_local_addr
|
||||
}
|
||||
|
||||
/// Tell the server to stop without waiting for the server to stop.
|
||||
pub fn stop(self) -> Result<(), AlreadyStoppedError> {
|
||||
if let Some(handle) = self.http {
|
||||
@@ -2158,6 +2375,10 @@ impl RpcServerHandle {
|
||||
handle.stop()?
|
||||
}
|
||||
|
||||
if let Some(handle) = self.operator {
|
||||
handle.stop()?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2176,6 +2397,11 @@ impl RpcServerHandle {
|
||||
self.ws_local_addr.map(|addr| format!("ws://{addr}"))
|
||||
}
|
||||
|
||||
/// Returns the url to the operator server
|
||||
pub fn operator_url(&self) -> Option<String> {
|
||||
self.operator_local_addr.map(|addr| format!("http://{addr}"))
|
||||
}
|
||||
|
||||
/// Returns a http client connected to the server.
|
||||
pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
|
||||
let url = self.http_url()?;
|
||||
@@ -2431,6 +2657,7 @@ mod tests {
|
||||
)),
|
||||
ws: None,
|
||||
ipc: None,
|
||||
operator: None,
|
||||
config: None,
|
||||
}
|
||||
)
|
||||
@@ -2445,6 +2672,7 @@ mod tests {
|
||||
http: Some(RpcModuleSelection::Selection(Default::default())),
|
||||
ws: None,
|
||||
ipc: None,
|
||||
operator: None,
|
||||
config: None,
|
||||
}
|
||||
)
|
||||
@@ -2660,6 +2888,7 @@ mod tests {
|
||||
http: Some(http_module),
|
||||
ws: Some(ws_module),
|
||||
ipc: Some(ipc_module),
|
||||
operator: None,
|
||||
};
|
||||
|
||||
// Create new methods: one to replace an existing method, one to add a new one
|
||||
@@ -2693,8 +2922,13 @@ mod tests {
|
||||
// Create a config that enables RethRpcModule::Eth for HTTP only
|
||||
let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
|
||||
|
||||
let mut modules =
|
||||
TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };
|
||||
let mut modules = TransportRpcModules {
|
||||
config,
|
||||
http: Some(RpcModule::new(())),
|
||||
ws: None,
|
||||
ipc: None,
|
||||
operator: None,
|
||||
};
|
||||
|
||||
// Track whether closure was called
|
||||
let mut closure_called = false;
|
||||
|
||||
@@ -9,6 +9,9 @@ pub const DEFAULT_WS_RPC_PORT: u16 = 8546;
|
||||
/// The default port for the auth server.
|
||||
pub const DEFAULT_AUTH_PORT: u16 = 8551;
|
||||
|
||||
/// The default port for the operator RPC server.
|
||||
pub const DEFAULT_OPERATOR_RPC_PORT: u16 = 8547;
|
||||
|
||||
/// The default maximum block range allowed to filter
|
||||
pub const DEFAULT_MAX_BLOCKS_PER_FILTER: u64 = 100_000;
|
||||
|
||||
|
||||
@@ -357,6 +357,26 @@ RPC:
|
||||
|
||||
If not specified, the permissions will be set by the system's umask.
|
||||
|
||||
--operator
|
||||
Enable the Operator-RPC server.
|
||||
|
||||
This starts an additional HTTP-RPC server that can serve a different set of API modules on a separate port. Useful for exposing operator-specific APIs without affecting the public-facing RPC server.
|
||||
|
||||
--operator.addr <OPERATOR_ADDR>
|
||||
Operator server address to listen on
|
||||
|
||||
[default: 127.0.0.1]
|
||||
|
||||
--operator.port <OPERATOR_PORT>
|
||||
Operator server port to listen on
|
||||
|
||||
[default: 8547]
|
||||
|
||||
--operator.api <OPERATOR_API>
|
||||
Rpc Modules to be configured for the operator server
|
||||
|
||||
[possible values: admin, debug, eth, net, trace, txpool, web3, rpc, reth, ots, flashbots, miner, mev, testing]
|
||||
|
||||
--authrpc.addr <AUTH_ADDR>
|
||||
Auth server address to listen on
|
||||
|
||||
@@ -388,7 +408,7 @@ RPC:
|
||||
This will prevent the authenticated engine-API server from starting. Use this if you're running a node that doesn't need to serve engine API requests.
|
||||
|
||||
--rpc.jwtsecret <HEX>
|
||||
Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api` and `--ws.api`.
|
||||
Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api`, `--ws.api`, and `--operator.api`.
|
||||
|
||||
This is __not__ used for the authenticated engine-API RPC server, see `--authrpc.jwtsecret`.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user