Compare commits

...

2 Commits

Author SHA1 Message Date
Derek Cofausper
8a9f0d8d09 docs(cli): regenerate CLI reference for --operator flags
Co-Authored-By: zhygis <5236121+Zygimantass@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019cd270-e524-7019-a339-aa431e8bfc78
2026-03-09 13:20:43 +00:00
Derek Cofausper
6d1b20dbcd feat(rpc): add operator RPC server (--operator)
Adds a new HTTP JSON-RPC server that can be enabled via --operator and
configured independently from the public-facing HTTP/WS servers. It
listens on a separate port (default 8547) and serves a configurable set
of API modules via --operator.api.

Useful for exposing operator-specific APIs (e.g. admin, debug) without
affecting the public-facing RPC server configuration.

New CLI flags:
  --operator              Enable the operator server
  --operator.addr         Bind address (default 127.0.0.1)
  --operator.port         Port (default 8547)
  --operator.api          API modules to expose

Co-authored-by: zhygis <5236121+Zygimantass@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019cd270-e524-7019-a339-aa431e8bfc78
2026-03-09 12:31:02 +00:00
6 changed files with 392 additions and 18 deletions

View File

@@ -64,6 +64,10 @@ pub struct DefaultRpcServerArgs {
ipcdisable: bool,
ipcpath: String,
ipc_socket_permissions: Option<String>,
operator: bool,
operator_addr: IpAddr,
operator_port: u16,
operator_api: Option<RpcModuleSelection>,
auth_addr: IpAddr,
auth_port: u16,
auth_jwtsecret: Option<PathBuf>,
@@ -189,6 +193,30 @@ impl DefaultRpcServerArgs {
self
}
/// Set the default operator enabled state
pub const fn with_operator(mut self, v: bool) -> Self {
self.operator = v;
self
}
/// Set the default operator address
pub const fn with_operator_addr(mut self, v: IpAddr) -> Self {
self.operator_addr = v;
self
}
/// Set the default operator port
pub const fn with_operator_port(mut self, v: u16) -> Self {
self.operator_port = v;
self
}
/// Set the default operator API modules
pub fn with_operator_api(mut self, v: Option<RpcModuleSelection>) -> Self {
self.operator_api = v;
self
}
/// Set the default auth server address
pub const fn with_auth_addr(mut self, v: IpAddr) -> Self {
self.auth_addr = v;
@@ -375,6 +403,10 @@ impl Default for DefaultRpcServerArgs {
ipcdisable: false,
ipcpath: constants::DEFAULT_IPC_ENDPOINT.to_string(),
ipc_socket_permissions: None,
operator: false,
operator_addr: Ipv4Addr::LOCALHOST.into(),
operator_port: constants::DEFAULT_OPERATOR_RPC_PORT,
operator_api: None,
auth_addr: Ipv4Addr::LOCALHOST.into(),
auth_port: constants::DEFAULT_AUTH_PORT,
auth_jwtsecret: None,
@@ -470,6 +502,26 @@ pub struct RpcServerArgs {
#[arg(long = "ipc.permissions", default_value = Resettable::from(DefaultRpcServerArgs::get_global().ipc_socket_permissions.as_ref().map(|v| v.to_string().into())))]
pub ipc_socket_permissions: Option<String>,
/// Enable the Operator-RPC server.
///
/// This starts an additional HTTP-RPC server that can serve a different set of API modules
/// on a separate port. Useful for exposing operator-specific APIs without affecting the
/// public-facing RPC server.
#[arg(long, default_value_t = DefaultRpcServerArgs::get_global().operator)]
pub operator: bool,
/// Operator server address to listen on
#[arg(long = "operator.addr", default_value_t = DefaultRpcServerArgs::get_global().operator_addr)]
pub operator_addr: IpAddr,
/// Operator server port to listen on
#[arg(long = "operator.port", default_value_t = DefaultRpcServerArgs::get_global().operator_port)]
pub operator_port: u16,
/// Rpc Modules to be configured for the operator server
#[arg(long = "operator.api", value_parser = RpcModuleSelectionValueParser::default(), default_value = Resettable::from(DefaultRpcServerArgs::get_global().operator_api.as_ref().map(|v| v.to_string().into())))]
pub operator_api: Option<RpcModuleSelection>,
/// Auth server address to listen on
#[arg(long = "authrpc.addr", default_value_t = DefaultRpcServerArgs::get_global().auth_addr)]
pub auth_addr: IpAddr,
@@ -503,8 +555,8 @@ pub struct RpcServerArgs {
#[arg(long = "disable-auth-server", alias = "disable-engine-api", default_value_t = DefaultRpcServerArgs::get_global().disable_auth_server)]
pub disable_auth_server: bool,
/// Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api` and
/// `--ws.api`.
/// Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api`,
/// `--ws.api`, and `--operator.api`.
///
/// This is __not__ used for the authenticated engine-API RPC server, see
/// `--authrpc.jwtsecret`.
@@ -682,6 +734,18 @@ impl RpcServerArgs {
self
}
/// Enables the Operator-RPC server.
pub const fn with_operator(mut self) -> Self {
self.operator = true;
self
}
/// Configures modules for the Operator-RPC server.
pub fn with_operator_api(mut self, operator_api: RpcModuleSelection) -> Self {
self.operator_api = Some(operator_api);
self
}
/// Enables the Auth IPC
pub const fn with_auth_ipc(mut self) -> Self {
self.auth_ipc = true;
@@ -699,6 +763,7 @@ impl RpcServerArgs {
/// * The `auth_port` is scaled by a factor of `instance * 100`
/// * The `http_port` is scaled by a factor of `-instance`
/// * The `ws_port` is scaled by a factor of `instance * 2`
/// * The `operator_port` is scaled by a factor of `instance * 2 + 1`
/// * The `ipcpath` is appended with the instance number: `/tmp/reth.ipc-<instance>`
///
/// # Panics
@@ -718,6 +783,8 @@ impl RpcServerArgs {
self.http_port -= instance - 1;
// ws port is scaled by a factor of instance * 2
self.ws_port += instance * 2 - 2;
// operator port is scaled by a factor of instance * 2 + 1
self.operator_port += instance * 2 - 1;
// append instance file to ipc path
self.ipcpath = format!("{}-{}", self.ipcpath, instance);
}
@@ -753,12 +820,20 @@ impl RpcServerArgs {
self
}
/// Set the operator port to zero, to allow the OS to assign a random unused port when the rpc
/// server binds to a socket.
pub const fn with_operator_unused_port(mut self) -> Self {
self.operator_port = 0;
self
}
/// Configure all ports to be set to a random unused port when bound, and set the IPC path to a
/// random path.
pub fn with_unused_ports(mut self) -> Self {
self = self.with_http_unused_port();
self = self.with_ws_unused_port();
self = self.with_auth_unused_port();
self = self.with_operator_unused_port();
self = self.with_ipc_random_path();
self
}
@@ -785,6 +860,9 @@ impl RpcServerArgs {
if self.ws && self.ws_api.as_ref().is_some_and(|api| api.contains(&ns)) {
return true;
}
if self.operator && self.operator_api.as_ref().is_some_and(|api| api.contains(&ns)) {
return true;
}
// IPC exposes all modules when enabled
!self.ipcdisable
}
@@ -813,6 +891,10 @@ impl Default for RpcServerArgs {
ipcdisable,
ipcpath,
ipc_socket_permissions,
operator,
operator_addr,
operator_port,
operator_api,
auth_addr,
auth_port,
auth_jwtsecret,
@@ -857,6 +939,10 @@ impl Default for RpcServerArgs {
ipcdisable,
ipcpath,
ipc_socket_permissions,
operator,
operator_addr,
operator_port,
operator_api,
auth_addr,
auth_port,
auth_jwtsecret,
@@ -1017,6 +1103,10 @@ mod tests {
ipcdisable: false,
ipcpath: "reth.ipc".to_string(),
ipc_socket_permissions: Some("0o666".to_string()),
operator: false,
operator_addr: "127.0.0.1".parse().unwrap(),
operator_port: constants::DEFAULT_OPERATOR_RPC_PORT,
operator_api: None,
auth_addr: "127.0.0.1".parse().unwrap(),
auth_port: 8551,
auth_jwtsecret: Some(std::path::PathBuf::from("/tmp/jwt.hex")),

View File

@@ -163,6 +163,14 @@ impl RethRpcServerConfig for RpcServerArgs {
config = config.with_ipc(RpcModuleSelection::default_ipc_modules());
}
if self.operator {
config = config.with_operator(
self.operator_api
.clone()
.unwrap_or_else(|| RpcModuleSelection::standard_modules().into()),
);
}
config
}
@@ -223,6 +231,20 @@ impl RethRpcServerConfig for RpcServerArgs {
config.with_ipc(self.ipc_server_builder()).with_ipc_endpoint(self.ipcpath.clone());
}
if self.operator {
let socket_address = SocketAddr::new(self.operator_addr, self.operator_port);
config = config
.with_operator_address(socket_address)
.with_operator(self.http_ws_server_builder());
}
if self.operator_api.is_some() && !self.operator {
warn!(
target: "reth::cli",
"The --operator.api flag is set but --operator is not enabled. Operator RPC API will not be exposed."
);
}
config
}

View File

@@ -17,6 +17,8 @@ pub enum ServerKind {
WsHttp(SocketAddr),
/// Auth.
Auth(SocketAddr),
/// Operator.
Operator(SocketAddr),
}
impl ServerKind {
@@ -27,6 +29,7 @@ impl ServerKind {
Self::WS(_) => "--ws.port",
Self::WsHttp(_) => "--ws.port and --http.port",
Self::Auth(_) => "--authrpc.port",
Self::Operator(_) => "--operator.port",
}
}
}
@@ -38,6 +41,7 @@ impl std::fmt::Display for ServerKind {
Self::WS(addr) => write!(f, "{addr} (WS-RPC server)"),
Self::WsHttp(addr) => write!(f, "{addr} (WS-HTTP-RPC server)"),
Self::Auth(addr) => write!(f, "{addr} (AUTH server)"),
Self::Operator(addr) => write!(f, "{addr} (Operator-RPC server)"),
}
}
}
@@ -141,6 +145,7 @@ mod tests {
ServerKind::WS(addr),
ServerKind::WsHttp(addr),
ServerKind::Auth(addr),
ServerKind::Operator(addr),
];
for kind in &kinds {

View File

@@ -397,7 +397,8 @@ where
let mut modules = TransportRpcModules::default();
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
let TransportRpcModuleConfig { http, ws, ipc, operator, config } =
module_config.clone();
let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
@@ -405,6 +406,7 @@ where
modules.http = registry.maybe_module(http.as_ref());
modules.ws = registry.maybe_module(ws.as_ref());
modules.ipc = registry.maybe_module(ipc.as_ref());
modules.operator = registry.maybe_module(operator.as_ref());
}
modules
@@ -924,11 +926,13 @@ where
let http = self.maybe_module(config.http.as_ref());
let ws = self.maybe_module(config.ws.as_ref());
let ipc = self.maybe_module(config.ipc.as_ref());
let operator = self.maybe_module(config.operator.as_ref());
modules.config = config;
modules.http = http;
modules.ws = ws;
modules.ipc = ipc;
modules.operator = operator;
modules
}
@@ -1109,6 +1113,10 @@ pub struct RpcServerConfig<RpcMiddleware = Identity> {
ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
/// The Endpoint where to launch the ipc server
ipc_endpoint: Option<String>,
/// Configs for JSON-RPC Operator server.
operator_server_config: Option<ServerConfigBuilder>,
/// Address where to bind the operator server to
operator_addr: Option<SocketAddr>,
/// JWT secret for authentication
jwt_secret: Option<JwtSecret>,
/// Configurable RPC middleware
@@ -1130,6 +1138,8 @@ impl Default for RpcServerConfig<Identity> {
ws_addr: None,
ipc_server_config: None,
ipc_endpoint: None,
operator_server_config: None,
operator_addr: None,
jwt_secret: None,
rpc_middleware: Default::default(),
}
@@ -1179,6 +1189,16 @@ impl RpcServerConfig {
self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
/// Configures the operator server
///
/// Note: this always configures an [`EthSubscriptionIdProvider`] [`IdProvider`] for
/// convenience. To set a custom [`IdProvider`], please use [`Self::with_id_provider`].
pub fn with_operator(mut self, config: ServerConfigBuilder) -> Self {
self.operator_server_config =
Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
}
impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
@@ -1194,6 +1214,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
ws_addr: self.ws_addr,
ipc_server_config: self.ipc_server_config,
ipc_endpoint: self.ipc_endpoint,
operator_server_config: self.operator_server_config,
operator_addr: self.operator_addr,
jwt_secret: self.jwt_secret,
rpc_middleware,
}
@@ -1240,6 +1262,15 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
self
}
/// Configures the [`SocketAddr`] of the operator server
///
/// Default is [`Ipv4Addr::LOCALHOST`] and
/// [`reth_rpc_server_types::constants::DEFAULT_OPERATOR_RPC_PORT`]
pub const fn with_operator_address(mut self, addr: SocketAddr) -> Self {
self.operator_addr = Some(addr);
self
}
/// Sets a custom [`IdProvider`] for all configured transports.
///
/// By default all transports use [`EthSubscriptionIdProvider`]
@@ -1254,7 +1285,10 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
}
if let Some(ipc) = self.ipc_server_config {
self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
self.ipc_server_config = Some(ipc.set_id_provider(id_provider.clone()));
}
if let Some(config) = self.operator_server_config {
self.operator_server_config = Some(config.set_id_provider(id_provider));
}
self
@@ -1286,7 +1320,12 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
}
if let Some(ipc_server_config) = self.ipc_server_config {
self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
self.ipc_server_config =
Some(ipc_server_config.custom_tokio_runtime(tokio_runtime.clone()));
}
if let Some(operator_server_config) = self.operator_server_config {
self.operator_server_config =
Some(operator_server_config.custom_tokio_runtime(tokio_runtime));
}
self
}
@@ -1297,7 +1336,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
pub const fn has_server(&self) -> bool {
self.http_server_config.is_some() ||
self.ws_server_config.is_some() ||
self.ipc_server_config.is_some()
self.ipc_server_config.is_some() ||
self.operator_server_config.is_some()
}
/// Returns the [`SocketAddr`] of the http server
@@ -1310,6 +1350,11 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
self.ws_addr
}
/// Returns the [`SocketAddr`] of the operator server
pub const fn operator_address(&self) -> Option<SocketAddr> {
self.operator_addr
}
/// Returns the endpoint of the ipc server
pub fn ipc_endpoint(&self) -> Option<String> {
self.ipc_endpoint.clone()
@@ -1335,7 +1380,7 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
}
}
/// Builds and starts the configured server(s): http, ws, ipc.
/// Builds and starts the configured server(s): http, ws, ipc, operator.
///
/// If both http and ws are on the same port, they are combined into one server.
///
@@ -1347,6 +1392,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
let mut http_handle = None;
let mut ws_handle = None;
let mut ipc_handle = None;
let mut operator_handle = None;
let mut operator_local_addr = None;
let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
@@ -1358,6 +1405,11 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
constants::DEFAULT_WS_RPC_PORT,
)));
let operator_socket_addr = self.operator_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
constants::DEFAULT_OPERATOR_RPC_PORT,
)));
let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
let ipc_path =
self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
@@ -1369,6 +1421,33 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
}
// Launch the operator server if configured
if let Some(config) = self.operator_server_config &&
let Some(module) = modules.operator.as_ref()
{
let server = ServerBuilder::new()
.set_config(config.http_only().build())
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
)
.set_rpc_middleware(
RpcServiceBuilder::default()
.layer(RpcRequestMetrics::http(module))
.layer(self.rpc_middleware.clone()),
)
.build(operator_socket_addr)
.await
.map_err(|err| {
RpcError::server_error(err, ServerKind::Operator(operator_socket_addr))
})?;
let addr = server.local_addr().map_err(|err| {
RpcError::server_error(err, ServerKind::Operator(operator_socket_addr))
})?;
operator_local_addr = Some(addr);
operator_handle = Some(server.start(module.clone()));
}
// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
self.http_server_config.is_some() &&
@@ -1435,6 +1514,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
ws: ws_handle,
ipc_endpoint: self.ipc_endpoint.clone(),
ipc: ipc_handle,
operator_local_addr,
operator: operator_handle,
jwt_secret: self.jwt_secret,
});
}
@@ -1507,6 +1588,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
ws: ws_handle,
ipc_endpoint: self.ipc_endpoint.clone(),
ipc: ipc_handle,
operator_local_addr,
operator: operator_handle,
jwt_secret: self.jwt_secret,
})
}
@@ -1531,6 +1614,8 @@ pub struct TransportRpcModuleConfig {
ws: Option<RpcModuleSelection>,
/// ipc module configuration
ipc: Option<RpcModuleSelection>,
/// operator module configuration
operator: Option<RpcModuleSelection>,
/// Config for the modules
config: Option<RpcModuleConfig>,
}
@@ -1553,6 +1638,11 @@ impl TransportRpcModuleConfig {
Self::default().with_ipc(ipc)
}
/// Creates a new config with only operator set
pub fn set_operator(operator: impl Into<RpcModuleSelection>) -> Self {
Self::default().with_operator(operator)
}
/// Sets the [`RpcModuleSelection`] for the http transport.
pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
self.http = Some(http.into());
@@ -1571,6 +1661,12 @@ impl TransportRpcModuleConfig {
self
}
/// Sets the [`RpcModuleSelection`] for the operator transport.
pub fn with_operator(mut self, operator: impl Into<RpcModuleSelection>) -> Self {
self.operator = Some(operator.into());
self
}
/// Sets a custom [`RpcModuleConfig`] for the configured modules.
pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
self.config = Some(config);
@@ -1592,6 +1688,11 @@ impl TransportRpcModuleConfig {
&mut self.ipc
}
/// Get a mutable reference to the operator module configuration.
pub const fn operator_mut(&mut self) -> &mut Option<RpcModuleSelection> {
&mut self.operator
}
/// Get a mutable reference to the rpc module configuration.
pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
&mut self.config
@@ -1599,7 +1700,7 @@ impl TransportRpcModuleConfig {
/// Returns true if no transports are configured
pub const fn is_empty(&self) -> bool {
self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
self.http.is_none() && self.ws.is_none() && self.ipc.is_none() && self.operator.is_none()
}
/// Returns the [`RpcModuleSelection`] for the http transport
@@ -1617,6 +1718,11 @@ impl TransportRpcModuleConfig {
self.ipc.as_ref()
}
/// Returns the [`RpcModuleSelection`] for the operator transport
pub const fn operator(&self) -> Option<&RpcModuleSelection> {
self.operator.as_ref()
}
/// Returns the [`RpcModuleConfig`] for the configured modules
pub const fn config(&self) -> Option<&RpcModuleConfig> {
self.config.as_ref()
@@ -1624,7 +1730,10 @@ impl TransportRpcModuleConfig {
/// Returns true if the given module is configured for any transport.
pub fn contains_any(&self, module: &RethRpcModule) -> bool {
self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
self.contains_http(module) ||
self.contains_ws(module) ||
self.contains_ipc(module) ||
self.contains_operator(module)
}
/// Returns true if the given module is configured for the http transport.
@@ -1642,6 +1751,11 @@ impl TransportRpcModuleConfig {
self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
}
/// Returns true if the given module is configured for the operator transport.
pub fn contains_operator(&self, module: &RethRpcModule) -> bool {
self.operator.as_ref().is_some_and(|operator| operator.contains(module))
}
/// Ensures that both http and ws are configured and that they are configured to use the same
/// port.
fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
@@ -1677,6 +1791,8 @@ pub struct TransportRpcModules<Context = ()> {
ws: Option<RpcModule<Context>>,
/// rpcs module for ipc
ipc: Option<RpcModule<Context>>,
/// rpcs module for operator
operator: Option<RpcModule<Context>>,
}
// === impl TransportRpcModules ===
@@ -1710,6 +1826,13 @@ impl TransportRpcModules {
self
}
/// Sets the [`RpcModule`] for the operator transport.
/// This will overwrite current module, if any.
pub fn with_operator(mut self, operator: RpcModule<()>) -> Self {
self.operator = Some(operator);
self
}
/// Returns the [`TransportRpcModuleConfig`] used to configure this instance.
pub const fn module_config(&self) -> &TransportRpcModuleConfig {
&self.config
@@ -1732,7 +1855,10 @@ impl TransportRpcModules {
self.merge_ws(other.clone())?;
}
if self.module_config().contains_ipc(&module) {
self.merge_ipc(other)?;
self.merge_ipc(other.clone())?;
}
if self.module_config().contains_operator(&module) {
self.merge_operator(other)?;
}
Ok(())
@@ -1795,6 +1921,21 @@ impl TransportRpcModules {
Ok(false)
}
/// Merge the given [Methods] in the configured operator methods.
///
/// Fails if any of the methods in other is present already.
///
/// Returns [Ok(false)] if no operator transport is configured.
pub fn merge_operator(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, RegisterMethodError> {
if let Some(ref mut operator) = self.operator {
return operator.merge(other.into()).map(|_| true)
}
Ok(false)
}
/// Merge the given [`Methods`] in all configured methods.
///
/// Fails if any of the methods in other is present already.
@@ -1805,7 +1946,8 @@ impl TransportRpcModules {
let other = other.into();
self.merge_http(other.clone())?;
self.merge_ws(other.clone())?;
self.merge_ipc(other)?;
self.merge_ipc(other.clone())?;
self.merge_operator(other)?;
Ok(())
}
@@ -1838,6 +1980,9 @@ impl TransportRpcModules {
if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
let _ = methods.merge(m);
}
if let Some(m) = self.operator_methods(|name| f(name, &methods)) {
let _ = methods.merge(m);
}
methods
}
@@ -1871,6 +2016,16 @@ impl TransportRpcModules {
self.ipc.as_ref().map(|module| methods_by(module, filter))
}
/// Returns all [`Methods`] installed for the operator server based in the given closure.
///
/// Returns `None` if no operator support is configured.
pub fn operator_methods<F>(&self, filter: F) -> Option<Methods>
where
F: FnMut(&str) -> bool,
{
self.operator.as_ref().map(|module| methods_by(module, filter))
}
/// Removes the method with the given name from the configured http methods.
///
/// Returns `true` if the method was found and removed, `false` otherwise.
@@ -1937,6 +2092,28 @@ impl TransportRpcModules {
}
}
/// Removes the method with the given name from the configured operator methods.
///
/// Returns `true` if the method was found and removed, `false` otherwise.
///
/// Be aware that a subscription consist of two methods, `subscribe` and `unsubscribe` and
/// it's the caller responsibility to remove both `subscribe` and `unsubscribe` methods for
/// subscriptions.
pub fn remove_operator_method(&mut self, method_name: &'static str) -> bool {
if let Some(operator_module) = &mut self.operator {
operator_module.remove_method(method_name).is_some()
} else {
false
}
}
/// Removes the given methods from the configured operator methods.
pub fn remove_operator_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
for name in methods {
self.remove_operator_method(name);
}
}
/// Removes the method with the given name from all configured transports.
///
/// Returns `true` if the method was found and removed, `false` otherwise.
@@ -1944,8 +2121,9 @@ impl TransportRpcModules {
let http_removed = self.remove_http_method(method_name);
let ws_removed = self.remove_ws_method(method_name);
let ipc_removed = self.remove_ipc_method(method_name);
let operator_removed = self.remove_operator_method(method_name);
http_removed || ws_removed || ipc_removed
http_removed || ws_removed || ipc_removed || operator_removed
}
/// Renames a method in all configured transports by:
@@ -1999,6 +2177,21 @@ impl TransportRpcModules {
self.merge_ws(other)
}
/// Replace the given [Methods] in the configured operator methods.
///
/// Fails if any of the methods in other is present already or if the method being removed is
/// not present
///
/// Returns [Ok(false)] if no operator transport is configured.
pub fn replace_operator(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, RegisterMethodError> {
let other = other.into();
self.remove_operator_methods(other.method_names());
self.merge_operator(other)
}
/// Replaces the method with the given name from all configured transports.
///
/// Returns `true` if the method was found and replaced, `false` otherwise
@@ -2009,7 +2202,8 @@ impl TransportRpcModules {
let other = other.into();
self.replace_http(other.clone())?;
self.replace_ws(other.clone())?;
self.replace_ipc(other)?;
self.replace_ipc(other.clone())?;
self.replace_operator(other)?;
Ok(true)
}
@@ -2049,6 +2243,18 @@ impl TransportRpcModules {
self.merge_ipc(other)
}
/// Adds or replaces given [`Methods`] in operator module.
///
/// Returns `true` if the methods were replaced or added, `false` otherwise.
pub fn add_or_replace_operator(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, RegisterMethodError> {
let other = other.into();
self.remove_operator_methods(other.method_names());
self.merge_operator(other)
}
/// Adds or replaces given [`Methods`] in all configured network modules.
pub fn add_or_replace_configured(
&mut self,
@@ -2057,7 +2263,8 @@ impl TransportRpcModules {
let other = other.into();
self.add_or_replace_http(other.clone())?;
self.add_or_replace_ws(other.clone())?;
self.add_or_replace_ipc(other)?;
self.add_or_replace_ipc(other.clone())?;
self.add_or_replace_operator(other)?;
Ok(())
}
/// Adds or replaces the given [`Methods`] in the transport modules where the specified
@@ -2075,7 +2282,10 @@ impl TransportRpcModules {
self.add_or_replace_ws(other.clone())?;
}
if self.module_config().contains_ipc(&module) {
self.add_or_replace_ipc(other)?;
self.add_or_replace_ipc(other.clone())?;
}
if self.module_config().contains_operator(&module) {
self.add_or_replace_operator(other)?;
}
Ok(())
}
@@ -2112,6 +2322,8 @@ pub struct RpcServerHandle {
ws: Option<ServerHandle>,
ipc_endpoint: Option<String>,
ipc: Option<jsonrpsee::server::ServerHandle>,
operator_local_addr: Option<SocketAddr>,
operator: Option<ServerHandle>,
jwt_secret: Option<JwtSecret>,
}
@@ -2144,6 +2356,11 @@ impl RpcServerHandle {
self.ws_local_addr
}
/// Returns the [`SocketAddr`] of the operator server if started.
pub const fn operator_local_addr(&self) -> Option<SocketAddr> {
self.operator_local_addr
}
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(self) -> Result<(), AlreadyStoppedError> {
if let Some(handle) = self.http {
@@ -2158,6 +2375,10 @@ impl RpcServerHandle {
handle.stop()?
}
if let Some(handle) = self.operator {
handle.stop()?
}
Ok(())
}
@@ -2176,6 +2397,11 @@ impl RpcServerHandle {
self.ws_local_addr.map(|addr| format!("ws://{addr}"))
}
/// Returns the url to the operator server
pub fn operator_url(&self) -> Option<String> {
self.operator_local_addr.map(|addr| format!("http://{addr}"))
}
/// Returns a http client connected to the server.
pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
let url = self.http_url()?;
@@ -2431,6 +2657,7 @@ mod tests {
)),
ws: None,
ipc: None,
operator: None,
config: None,
}
)
@@ -2445,6 +2672,7 @@ mod tests {
http: Some(RpcModuleSelection::Selection(Default::default())),
ws: None,
ipc: None,
operator: None,
config: None,
}
)
@@ -2660,6 +2888,7 @@ mod tests {
http: Some(http_module),
ws: Some(ws_module),
ipc: Some(ipc_module),
operator: None,
};
// Create new methods: one to replace an existing method, one to add a new one
@@ -2693,8 +2922,13 @@ mod tests {
// Create a config that enables RethRpcModule::Eth for HTTP only
let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
let mut modules =
TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };
let mut modules = TransportRpcModules {
config,
http: Some(RpcModule::new(())),
ws: None,
ipc: None,
operator: None,
};
// Track whether closure was called
let mut closure_called = false;

View File

@@ -9,6 +9,9 @@ pub const DEFAULT_WS_RPC_PORT: u16 = 8546;
/// The default port for the auth server.
pub const DEFAULT_AUTH_PORT: u16 = 8551;
/// The default port for the operator RPC server.
pub const DEFAULT_OPERATOR_RPC_PORT: u16 = 8547;
/// The default maximum block range allowed to filter
pub const DEFAULT_MAX_BLOCKS_PER_FILTER: u64 = 100_000;

View File

@@ -357,6 +357,26 @@ RPC:
If not specified, the permissions will be set by the system's umask.
--operator
Enable the Operator-RPC server.
This starts an additional HTTP-RPC server that can serve a different set of API modules on a separate port. Useful for exposing operator-specific APIs without affecting the public-facing RPC server.
--operator.addr <OPERATOR_ADDR>
Operator server address to listen on
[default: 127.0.0.1]
--operator.port <OPERATOR_PORT>
Operator server port to listen on
[default: 8547]
--operator.api <OPERATOR_API>
Rpc Modules to be configured for the operator server
[possible values: admin, debug, eth, net, trace, txpool, web3, rpc, reth, ots, flashbots, miner, mev, testing]
--authrpc.addr <AUTH_ADDR>
Auth server address to listen on
@@ -388,7 +408,7 @@ RPC:
This will prevent the authenticated engine-API server from starting. Use this if you're running a node that doesn't need to serve engine API requests.
--rpc.jwtsecret <HEX>
Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api` and `--ws.api`.
Hex encoded JWT secret to authenticate the regular RPC server(s), see `--http.api`, `--ws.api`, and `--operator.api`.
This is __not__ used for the authenticated engine-API RPC server, see `--authrpc.jwtsecret`.