dchat: add anchors/ fix ports/ uncomment daemon

This commit is contained in:
lunar-mining
2023-12-15 13:42:38 +01:00
parent 4f5d7ddb98
commit bbf2a67531
4 changed files with 68 additions and 61 deletions

View File

@@ -13,9 +13,10 @@ async-trait = "0.1.74"
log = "0.4.20"
url = "2.5.0"
[dependencies]
# darkfi
# ANCHOR: darkfi
darkfi = {path = "../../../", features = ["net", "toml", "system", "async-daemonize", "rpc"]}
darkfi-serial = {path = "../../../src/serial"}
# ANCHOR_END: darkfi
# daemon
easy-parallel = "3.3.1"

View File

@@ -1,5 +1,8 @@
# dchat toml
## RPC listen address.
rpc_listen =["tcp://127.0.0.1:51054"]
[net]
## P2P accept addresses Required for inbound nodes.
#inbound=["tcp://127.0.0.1:51554"]
@@ -8,7 +11,7 @@
#external_addr=["tcp://127.0.0.1:51554"]
## Seed nodes to connect to. Required for inbound and outbound nodes.
seeds=["tcp://127.0.0.1:55555"]
seeds=["tcp://127.0.0.1:50515"]
## Outbound connect slots. Required for outbound nodes.
outbound_connections = 5

View File

@@ -16,6 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// ANCHOR: imports
use log::{debug, error, info};
use smol::{lock::Mutex, stream::StreamExt};
use std::{collections::HashSet, sync::Arc};
@@ -38,6 +39,7 @@ use crate::{
dchatmsg::{DchatMsg, DchatMsgsBuffer},
protocol_dchat::ProtocolDchat,
};
// ANCHOR_END: imports
pub mod dchat_error;
pub mod dchatmsg;
@@ -81,7 +83,6 @@ struct Dchat {
pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
pub dnet_sub: JsonSubscriber,
}
// ANCHOR_END: dchat
impl Dchat {
fn new(
@@ -93,69 +94,70 @@ impl Dchat {
Self { p2p, recv_msgs, rpc_connections, dnet_sub }
}
}
// ANCHOR_END: dchat
// ANCHOR: main
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
let p2p = net::P2p::new(args.net.into(), ex.clone()).await;
// // ANCHOR: dnet
// info!("Starting dnet subs task");
// let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
// let dnet_sub_ = dnet_sub.clone();
// let p2p_ = p2p.clone();
// let dnet_task = StoppableTask::new();
// dnet_task.clone().start(
// async move {
// let dnet_sub = p2p_.dnet_subscribe().await;
// loop {
// let event = dnet_sub.receive().await;
// debug!("Got dnet event: {:?}", event);
// dnet_sub_.notify(vec![event.into()].into()).await;
// }
// },
// |res| async {
// match res {
// Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
// Err(e) => panic!("{}", e),
// }
// },
// Error::DetachedTaskStopped,
// ex.clone(),
// );
// // ANCHOR_end: dnet
// ANCHOR: dnet
info!("Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!("Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => panic!("{}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// ANCHOR_end: dnet
// // ANCHOR: rpc
// info!("Starting JSON-RPC server on port {}", args.rpc_listen);
// let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
// let rpc_connections = Mutex::new(HashSet::new());
// let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
// let _ex = ex.clone();
// ANCHOR: rpc
info!("Starting JSON-RPC server on port {}", args.rpc_listen);
let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
let rpc_connections = Mutex::new(HashSet::new());
let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
let _ex = ex.clone();
// let rpc_task = StoppableTask::new();
// rpc_task.clone().start(
// listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
// |res| async move {
// match res {
// Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
// Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
// }
// },
// Error::RpcServerStopped,
// ex.clone(),
// );
// // ANCHOR_end: rpc
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
// ANCHOR_end: rpc
// // ANCHOR: register_protocol
// info!("Registering Dchat protocol");
// let registry = p2p.protocol_registry();
// registry
// .register(!net::session::SESSION_SEED, move |channel, _p2p| {
// let msgs_ = msgs.clone();
// async move { ProtocolDchat::init(channel, msgs_).await }
// })
// .await;
// // ANCHOR_END: register_protocol
// ANCHOR: register_protocol
info!("Registering Dchat protocol");
let registry = p2p.protocol_registry();
registry
.register(!net::session::SESSION_SEED, move |channel, _p2p| {
let msgs_ = msgs.clone();
async move { ProtocolDchat::init(channel, msgs_).await }
})
.await;
// ANCHOR_END: register_protocol
// ANCHOR: p2p_start
info!("Starting P2P network");
@@ -170,11 +172,11 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!("Stopping P2P network");
p2p.stop().await;
//info!("Stopping JSON-RPC server");
//rpc_task.stop().await;
//dnet_task.stop().await;
info!("Stopping JSON-RPC server");
rpc_task.stop().await;
dnet_task.stop().await;
//info!("Shut down successfully");
info!("Shut down successfully");
// ANCHOR_END: shutdown
Ok(())
}

View File

@@ -48,6 +48,7 @@ impl RequestHandler for Dchat {
}
// ANCHOR_END: req_match
}
async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
self.rpc_connections.lock().await
}