diff --git a/Cargo.lock b/Cargo.lock index 861d68227..031b51dfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5445,6 +5445,37 @@ version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" +[[package]] +name = "taud" +version = "0.4.1" +dependencies = [ + "async-trait", + "blake3", + "bs58", + "chrono", + "crypto_box", + "darkfi", + "darkfi-serial", + "easy-parallel", + "futures", + "hex", + "libc", + "log", + "rand 0.8.5", + "serde", + "signal-hook", + "signal-hook-async-std", + "simplelog", + "sled", + "smol", + "structopt", + "structopt-toml", + "thiserror", + "tinyjson", + "toml 0.7.6", + "url", +] + [[package]] name = "tempfile" version = "3.8.0" diff --git a/Cargo.toml b/Cargo.toml index 571b818c9..3f3297f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ members = [ #"bin/genev/genevd", #"bin/genev/genev-cli", "bin/darkirc", - #"bin/tau/taud", + "bin/tau/taud", #"bin/tau/tau-cli", "bin/vanityaddr", "bin/lilith", diff --git a/bin/tau/taud/Cargo.toml b/bin/tau/taud/Cargo.toml index 14414913a..2e4518254 100644 --- a/bin/tau/taud/Cargo.toml +++ b/bin/tau/taud/Cargo.toml @@ -17,22 +17,35 @@ name = "taud" path = "src/main.rs" [dependencies] -darkfi = {path = "../../../", features = ["bs58", "async-daemonize", "event-graph", "rpc"]} -darkfi-serial = {path = "../../../src/serial"} +darkfi = { path = "../../../", features = [ + "bs58", + "async-daemonize", + "event-graph", + "rpc", +] } +darkfi-serial = { path = "../../../src/serial" } + +# Event Graph DB +sled = "0.34.7" + +# Crypto +blake3 = "1.4.1" +crypto_box = { version = "0.9.1", features = ["std", "chacha20"] } +rand = "0.8.5" + +# Encoding and parsing +bs58 = "0.5.0" +hex = "0.4.3" +toml = "0.7.6" # Misc async-trait = "0.1.73" -bs58 = "0.5.0" chrono = "0.4.26" -crypto_box = {version = "0.9.1", features = ["std", "chacha20"]} futures = "0.3.28" -hex = "0.4.3" libc = "0.2.147" log = "0.4.20" -rand = "0.8.5" thiserror = "1.0.47" tinyjson = "2.5.1" -toml = "0.7.6" url = "2.4.1" # Daemon @@ -43,6 +56,6 @@ simplelog = "0.12.1" smol = "1.3.0" # Argument parsing -serde = {version = "1.0.188", features = ["derive"]} +serde = { version = "1.0.188", features = ["derive"] } structopt = "0.3.26" structopt-toml = "0.5.1" diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 1bf6f284d..d4b10951f 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -23,38 +23,38 @@ use std::{ fs::{create_dir_all, remove_dir_all}, io::{stdin, Write}, path::Path, - sync::Arc, + sync::{Arc, OnceLock}, }; use crypto_box::{ aead::{Aead, AeadCore}, ChaChaBox, SecretKey, }; -use darkfi_serial::{async_trait, deserialize, serialize, SerialDecodable, SerialEncodable}; +use darkfi_serial::{ + async_trait, deserialize, deserialize_async_partial, serialize, serialize_async, + SerialDecodable, SerialEncodable, +}; use futures::{select, FutureExt}; use libc::mkfifo; use log::{debug, error, info}; use rand::rngs::OsRng; -use smol::{lock::Mutex, stream::StreamExt}; +use smol::{fs, lock::RwLock, stream::StreamExt}; use structopt_toml::StructOptToml; use tinyjson::JsonValue; use darkfi::{ async_daemonize, event_graph::{ - events_queue::EventsQueue, - model::{Event, EventId, Model, ModelPtr}, - protocol_event::{ProtocolEvent, Seen, SeenPtr}, - view::{View, ViewPtr}, - EventMsg, + proto::{EventPut, ProtocolEventGraph}, + Event, EventGraph, EventGraphPtr, NULL_ID, }, - net::{self, P2pPtr}, + net::{P2p, P2pPtr, SESSION_ALL}, rpc::{ jsonrpc::JsonSubscriber, server::{listen_and_serve, RequestHandler}, }, system::StoppableTask, - util::{path::expand_path, time::Timestamp}, + util::path::expand_path, Error, Result, }; @@ -98,12 +98,6 @@ pub struct EncryptedTask { payload: String, } -impl EventMsg for EncryptedTask { - fn new() -> Self { - Self { payload: String::from("root") } - } -} - fn encrypt_task( task: &TaskInfo, chacha_box: &ChaChaBox, @@ -155,18 +149,19 @@ fn try_decrypt_task(encrypt_task: &EncryptedTask, chacha_box: &ChaChaBox) -> Tau #[allow(clippy::too_many_arguments)] async fn start_sync_loop( + event_graph: EventGraphPtr, broadcast_rcv: smol::channel::Receiver, - view: ViewPtr, - model: ModelPtr, - seen: SeenPtr, workspaces: Arc>, datastore_path: std::path::PathBuf, - missed_events: Arc>>>, piped: bool, p2p: P2pPtr, + sled: sled::Db, + last_sent: RwLock, + seen: OnceLock, ) -> TaudResult<()> { + let incoming = event_graph.event_sub.clone().subscribe().await; + seen.set(sled.open_tree("tau_db").unwrap()).unwrap(); loop { - let mut v = view.lock().await; select! { task_event = broadcast_rcv.recv().fuse() => { let tk = task_event.map_err(Error::from)?; @@ -174,25 +169,46 @@ async fn start_sync_loop( let chacha_box = workspaces.get(&tk.workspace).unwrap(); let encrypted_task = encrypt_task(&tk, chacha_box, &mut OsRng)?; info!(target: "tau", "Send the task: ref: {}", tk.ref_id); - let event = Event { - previous_event_hash: model.lock().await.get_head_hash().map_err(Error::from)?, - action: encrypted_task, - timestamp: Timestamp::current_time(), - }; + // Build a DAG event and return it. + let event = Event::new( + serialize_async(&encrypted_task).await, + event_graph.clone(), + ) + .await; + // Update the last sent event. + // let event_id = event.id(); + // *last_sent.write().await = event_id; - p2p.broadcast(&event).await; + // If it fails for some reason, for now, we just note it + // and pass. + if let Err(e) = event_graph.dag_insert(event.clone()).await { + error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e); + } else { + // We sent this, so it should be considered seen. + // TODO: should we save task on send or on receive? + // on receive better because it's garanteed your event is out there + // debug!("Marking event {} as seen", event_id); + // seen.get().unwrap().insert(event_id.as_bytes(), &[]).unwrap(); + // Otherwise, broadcast it + p2p.broadcast(&EventPut(event)).await; + } } } - task_event = v.process().fuse() => { - let event = task_event.map_err(Error::from)?; - if !seen.push(&event.hash()).await { + task_event = incoming.receive().fuse() => { + let event_id = task_event.id(); + if *last_sent.read().await == event_id { continue } - - missed_events.lock().await.push(event.clone()); - - on_receive_task(&event.action, &datastore_path, &workspaces, piped) + // Try to deserialize the `Event`'s content into a `Privmsg` + let enc_task: EncryptedTask = match deserialize_async_partial(task_event.content()).await { + Ok((v, _)) => v, + Err(e) => { + error!("[TAUD] Failed deserializing incoming EncryptedTask event: {}", e); + continue + } + }; + on_receive_task(&enc_task, &datastore_path, &workspaces, piped) .await?; } } @@ -326,63 +342,51 @@ async fn realmain(settings: Args, executor: Arc>) -> Res return Ok(()) } - //////////////////// - // Initialize the base structures - //////////////////// - let events_queue = EventsQueue::::new(); - let model = Arc::new(Mutex::new(Model::new(events_queue.clone()))); - let view = Arc::new(Mutex::new(View::new(events_queue))); - let model_clone = model.clone(); + info!("Initializing taud node"); - model.lock().await.load_tree(&datastore_path)?; + // Create datastore path if not there already. + let datastore = expand_path(&settings.datastore)?; + fs::create_dir_all(&datastore).await?; - //////////////////// - // Buffers - //////////////////// - let seen_event = Seen::new(); - let seen_inv = Seen::new(); + info!("Instantiating event DAG"); + let sled_db = sled::open(datastore)?; + let p2p = P2p::new(settings.net.into(), executor.clone()).await; + let event_graph = + EventGraph::new(p2p.clone(), sled_db.clone(), "darkirc_dag", 1, executor.clone()).await?; - let (broadcast_snd, broadcast_rcv) = smol::channel::unbounded::(); - - // - // P2p setup - // - let net_settings = settings.net.clone(); - - let p2p = net::P2p::new(net_settings.into(), executor.clone()).await; + info!("Registering EventGraph P2P protocol"); + let event_graph_ = Arc::clone(&event_graph); let registry = p2p.protocol_registry(); - registry - .register(net::SESSION_ALL, move |channel, p2p| { - let seen_event = seen_event.clone(); - let seen_inv = seen_inv.clone(); - let model = model.clone(); - async move { ProtocolEvent::init(channel, p2p, model, seen_event, seen_inv).await } + .register(SESSION_ALL, move |channel, _| { + let event_graph_ = event_graph_.clone(); + async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } }) .await; + let (broadcast_snd, broadcast_rcv) = smol::channel::unbounded::(); + info!(target: "taud", "Starting P2P network"); p2p.clone().start().await?; //////////////////// // Listner //////////////////// - let seen_ids = Seen::new(); - let missed_events = Arc::new(Mutex::new(vec![])); - info!(target: "taud", "Starting sync loop task"); + let last_sent = RwLock::new(NULL_ID); + let seen = OnceLock::new(); let sync_loop_task = StoppableTask::new(); sync_loop_task.clone().start( start_sync_loop( + event_graph.clone(), broadcast_rcv, - view, - model_clone.clone(), - seen_ids, workspaces.clone(), datastore_path.clone(), - missed_events, settings.piped, p2p.clone(), + sled_db.clone(), + last_sent, + seen, ), |res| async { match res { @@ -458,8 +462,6 @@ async fn realmain(settings: Args, executor: Arc>) -> Res info!(target: "taud", "Stopping sync loop task..."); sync_loop_task.stop().await; - model_clone.lock().await.save_tree(&datastore_path)?; - p2p.stop().await; Ok(())