From 757f466ca3c820700d2adc193337f568cdf4f25a Mon Sep 17 00:00:00 2001 From: darkfi Date: Fri, 6 Sep 2024 11:32:05 +0200 Subject: [PATCH] added evgrd --- Cargo.lock | 34 +++++ Cargo.toml | 2 +- bin/evgrd/Cargo.toml | 64 ++++++++++ bin/evgrd/evgrd.toml | 0 bin/evgrd/src/main.rs | 286 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 385 insertions(+), 1 deletion(-) create mode 100644 bin/evgrd/Cargo.toml create mode 100644 bin/evgrd/evgrd.toml create mode 100644 bin/evgrd/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index c10d04a25..02a2a2c90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,6 +3014,40 @@ dependencies = [ "pin-project-lite 0.2.14", ] +[[package]] +name = "evgrd" +version = "0.5.0" +dependencies = [ + "async-trait", + "bcrypt", + "blake3 1.5.4", + "bs58", + "crypto_box", + "darkfi", + "darkfi-sdk", + "darkfi-serial", + "easy-parallel", + "futures", + "futures-rustls", + "libc", + "log", + "openssl", + "rand 0.8.5", + "rusqlite", + "rustls-pemfile", + "semver 1.0.23", + "serde", + "signal-hook", + "signal-hook-async-std", + "simplelog", + "sled-overlay", + "smol", + "structopt", + "structopt-toml", + "toml 0.8.19", + "url", +] + [[package]] name = "fallible-iterator" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 0df03abed..804761113 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ members = [ "src/contract/dao", "src/contract/deployooor", - "example/dchat/dchatd", + "example/dchat/dchatd", "bin/evgrd", ] [dependencies] diff --git a/bin/evgrd/Cargo.toml b/bin/evgrd/Cargo.toml new file mode 100644 index 000000000..b3724fbd1 --- /dev/null +++ b/bin/evgrd/Cargo.toml @@ -0,0 +1,64 @@ +[package] +name = "evgrd" +description = "Event graph daemon" +version = "0.5.0" +edition = "2021" +authors = ["Dyne.org foundation "] +license = "AGPL-3.0-only" +homepage = "https://dark.fi" +repository = "https://codeberg.org/darkrenaissance/darkfi" + +[[bin]] +name = "evgrd" +path = "src/main.rs" + +[dependencies] +darkfi = {path = "../../", features = ["async-daemonize", "event-graph", "rpc", "zk"]} +darkfi-sdk = {path = "../../src/sdk", features = ["async"]} +darkfi-serial = {path = "../../src/serial", features = ["async"]} +libc = "0.2.158" + +# Event Graph DB +sled-overlay = "0.1.3" + +# TLS +async-trait = "0.1.81" +futures = "0.3.30" +futures-rustls = {version = "0.26.0", default-features = false, features = ["logging", "tls12", "ring"]} +rustls-pemfile = "2.1.3" + +# Crypto +blake3 = "1.5.4" +bcrypt = "0.15.1" +crypto_box = {version = "0.9.1", features = ["std", "chacha20"]} +rand = "0.8.5" + +# Misc +log = "0.4.22" +url = "2.5.2" + +# Encoding and parsing +bs58 = "0.5.1" +toml = "0.8.19" +semver = "1.0.23" + +# Daemon +easy-parallel = "3.3.1" +signal-hook-async-std = "0.2.2" +signal-hook = "0.3.17" +simplelog = "0.12.2" +smol = "2.0.1" + +# Argument parsing +serde = {version = "1.0.209", features = ["derive"]} +structopt = "0.3.26" +structopt-toml = "0.5.1" + +# See https://github.com/rust-mobile/android-rs-glue/issues/193 +[target.aarch64-linux-android.dependencies] +openssl = { version = "*", features = ["vendored"] } +rusqlite = {version = "0.31.0", features = ["bundled"]} + +[lints] +workspace = true + diff --git a/bin/evgrd/evgrd.toml b/bin/evgrd/evgrd.toml new file mode 100644 index 000000000..e69de29bb diff --git a/bin/evgrd/src/main.rs b/bin/evgrd/src/main.rs new file mode 100644 index 000000000..2108f05d6 --- /dev/null +++ b/bin/evgrd/src/main.rs @@ -0,0 +1,286 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::sync::{Arc, Mutex as SyncMutex}; +use darkfi::{ + async_daemonize, cli_desc, + event_graph::{proto::ProtocolEventGraph, EventGraph, EventGraphPtr}, + net::{session::SESSION_DEFAULT, settings::SettingsOpt as NetSettingsOpt, P2p, P2pPtr}, + rpc::{ + jsonrpc::JsonSubscriber, + server::{listen_and_serve, RequestHandler}, + }, + system::{sleep, StoppableTask, StoppableTaskPtr}, + util::path::{expand_path, get_config_path}, + Error, Result, +}; +use darkfi_serial::{ + async_trait, deserialize_async, serialize_async, Encodable, SerialDecodable, SerialEncodable, +}; +use log::{debug, error, info}; +use rand::rngs::OsRng; +use sled_overlay::sled; +use smol::{fs, lock::Mutex, stream::StreamExt, Executor}; +use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; +use url::Url; + +const CONFIG_FILE: &str = "evgrd.toml"; +const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml"); + +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "evgrd", about = cli_desc!())] +struct Args { + #[structopt(short, parse(from_occurrences))] + /// Increase verbosity (-vvv supported) + verbose: u8, + + #[structopt(short, long)] + /// Configuration file to use + config: Option, + + #[structopt(long)] + /// Set log file output + log: Option, + + #[structopt(long, default_value = "tcp://127.0.0.1:26880")] + /// RPC server listen address + rpc_listen: Url, + + #[structopt(short, long, default_value = "~/.local/darkfi/evgrd_db")] + /// Datastore (DB) path + datastore: String, + + #[structopt(short, long, default_value = "~/.local/darkfi/replayed_evgrd_db")] + /// Replay logs (DB) path + replay_datastore: String, + + /// Flag to store Sled DB instructions + #[structopt(long)] + replay_mode: bool, + + /// Flag to skip syncing the DAG (no history). + #[structopt(long)] + skip_dag_sync: bool, + + /// Number of attempts to sync the DAG. + #[structopt(long, default_value = "5")] + sync_attempts: u8, + + /// Number of seconds to wait before trying again if sync fails. + #[structopt(long, default_value = "10")] + sync_timeout: u8, + + /// P2P network settings + #[structopt(flatten)] + net: NetSettingsOpt, +} + +async_daemonize!(realmain); +async fn realmain(args: Args, ex: Arc>) -> Result<()> { + info!("Starting evgrd node"); + + // Create datastore path if not there already. + let datastore = expand_path(&args.datastore)?; + fs::create_dir_all(&datastore).await?; + + /* + let replay_datastore = expand_path(&args.replay_datastore)?; + let replay_mode = args.replay_mode; + + info!("Instantiating event DAG"); + let sled_db = sled::open(datastore)?; + let mut p2p_settings: darkfi::net::Settings = args.net.into(); + p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(); + let p2p = P2p::new(p2p_settings, ex.clone()).await?; + let event_graph = EventGraph::new( + p2p.clone(), + sled_db.clone(), + replay_datastore.clone(), + replay_mode, + "darkirc_dag", + 1, + ex.clone(), + ) + .await?; + + let prune_task = event_graph.prune_task.get().unwrap(); + + info!("Registering EventGraph P2P protocol"); + let event_graph_ = Arc::clone(&event_graph); + let registry = p2p.protocol_registry(); + registry + .register(SESSION_DEFAULT, move |channel, _| { + let event_graph_ = event_graph_.clone(); + async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } + }) + .await; + + info!("Starting dnet subs task"); + let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); + let dnet_sub_ = dnet_sub.clone(); + let p2p_ = p2p.clone(); + let dnet_task = StoppableTask::new(); + dnet_task.clone().start( + async move { + let dnet_sub = p2p_.dnet_subscribe().await; + loop { + let event = dnet_sub.receive().await; + debug!("Got dnet event: {:?}", event); + dnet_sub_.notify(vec![event.into()].into()).await; + } + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => panic!("{}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + + info!("Starting deg subs task"); + let deg_sub = JsonSubscriber::new("deg.subscribe_events"); + let deg_sub_ = deg_sub.clone(); + let event_graph_ = event_graph.clone(); + let deg_task = StoppableTask::new(); + deg_task.clone().start( + async move { + let deg_sub = event_graph_.deg_subscribe().await; + loop { + let event = deg_sub.receive().await; + debug!("Got deg event: {:?}", event); + deg_sub_.notify(vec![event.into()].into()).await; + } + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => panic!("{}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + + info!("Starting JSON-RPC server"); + let darkirc = Arc::new(DarkIrc::new( + p2p.clone(), + sled_db.clone(), + event_graph.clone(), + dnet_sub, + deg_sub, + replay_datastore.clone(), + )); + let darkirc_ = Arc::clone(&darkirc); + let rpc_task = StoppableTask::new(); + rpc_task.clone().start( + listen_and_serve(args.rpc_listen, darkirc.clone(), None, ex.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::RpcServerStopped) => darkirc_.stop_connections().await, + Err(e) => error!("Failed stopping JSON-RPC server: {}", e), + } + }, + Error::RpcServerStopped, + ex.clone(), + ); + + info!("Starting IRC server"); + let password = args.password.unwrap_or_default(); + let config_path = get_config_path(args.config, CONFIG_FILE)?; + let irc_server = IrcServer::new( + darkirc.clone(), + args.irc_listen, + args.irc_tls_cert, + args.irc_tls_secret, + config_path, + password, + ) + .await?; + + let irc_task = StoppableTask::new(); + let ex_ = ex.clone(); + irc_task.clone().start( + irc_server.clone().listen(ex_), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } + Err(e) => error!("Failed stopping IRC server: {}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + + info!("Starting P2P network"); + p2p.clone().start().await?; + + info!("Waiting for some P2P connections..."); + sleep(5).await; + + // We'll attempt to sync {sync_attempts} times + if !args.skip_dag_sync { + for i in 1..=args.sync_attempts { + info!("Syncing event DAG (attempt #{})", i); + match event_graph.dag_sync().await { + Ok(()) => break, + Err(e) => { + if i == args.sync_attempts { + error!("Failed syncing DAG. Exiting."); + p2p.stop().await; + return Err(Error::DagSyncFailed) + } else { + // TODO: Maybe at this point we should prune or something? + // TODO: Or maybe just tell the user to delete the DAG from FS. + error!("Failed syncing DAG ({}), retrying in {}s...", e, args.sync_timeout); + sleep(args.sync_timeout.into()).await; + } + } + } + } + } else { + *event_graph.synced.write().await = true; + } + + // Signal handling for graceful termination. + let (signals_handler, signals_task) = SignalHandler::new(ex)?; + signals_handler.wait_termination(signals_task).await?; + info!("Caught termination signal, cleaning up and exiting..."); + + info!("Stopping P2P network"); + p2p.stop().await; + + info!("Stopping JSON-RPC server"); + rpc_task.stop().await; + dnet_task.stop().await; + deg_task.stop().await; + + info!("Stopping IRC server"); + irc_task.stop().await; + prune_task.stop().await; + + info!("Flushing sled database..."); + let flushed_bytes = sled_db.flush_async().await?; + info!("Flushed {} bytes", flushed_bytes); + + info!("Shut down successfully"); + */ + Ok(()) +}