rpc/server: Implement stop_connections() in RequestHandler.

This commit is contained in:
parazyd
2023-08-31 11:43:50 +02:00
parent 43e1c2f7a0
commit fb1350daec
10 changed files with 62 additions and 44 deletions

View File

@@ -444,11 +444,12 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
// JSON-RPC server
info!("Starting JSON-RPC server");
let rpc_task = StoppableTask::new();
let darkfid_ = darkfid.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, darkfid.clone(), None, ex.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => darkfid_.stop_connections().await,
Err(e) => error!(target: "darkfid", "Failed starting sync JSON-RPC server: {}", e),
}
},

View File

@@ -31,7 +31,10 @@ use darkfi::{
blockchain::BlockInfo,
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
rpc::{jsonrpc::JsonSubscriber, server::listen_and_serve},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{StoppableTask, StoppableTaskPtr},
util::time::TimeKeeper,
validator::{Validator, ValidatorConfig, ValidatorPtr},
@@ -202,11 +205,12 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
// stop() function to shut down, also terminating the task we
// created for it.
let rpc_task = StoppableTask::new();
let darkfid_ = darkfid.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, darkfid.clone(), None, ex.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => darkfid_.stop_connections().await,
Err(e) => error!(target: "darkfid", "Failed starting sync JSON-RPC server: {}", e),
}
},

View File

@@ -38,7 +38,10 @@ use darkfi::{
view::View,
},
net,
rpc::{jsonrpc::JsonSubscriber, server::listen_and_serve},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, Subscriber, SubscriberPtr},
util::{file::save_json_file, path::expand_path, time::Timestamp},
Error, Result,
@@ -245,12 +248,14 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
dnet_sub: json_sub,
rpc_connections: Mutex::new(HashSet::new()),
});
let rpc_task = StoppableTask::new();
let rpc_interface_ = rpc_interface.clone();
rpc_task.clone().start(
listen_and_serve(rpc_listen_addr, rpc_interface, None, executor.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => rpc_interface_.stop_connections().await,
Err(e) => error!(target: "darkirc", "Failed starting JSON-RPC server: {}", e),
}
},

View File

@@ -763,11 +763,12 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
// JSON-RPC server
info!(target: "faucetd", "Starting JSON-RPC server");
let rpc_task = StoppableTask::new();
let faucetd_ = faucetd.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, faucetd.clone(), None, ex.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => faucetd_.stop_connections().await,
Err(e) => error!(target: "faucetd", "Failed starting JSON-RPC server: {}", e),
}
},

View File

@@ -587,11 +587,12 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Starting JSON-RPC server on {}", args.rpc_listen);
let rpc_task = StoppableTask::new();
let fud_ = fud.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, fud.clone(), None, ex.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => fud_.stop_connections().await,
Err(e) => error!(target: "fud", "Failed starting sync JSON-RPC server: {}", e),
}
},
@@ -600,8 +601,8 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
);
info!("Starting P2P protocols");
let fud_ = fud.clone();
let registry = p2p.protocol_registry();
let fud_ = fud.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let fud_ = fud_.clone();

View File

@@ -27,7 +27,7 @@ use darkfi::{
view::{View, ViewPtr},
},
net::{self, settings::SettingsOpt},
rpc::server::listen_and_serve,
rpc::server::{listen_and_serve, RequestHandler},
system::StoppableTask,
Error, Result,
};
@@ -153,11 +153,12 @@ async fn realmain(args: Args, executor: Arc<smol::Executor<'static>>) -> Result<
p2p.clone(),
));
let rpc_task = StoppableTask::new();
let rpc_interface_ = rpc_interface.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, rpc_interface, None, executor.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => rpc_interface_.stop_connections().await,
Err(e) => error!(target: "genevd", "Failed starting JSON-RPC server: {}", e),
}
},

View File

@@ -476,12 +476,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// JSON-RPC server
info!(target: "lilith", "Starting JSON-RPC server on {}", args.rpc_listen);
let lilith_ = lilith.clone();
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, lilith.clone(), None, ex.clone()),
|res| async {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {}", e),
}
},

View File

@@ -49,7 +49,7 @@ use darkfi::{
EventMsg,
},
net::{self, P2pPtr},
rpc::server::listen_and_serve,
rpc::server::{listen_and_serve, RequestHandler},
system::StoppableTask,
util::{path::expand_path, time::Timestamp},
Error, Result,
@@ -403,10 +403,10 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
));
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(settings.rpc_listen, rpc_interface, None, executor.clone()),
|res| async {
listen_and_serve(settings.rpc_listen, rpc_interface.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Ok(()) | Err(Error::RpcServerStopped) => rpc_interface.stop_connections().await,
Err(e) => error!(target: "taud", "Failed starting JSON-RPC server: {}", e),
}
},

View File

@@ -60,6 +60,14 @@ pub trait RequestHandler: Sync + Send {
async fn active_connections(&self) -> usize {
self.connections_mut().await.len()
}
async fn stop_connections(&self) {
info!(target: "rpc::server", "[RPC] Server stopped, closing connections");
for (i, task) in self.connections().await.iter().enumerate() {
debug!(target: "rpc::server", "Stopping connection #{}", i);
task.stop().await;
}
}
}
/// Accept function that should run inside a loop for accepting incoming
@@ -250,11 +258,7 @@ mod tests {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => {
eprintln!("Stopping connections");
for (i, task) in rpc_server_.connections().await.iter().enumerate() {
eprintln!("Stopping connection #{}", i);
task.stop().await;
}
rpc_server_.stop_connections().await
}
Err(e) => panic!("{}", e),
}

View File

@@ -29,14 +29,13 @@ use tinyjson::JsonValue;
use url::Url;
use darkfi::{
net::transport::Listener,
rpc::{
client::RpcClient,
jsonrpc::*,
server::{accept, RequestHandler},
server::{listen_and_serve, RequestHandler},
},
system::StoppableTaskPtr,
Result,
system::{msleep, StoppableTask, StoppableTaskPtr},
Error, Result,
};
struct RpcSrv {
@@ -75,7 +74,6 @@ impl RequestHandler for RpcSrv {
#[test]
fn jsonrpc_reqrep() -> Result<()> {
let executor = Arc::new(Executor::new());
let executor_ = executor.clone();
smol::block_on(executor.run(async {
// Find an available port
@@ -88,20 +86,22 @@ fn jsonrpc_reqrep() -> Result<()> {
stop_sub: smol::channel::unbounded(),
rpc_connections: Mutex::new(HashSet::new()),
});
let listener = Listener::new(endpoint.clone()).await?.listen().await?;
let rpcsrv_ = Arc::clone(&rpcsrv);
executor
.spawn(async move {
while let Ok((stream, peer_addr)) = listener.next().await {
let rh_ = rpcsrv.clone();
executor_
.spawn(async move {
let _ = accept(stream, peer_addr.clone(), rh_, None).await;
})
.detach();
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(endpoint.clone(), rpcsrv.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => rpcsrv_.stop_connections().await,
Err(e) => eprintln!("Failed starting JSON-RPC server: {}", e),
}
})
.detach();
},
Error::RpcServerStopped,
executor.clone(),
);
msleep(500).await;
let client = RpcClient::new(endpoint, executor.clone()).await?;
let req = JsonRequest::new("ping", vec![]);