diff --git a/bin/darkwallet/Cargo.toml b/bin/darkwallet/Cargo.toml index 3d3a75f4a..709d55cd8 100644 --- a/bin/darkwallet/Cargo.toml +++ b/bin/darkwallet/Cargo.toml @@ -37,7 +37,9 @@ futures = "0.3.30" async-recursion = "1.1.1" colored = "2.1.0" #rustpython-vm = "0.3.1" -sled = "0.34.7" +sled = "0.34" +url = "2.5" +semver = "1.0" [patch.crates-io] freetype-rs = { git = "https://github.com/narodnik/freetype-rs" } diff --git a/bin/darkwallet/src/main.rs b/bin/darkwallet/src/main.rs index 9323c10f7..0e73b4dbf 100644 --- a/bin/darkwallet/src/main.rs +++ b/bin/darkwallet/src/main.rs @@ -27,6 +27,22 @@ use async_lock::Mutex; use std::sync::{mpsc, Arc}; +use darkfi::{ + async_daemonize, cli_desc, + event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr}, + net::{session::SESSION_DEFAULT, settings::Settings as NetSettings, P2p, P2pPtr}, + rpc::{ + jsonrpc::JsonSubscriber, + server::{listen_and_serve, RequestHandler}, + }, + system::{sleep, sleep_forever, StoppableTask, StoppableTaskPtr, Subscription}, + util::path::{expand_path, get_config_path}, + Error, Result, +}; +use darkfi_serial::{ + async_trait, deserialize_async, AsyncDecodable, SerialDecodable, SerialEncodable, +}; + #[macro_use] extern crate log; #[allow(unused_imports)] @@ -65,13 +81,140 @@ fn panic_hook(panic_info: &std::panic::PanicInfo) { std::process::exit(1); } +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] +pub struct Privmsg { + pub channel: String, + pub nick: String, + pub msg: String, +} + +async fn print_evs(ev_sub: Subscription) { + loop { + let ev = ev_sub.receive().await; + + // Try to deserialize the `Event`'s content into a `Privmsg` + let mut privmsg: Privmsg = match deserialize_async(ev.content()).await { + Ok(v) => v, + Err(e) => { + error!("[IRC CLIENT] Failed deserializing incoming Privmsg event: {}", e); + continue + } + }; + + info!("ev_id={:?}", ev.id()); + info!("ev: {:?}", ev); + info!("privmsg: {:?}", privmsg); + info!(""); + } +} + +async fn realmain(ex: ExecutorPtr) -> darkfi::Result<()> { + let sled_db = sled::open("evgrdb")?; + + let mut p2p_settings: NetSettings = Default::default(); + p2p_settings.app_version = semver::Version::parse("0.5.0").unwrap(); + p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:5262").unwrap()); + + let p2p = P2p::new(p2p_settings, ex.clone()).await?; + + let event_graph = EventGraph::new( + p2p.clone(), + sled_db.clone(), + std::path::PathBuf::new(), + false, + "darkirc_dag", + 1, + ex.clone(), + ) + .await?; + + let prune_task = event_graph.prune_task.get().unwrap(); + + info!("Registering EventGraph P2P protocol"); + let event_graph_ = Arc::clone(&event_graph); + let registry = p2p.protocol_registry(); + registry + .register(SESSION_DEFAULT, move |channel, _| { + let event_graph_ = event_graph_.clone(); + async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } + }) + .await; + + let ev_sub = event_graph.event_pub.clone().subscribe().await; + let ev_task = ex.spawn(print_evs(ev_sub)); + + info!("Starting P2P network"); + p2p.clone().start().await?; + + info!("Waiting for some P2P connections..."); + sleep(5).await; + + // We'll attempt to sync {sync_attempts} times + let sync_attempts = 4; + for i in 1..=sync_attempts { + info!("Syncing event DAG (attempt #{})", i); + match event_graph.dag_sync().await { + Ok(()) => break, + Err(e) => { + if i == sync_attempts { + error!("Failed syncing DAG. Exiting."); + p2p.stop().await; + return Err(Error::DagSyncFailed) + } else { + // TODO: Maybe at this point we should prune or something? + // TODO: Or maybe just tell the user to delete the DAG from FS. + error!("Failed syncing DAG ({}), retrying in {}s...", e, 4); + sleep(4).await; + } + } + } + } + + sleep_forever().await; + // Signal handling for graceful termination. + //let (signals_handler, signals_task) = SignalHandler::new(ex)?; + //signals_handler.wait_termination(signals_task).await?; + info!("Caught termination signal, cleaning up and exiting..."); + + info!("Stopping P2P network"); + p2p.stop().await; + + info!("Stopping IRC server"); + prune_task.stop().await; + + info!("Flushing sled database..."); + let flushed_bytes = sled_db.flush_async().await?; + info!("Flushed {} bytes", flushed_bytes); + + info!("Shut down successfully"); + + Ok(()) +} + fn newmain() { + simplelog::TermLogger::init( + simplelog::LevelFilter::Info, + simplelog::Config::default(), + simplelog::TerminalMode::Stdout, + simplelog::ColorChoice::Auto, + ) + .unwrap(); + let ex = Arc::new(smol::Executor::new()); - smol::block_on(async { - let mut p2p_settings: darkfi::net::Settings = Default::default(); - //p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(); - let p2p = darkfi::net::P2p::new(p2p_settings, ex.clone()).await.unwrap(); - }); + let n_threads = std::thread::available_parallelism().unwrap().get(); + let ex = std::sync::Arc::new(smol::Executor::new()); + let (signal, shutdown) = smol::channel::unbounded::<()>(); + let (_, result) = easy_parallel::Parallel::new() + // Run four executor threads + .each(0..n_threads, |_| smol::future::block_on(ex.run(shutdown.recv()))) + // Run the main future on the current thread. + .finish(|| { + smol::future::block_on(async { + realmain(ex.clone()).await?; + drop(signal); + Ok::<(), darkfi::Error>(()) + }) + }); } fn main() {