diff --git a/example/dchat/dchatd/Cargo.toml b/example/dchat/dchatd/Cargo.toml index 6482facae..82d2490a9 100644 --- a/example/dchat/dchatd/Cargo.toml +++ b/example/dchat/dchatd/Cargo.toml @@ -13,9 +13,10 @@ async-trait = "0.1.74" log = "0.4.20" url = "2.5.0" [dependencies] -# darkfi +# ANCHOR: darkfi darkfi = {path = "../../../", features = ["net", "toml", "system", "async-daemonize", "rpc"]} darkfi-serial = {path = "../../../src/serial"} +# ANCHOR_END: darkfi # daemon easy-parallel = "3.3.1" diff --git a/example/dchat/dchatd/dchatd_config.toml b/example/dchat/dchatd/dchatd_config.toml index 3b96c01f9..9d1dbfd1a 100644 --- a/example/dchat/dchatd/dchatd_config.toml +++ b/example/dchat/dchatd/dchatd_config.toml @@ -1,5 +1,8 @@ # dchat toml +## RPC listen address. +rpc_listen =["tcp://127.0.0.1:51054"] + [net] ## P2P accept addresses Required for inbound nodes. #inbound=["tcp://127.0.0.1:51554"] @@ -8,7 +11,7 @@ #external_addr=["tcp://127.0.0.1:51554"] ## Seed nodes to connect to. Required for inbound and outbound nodes. -seeds=["tcp://127.0.0.1:55555"] +seeds=["tcp://127.0.0.1:50515"] ## Outbound connect slots. Required for outbound nodes. outbound_connections = 5 diff --git a/example/dchat/dchatd/src/main.rs b/example/dchat/dchatd/src/main.rs index fee326730..37c133428 100644 --- a/example/dchat/dchatd/src/main.rs +++ b/example/dchat/dchatd/src/main.rs @@ -16,6 +16,7 @@ * along with this program. If not, see . */ +// ANCHOR: imports use log::{debug, error, info}; use smol::{lock::Mutex, stream::StreamExt}; use std::{collections::HashSet, sync::Arc}; @@ -38,6 +39,7 @@ use crate::{ dchatmsg::{DchatMsg, DchatMsgsBuffer}, protocol_dchat::ProtocolDchat, }; +// ANCHOR_END: imports pub mod dchat_error; pub mod dchatmsg; @@ -81,7 +83,6 @@ struct Dchat { pub rpc_connections: Mutex>, pub dnet_sub: JsonSubscriber, } -// ANCHOR_END: dchat impl Dchat { fn new( @@ -93,69 +94,70 @@ impl Dchat { Self { p2p, recv_msgs, rpc_connections, dnet_sub } } } +// ANCHOR_END: dchat // ANCHOR: main async_daemonize!(realmain); async fn realmain(args: Args, ex: Arc>) -> Result<()> { let p2p = net::P2p::new(args.net.into(), ex.clone()).await; - // // ANCHOR: dnet - // info!("Starting dnet subs task"); - // let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); - // let dnet_sub_ = dnet_sub.clone(); - // let p2p_ = p2p.clone(); - // let dnet_task = StoppableTask::new(); - // dnet_task.clone().start( - // async move { - // let dnet_sub = p2p_.dnet_subscribe().await; - // loop { - // let event = dnet_sub.receive().await; - // debug!("Got dnet event: {:?}", event); - // dnet_sub_.notify(vec![event.into()].into()).await; - // } - // }, - // |res| async { - // match res { - // Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } - // Err(e) => panic!("{}", e), - // } - // }, - // Error::DetachedTaskStopped, - // ex.clone(), - // ); - // // ANCHOR_end: dnet + // ANCHOR: dnet + info!("Starting dnet subs task"); + let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); + let dnet_sub_ = dnet_sub.clone(); + let p2p_ = p2p.clone(); + let dnet_task = StoppableTask::new(); + dnet_task.clone().start( + async move { + let dnet_sub = p2p_.dnet_subscribe().await; + loop { + let event = dnet_sub.receive().await; + debug!("Got dnet event: {:?}", event); + dnet_sub_.notify(vec![event.into()].into()).await; + } + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => panic!("{}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + // ANCHOR_end: dnet - // // ANCHOR: rpc - // info!("Starting JSON-RPC server on port {}", args.rpc_listen); - // let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }])); - // let rpc_connections = Mutex::new(HashSet::new()); - // let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub)); - // let _ex = ex.clone(); + // ANCHOR: rpc + info!("Starting JSON-RPC server on port {}", args.rpc_listen); + let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }])); + let rpc_connections = Mutex::new(HashSet::new()); + let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub)); + let _ex = ex.clone(); - // let rpc_task = StoppableTask::new(); - // rpc_task.clone().start( - // listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()), - // |res| async move { - // match res { - // Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await, - // Err(e) => error!("Failed stopping JSON-RPC server: {}", e), - // } - // }, - // Error::RpcServerStopped, - // ex.clone(), - // ); - // // ANCHOR_end: rpc + let rpc_task = StoppableTask::new(); + rpc_task.clone().start( + listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await, + Err(e) => error!("Failed stopping JSON-RPC server: {}", e), + } + }, + Error::RpcServerStopped, + ex.clone(), + ); + // ANCHOR_end: rpc - // // ANCHOR: register_protocol - // info!("Registering Dchat protocol"); - // let registry = p2p.protocol_registry(); - // registry - // .register(!net::session::SESSION_SEED, move |channel, _p2p| { - // let msgs_ = msgs.clone(); - // async move { ProtocolDchat::init(channel, msgs_).await } - // }) - // .await; - // // ANCHOR_END: register_protocol + // ANCHOR: register_protocol + info!("Registering Dchat protocol"); + let registry = p2p.protocol_registry(); + registry + .register(!net::session::SESSION_SEED, move |channel, _p2p| { + let msgs_ = msgs.clone(); + async move { ProtocolDchat::init(channel, msgs_).await } + }) + .await; + // ANCHOR_END: register_protocol // ANCHOR: p2p_start info!("Starting P2P network"); @@ -170,11 +172,11 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!("Stopping P2P network"); p2p.stop().await; - //info!("Stopping JSON-RPC server"); - //rpc_task.stop().await; - //dnet_task.stop().await; + info!("Stopping JSON-RPC server"); + rpc_task.stop().await; + dnet_task.stop().await; - //info!("Shut down successfully"); + info!("Shut down successfully"); // ANCHOR_END: shutdown Ok(()) } diff --git a/example/dchat/dchatd/src/rpc.rs b/example/dchat/dchatd/src/rpc.rs index 429893a18..a3b75dcd2 100644 --- a/example/dchat/dchatd/src/rpc.rs +++ b/example/dchat/dchatd/src/rpc.rs @@ -48,6 +48,7 @@ impl RequestHandler for Dchat { } // ANCHOR_END: req_match } + async fn connections_mut(&self) -> MutexGuard<'_, HashSet> { self.rpc_connections.lock().await }