wallet/evgrd: add more debug and error handling to net interfaces.

This commit is contained in:
darkfi
2024-09-25 13:22:58 +02:00
parent 8e7e898ac4
commit 204de13e63
2 changed files with 40 additions and 29 deletions

View File

@@ -50,7 +50,7 @@ use std::{
use url::Url;
use crate::{
prop::{PropertyBool, PropertyStr, PropertyFloat32, Role},
prop::{PropertyBool, PropertyFloat32, PropertyStr, Role},
scene::{SceneNodePtr, Slot},
};
@@ -59,7 +59,7 @@ const EVGRDB_PATH: &str = "/data/data/darkfi.darkwallet/evgr/";
#[cfg(target_os = "linux")]
const EVGRDB_PATH: &str = "~/.local/darkfi/darkwallet/evgr/";
const ENDPOINT: &str = "tcp://agorism.dev:5588";
const ENDPOINT: &str = "tor://obbc5rgtsqtscnph7yxrbsgsm5axbppfn552yr5lrrd2ocgkdcsjcnyd.onion:25589";
const CHANNEL: &str = "#random";
/// Due to drift between different machine's clocks, if the message timestamp is recent
@@ -105,7 +105,8 @@ impl LocalDarkIRC {
let editbox_node = sg_root.clone().lookup_node("/window/view/editz").unwrap();
let editbox_text = PropertyStr::wrap(&editbox_node, Role::App, "text", 0).unwrap();
let chatview_scroll = PropertyFloat32::wrap(&chatview_node, Role::Internal, "scroll", 0).unwrap();
let chatview_scroll =
PropertyFloat32::wrap(&chatview_node, Role::Internal, "scroll", 0).unwrap();
let upgrade_popup_node = sg_root.clone().lookup_node("/window/view/upgrade_popup").unwrap();
let upgrade_popup_is_visible =
@@ -249,8 +250,10 @@ impl LocalDarkIRC {
let Some(stream) = &mut *self.stream.lock().await else { return Err(Error::ConnectFailed) };
let version = VersionMessage::new();
debug!(target: "darkirc", "Sending version: {version:?}");
version.encode_async(stream).await?;
debug!(target: "darkirc", "Receiving version...");
let server_version = VersionMessage::decode_async(stream).await?;
info!(target: "darkirc", "Backend server version: {}", server_version.protocol_version);

View File

@@ -152,7 +152,13 @@ async fn rpc_serve(
match listener.next().await {
Ok((stream, url)) => {
info!(target: "evgrd", "Accepted connection from {url}");
ex.spawn(handle_connect(stream, daemon.clone())).detach();
let daemon = daemon.clone();
ex.spawn(async move {
if let Err(e) = handle_connect(stream, daemon).await {
error!(target: "evgrd", "Handle connect exited: {e}");
}
})
.detach();
}
// Errors we didn't handle above:
@@ -168,11 +174,13 @@ async fn rpc_serve(
}
async fn handle_connect(mut stream: Box<dyn PtStream>, daemon: Arc<Daemon>) -> Result<()> {
debug!(target: "evgrd", "Receiving version...");
let client_version = VersionMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Client version: {}", client_version.protocol_version);
let version = VersionMessage::new();
version.encode_async(&mut stream).await?;
debug!(target: "darkirc", "Sent version: {version:?}");
let event_sub = daemon.event_graph.event_pub.clone().subscribe().await;
@@ -188,7 +196,7 @@ async fn handle_connect(mut stream: Box<dyn PtStream>, daemon: Arc<Daemon>) -> R
match msg_type {
MSG_FETCHEVENTS => fetch_events(&mut stream, &daemon).await?,
MSG_SENDEVENT => send_event(&mut stream, &daemon).await?,
_ => error!("Skipping unhandled msg_type: {msg_type}")
_ => error!(target: "evgrd", "Skipping unhandled msg_type: {msg_type}")
}
}
}
@@ -200,7 +208,7 @@ async fn fetch_events(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result
info!(target: "evgrd", "Fetching events {fetchevs:?}");
let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?;
//info!("fetched {events:?}");
//info!(target: "evgrd", "fetched {events:?}");
for event in events {
MSG_EVENT.encode_async(stream).await?;
@@ -250,7 +258,7 @@ async fn send_event(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<(
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Starting evgrd node");
info!(target: "evgrd", "Starting evgrd node");
// Create datastore path if not there already.
let datastore = expand_path(&args.datastore)?;
@@ -259,7 +267,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let replay_datastore = expand_path(&args.replay_datastore)?;
let replay_mode = args.replay_mode;
info!("Instantiating event DAG");
info!(target: "evgrd", "Instantiating event DAG");
let sled_db = sled::open(datastore)?;
let mut p2p_settings: darkfi::net::Settings = args.net.into();
p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap();
@@ -284,7 +292,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let prune_task = event_graph.prune_task.get().unwrap();
info!("Registering EventGraph P2P protocol");
info!(target: "evgrd", "Registering EventGraph P2P protocol");
let event_graph_ = Arc::clone(&event_graph);
let registry = p2p.protocol_registry();
registry
@@ -294,7 +302,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
})
.await;
info!("Starting dnet subs task");
info!(target: "evgrd", "Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
@@ -304,7 +312,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!("Got dnet event: {:?}", event);
debug!(target: "evgrd", "Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
@@ -318,7 +326,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!("Starting deg subs task");
info!(target: "evgrd", "Starting deg subs task");
let deg_sub = JsonSubscriber::new("deg.subscribe_events");
let deg_sub_ = deg_sub.clone();
let event_graph_ = event_graph.clone();
@@ -328,7 +336,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let deg_sub = event_graph_.deg_subscribe().await;
loop {
let event = deg_sub.receive().await;
debug!("Got deg event: {:?}", event);
debug!(target: "evgrd", "Got deg event: {:?}", event);
deg_sub_.notify(vec![event.into()].into()).await;
}
},
@@ -342,7 +350,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!("Starting JSON-RPC server");
info!(target: "evgrd", "Starting JSON-RPC server");
let daemon = Arc::new(Daemon::new(
p2p.clone(),
//sled_db.clone(),
@@ -360,14 +368,14 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
Err(e) => error!(target: "evgrd", "Failed stopping JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
info!("Starting evgrd server");
info!(target: "evgrd", "Starting evgrd server");
let mut rpc_tasks = vec![];
for listen_url in args.daemon_listen {
let listener = Listener::new(listen_url, None).await?;
@@ -380,7 +388,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
match res {
Ok(()) => panic!("Acceptor task should never complete without error status"),
//Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
Err(e) => error!("Failed stopping RPC server: {}", e),
Err(e) => error!(target: "evgrd", "Failed stopping RPC server: {}", e),
}
},
Error::RpcServerStopped,
@@ -389,27 +397,27 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
rpc_tasks.push(rpc_task);
}
info!("Starting P2P network");
info!(target: "evgrd", "Starting P2P network");
p2p.clone().start().await?;
info!("Waiting for some P2P connections...");
info!(target: "evgrd", "Waiting for some P2P connections...");
sleep(5).await;
// We'll attempt to sync {sync_attempts} times
if !args.skip_dag_sync {
for i in 1..=args.sync_attempts {
info!("Syncing event DAG (attempt #{})", i);
info!(target: "evgrd", "Syncing event DAG (attempt #{})", i);
match event_graph.dag_sync().await {
Ok(()) => break,
Err(e) => {
if i == args.sync_attempts {
error!("Failed syncing DAG. Exiting.");
error!(target: "evgrd", "Failed syncing DAG. Exiting.");
p2p.stop().await;
return Err(Error::DagSyncFailed)
} else {
// TODO: Maybe at this point we should prune or something?
// TODO: Or maybe just tell the user to delete the DAG from FS.
error!("Failed syncing DAG ({}), retrying in {}s...", e, args.sync_timeout);
error!(target: "evgrd", "Failed syncing DAG ({}), retrying in {}s...", e, args.sync_timeout);
sleep(args.sync_timeout.into()).await;
}
}
@@ -422,25 +430,25 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!("Caught termination signal, cleaning up and exiting...");
info!(target: "evgrd", "Caught termination signal, cleaning up and exiting...");
info!("Stopping P2P network");
info!(target: "evgrd", "Stopping P2P network");
p2p.stop().await;
info!("Stopping RPC server");
info!(target: "evgrd", "Stopping RPC server");
for rpc_task in rpc_tasks {
rpc_task.stop().await;
}
dnet_task.stop().await;
deg_task.stop().await;
info!("Stopping IRC server");
info!(target: "evgrd", "Stopping IRC server");
prune_task.stop().await;
info!("Flushing sled database...");
info!(target: "evgrd", "Flushing sled database...");
let flushed_bytes = sled_db.flush_async().await?;
info!("Flushed {} bytes", flushed_bytes);
info!(target: "evgrd", "Flushed {} bytes", flushed_bytes);
info!("Shut down successfully");
info!(target: "evgrd", "Shut down successfully");
Ok(())
}