bin/tau: use new event_graph code

This commit is contained in:
Dastan-glitch
2023-10-13 15:21:15 +03:00
committed by parazyd
parent 359c914eef
commit 7a9604d968
4 changed files with 125 additions and 79 deletions

31
Cargo.lock generated
View File

@@ -5445,6 +5445,37 @@ version = "0.12.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a"
[[package]]
name = "taud"
version = "0.4.1"
dependencies = [
"async-trait",
"blake3",
"bs58",
"chrono",
"crypto_box",
"darkfi",
"darkfi-serial",
"easy-parallel",
"futures",
"hex",
"libc",
"log",
"rand 0.8.5",
"serde",
"signal-hook",
"signal-hook-async-std",
"simplelog",
"sled",
"smol",
"structopt",
"structopt-toml",
"thiserror",
"tinyjson",
"toml 0.7.6",
"url",
]
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.8.0" version = "3.8.0"

View File

@@ -30,7 +30,7 @@ members = [
#"bin/genev/genevd", #"bin/genev/genevd",
#"bin/genev/genev-cli", #"bin/genev/genev-cli",
"bin/darkirc", "bin/darkirc",
#"bin/tau/taud", "bin/tau/taud",
#"bin/tau/tau-cli", #"bin/tau/tau-cli",
"bin/vanityaddr", "bin/vanityaddr",
"bin/lilith", "bin/lilith",

View File

@@ -17,22 +17,35 @@ name = "taud"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
darkfi = {path = "../../../", features = ["bs58", "async-daemonize", "event-graph", "rpc"]} darkfi = { path = "../../../", features = [
darkfi-serial = {path = "../../../src/serial"} "bs58",
"async-daemonize",
"event-graph",
"rpc",
] }
darkfi-serial = { path = "../../../src/serial" }
# Event Graph DB
sled = "0.34.7"
# Crypto
blake3 = "1.4.1"
crypto_box = { version = "0.9.1", features = ["std", "chacha20"] }
rand = "0.8.5"
# Encoding and parsing
bs58 = "0.5.0"
hex = "0.4.3"
toml = "0.7.6"
# Misc # Misc
async-trait = "0.1.73" async-trait = "0.1.73"
bs58 = "0.5.0"
chrono = "0.4.26" chrono = "0.4.26"
crypto_box = {version = "0.9.1", features = ["std", "chacha20"]}
futures = "0.3.28" futures = "0.3.28"
hex = "0.4.3"
libc = "0.2.147" libc = "0.2.147"
log = "0.4.20" log = "0.4.20"
rand = "0.8.5"
thiserror = "1.0.47" thiserror = "1.0.47"
tinyjson = "2.5.1" tinyjson = "2.5.1"
toml = "0.7.6"
url = "2.4.1" url = "2.4.1"
# Daemon # Daemon
@@ -43,6 +56,6 @@ simplelog = "0.12.1"
smol = "1.3.0" smol = "1.3.0"
# Argument parsing # Argument parsing
serde = {version = "1.0.188", features = ["derive"]} serde = { version = "1.0.188", features = ["derive"] }
structopt = "0.3.26" structopt = "0.3.26"
structopt-toml = "0.5.1" structopt-toml = "0.5.1"

View File

@@ -23,38 +23,38 @@ use std::{
fs::{create_dir_all, remove_dir_all}, fs::{create_dir_all, remove_dir_all},
io::{stdin, Write}, io::{stdin, Write},
path::Path, path::Path,
sync::Arc, sync::{Arc, OnceLock},
}; };
use crypto_box::{ use crypto_box::{
aead::{Aead, AeadCore}, aead::{Aead, AeadCore},
ChaChaBox, SecretKey, ChaChaBox, SecretKey,
}; };
use darkfi_serial::{async_trait, deserialize, serialize, SerialDecodable, SerialEncodable}; use darkfi_serial::{
async_trait, deserialize, deserialize_async_partial, serialize, serialize_async,
SerialDecodable, SerialEncodable,
};
use futures::{select, FutureExt}; use futures::{select, FutureExt};
use libc::mkfifo; use libc::mkfifo;
use log::{debug, error, info}; use log::{debug, error, info};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use smol::{lock::Mutex, stream::StreamExt}; use smol::{fs, lock::RwLock, stream::StreamExt};
use structopt_toml::StructOptToml; use structopt_toml::StructOptToml;
use tinyjson::JsonValue; use tinyjson::JsonValue;
use darkfi::{ use darkfi::{
async_daemonize, async_daemonize,
event_graph::{ event_graph::{
events_queue::EventsQueue, proto::{EventPut, ProtocolEventGraph},
model::{Event, EventId, Model, ModelPtr}, Event, EventGraph, EventGraphPtr, NULL_ID,
protocol_event::{ProtocolEvent, Seen, SeenPtr},
view::{View, ViewPtr},
EventMsg,
}, },
net::{self, P2pPtr}, net::{P2p, P2pPtr, SESSION_ALL},
rpc::{ rpc::{
jsonrpc::JsonSubscriber, jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler}, server::{listen_and_serve, RequestHandler},
}, },
system::StoppableTask, system::StoppableTask,
util::{path::expand_path, time::Timestamp}, util::path::expand_path,
Error, Result, Error, Result,
}; };
@@ -98,12 +98,6 @@ pub struct EncryptedTask {
payload: String, payload: String,
} }
impl EventMsg for EncryptedTask {
fn new() -> Self {
Self { payload: String::from("root") }
}
}
fn encrypt_task( fn encrypt_task(
task: &TaskInfo, task: &TaskInfo,
chacha_box: &ChaChaBox, chacha_box: &ChaChaBox,
@@ -155,18 +149,19 @@ fn try_decrypt_task(encrypt_task: &EncryptedTask, chacha_box: &ChaChaBox) -> Tau
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn start_sync_loop( async fn start_sync_loop(
event_graph: EventGraphPtr,
broadcast_rcv: smol::channel::Receiver<TaskInfo>, broadcast_rcv: smol::channel::Receiver<TaskInfo>,
view: ViewPtr<EncryptedTask>,
model: ModelPtr<EncryptedTask>,
seen: SeenPtr<EventId>,
workspaces: Arc<HashMap<String, ChaChaBox>>, workspaces: Arc<HashMap<String, ChaChaBox>>,
datastore_path: std::path::PathBuf, datastore_path: std::path::PathBuf,
missed_events: Arc<Mutex<Vec<Event<EncryptedTask>>>>,
piped: bool, piped: bool,
p2p: P2pPtr, p2p: P2pPtr,
sled: sled::Db,
last_sent: RwLock<blake3::Hash>,
seen: OnceLock<sled::Tree>,
) -> TaudResult<()> { ) -> TaudResult<()> {
let incoming = event_graph.event_sub.clone().subscribe().await;
seen.set(sled.open_tree("tau_db").unwrap()).unwrap();
loop { loop {
let mut v = view.lock().await;
select! { select! {
task_event = broadcast_rcv.recv().fuse() => { task_event = broadcast_rcv.recv().fuse() => {
let tk = task_event.map_err(Error::from)?; let tk = task_event.map_err(Error::from)?;
@@ -174,25 +169,46 @@ async fn start_sync_loop(
let chacha_box = workspaces.get(&tk.workspace).unwrap(); let chacha_box = workspaces.get(&tk.workspace).unwrap();
let encrypted_task = encrypt_task(&tk, chacha_box, &mut OsRng)?; let encrypted_task = encrypt_task(&tk, chacha_box, &mut OsRng)?;
info!(target: "tau", "Send the task: ref: {}", tk.ref_id); info!(target: "tau", "Send the task: ref: {}", tk.ref_id);
let event = Event { // Build a DAG event and return it.
previous_event_hash: model.lock().await.get_head_hash().map_err(Error::from)?, let event = Event::new(
action: encrypted_task, serialize_async(&encrypted_task).await,
timestamp: Timestamp::current_time(), event_graph.clone(),
}; )
.await;
// Update the last sent event.
// let event_id = event.id();
// *last_sent.write().await = event_id;
p2p.broadcast(&event).await; // If it fails for some reason, for now, we just note it
// and pass.
if let Err(e) = event_graph.dag_insert(event.clone()).await {
error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e);
} else {
// We sent this, so it should be considered seen.
// TODO: should we save task on send or on receive?
// on receive better because it's garanteed your event is out there
// debug!("Marking event {} as seen", event_id);
// seen.get().unwrap().insert(event_id.as_bytes(), &[]).unwrap();
// Otherwise, broadcast it
p2p.broadcast(&EventPut(event)).await;
}
} }
} }
task_event = v.process().fuse() => { task_event = incoming.receive().fuse() => {
let event = task_event.map_err(Error::from)?; let event_id = task_event.id();
if !seen.push(&event.hash()).await { if *last_sent.read().await == event_id {
continue continue
} }
// Try to deserialize the `Event`'s content into a `Privmsg`
missed_events.lock().await.push(event.clone()); let enc_task: EncryptedTask = match deserialize_async_partial(task_event.content()).await {
Ok((v, _)) => v,
on_receive_task(&event.action, &datastore_path, &workspaces, piped) Err(e) => {
error!("[TAUD] Failed deserializing incoming EncryptedTask event: {}", e);
continue
}
};
on_receive_task(&enc_task, &datastore_path, &workspaces, piped)
.await?; .await?;
} }
} }
@@ -326,63 +342,51 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
return Ok(()) return Ok(())
} }
//////////////////// info!("Initializing taud node");
// Initialize the base structures
////////////////////
let events_queue = EventsQueue::<EncryptedTask>::new();
let model = Arc::new(Mutex::new(Model::new(events_queue.clone())));
let view = Arc::new(Mutex::new(View::new(events_queue)));
let model_clone = model.clone();
model.lock().await.load_tree(&datastore_path)?; // Create datastore path if not there already.
let datastore = expand_path(&settings.datastore)?;
fs::create_dir_all(&datastore).await?;
//////////////////// info!("Instantiating event DAG");
// Buffers let sled_db = sled::open(datastore)?;
//////////////////// let p2p = P2p::new(settings.net.into(), executor.clone()).await;
let seen_event = Seen::new(); let event_graph =
let seen_inv = Seen::new(); EventGraph::new(p2p.clone(), sled_db.clone(), "darkirc_dag", 1, executor.clone()).await?;
let (broadcast_snd, broadcast_rcv) = smol::channel::unbounded::<TaskInfo>(); info!("Registering EventGraph P2P protocol");
let event_graph_ = Arc::clone(&event_graph);
//
// P2p setup
//
let net_settings = settings.net.clone();
let p2p = net::P2p::new(net_settings.into(), executor.clone()).await;
let registry = p2p.protocol_registry(); let registry = p2p.protocol_registry();
registry registry
.register(net::SESSION_ALL, move |channel, p2p| { .register(SESSION_ALL, move |channel, _| {
let seen_event = seen_event.clone(); let event_graph_ = event_graph_.clone();
let seen_inv = seen_inv.clone(); async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
let model = model.clone();
async move { ProtocolEvent::init(channel, p2p, model, seen_event, seen_inv).await }
}) })
.await; .await;
let (broadcast_snd, broadcast_rcv) = smol::channel::unbounded::<TaskInfo>();
info!(target: "taud", "Starting P2P network"); info!(target: "taud", "Starting P2P network");
p2p.clone().start().await?; p2p.clone().start().await?;
//////////////////// ////////////////////
// Listner // Listner
//////////////////// ////////////////////
let seen_ids = Seen::new();
let missed_events = Arc::new(Mutex::new(vec![]));
info!(target: "taud", "Starting sync loop task"); info!(target: "taud", "Starting sync loop task");
let last_sent = RwLock::new(NULL_ID);
let seen = OnceLock::new();
let sync_loop_task = StoppableTask::new(); let sync_loop_task = StoppableTask::new();
sync_loop_task.clone().start( sync_loop_task.clone().start(
start_sync_loop( start_sync_loop(
event_graph.clone(),
broadcast_rcv, broadcast_rcv,
view,
model_clone.clone(),
seen_ids,
workspaces.clone(), workspaces.clone(),
datastore_path.clone(), datastore_path.clone(),
missed_events,
settings.piped, settings.piped,
p2p.clone(), p2p.clone(),
sled_db.clone(),
last_sent,
seen,
), ),
|res| async { |res| async {
match res { match res {
@@ -458,8 +462,6 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
info!(target: "taud", "Stopping sync loop task..."); info!(target: "taud", "Stopping sync loop task...");
sync_loop_task.stop().await; sync_loop_task.stop().await;
model_clone.lock().await.save_tree(&datastore_path)?;
p2p.stop().await; p2p.stop().await;
Ok(()) Ok(())