From 804c3e75495fdd29a36440feb339655523a05f53 Mon Sep 17 00:00:00 2001 From: darkfi Date: Sun, 22 Sep 2024 17:00:43 +0200 Subject: [PATCH] wallet: darkirc backend broadcast p2p events --- bin/darkwallet/src/darkirc2.rs | 13 +++++++++---- script/evgrd/bin/evgrd.rs | 8 +++++--- src/event_graph/event.rs | 7 +------ 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/bin/darkwallet/src/darkirc2.rs b/bin/darkwallet/src/darkirc2.rs index 362bcbbeb..5015f3774 100644 --- a/bin/darkwallet/src/darkirc2.rs +++ b/bin/darkwallet/src/darkirc2.rs @@ -26,7 +26,7 @@ use darkfi::{ }; use darkfi_serial::{ async_trait, deserialize_async_partial, AsyncDecodable, AsyncEncodable, Encodable, - SerialDecodable, SerialEncodable, + SerialDecodable, SerialEncodable, serialize_async, }; use evgrd::{ FetchEventsMessage, LocalEventGraph, LocalEventGraphPtr, VersionMessage, MSG_EVENT, @@ -136,6 +136,7 @@ impl LocalDarkIRC { pub async fn start(self: Arc, ex: ExecutorPtr) -> Result<()> { debug!(target: "darkirc", "LocalDarkIRC::start()"); + //self.reconnect().await?; self.version_exchange().await?; let me = Arc::downgrade(&self); @@ -164,8 +165,6 @@ impl LocalDarkIRC { assert!(tasks.is_empty()); *tasks = vec![recv_task, send_task]; - self.is_connected.store(true, Ordering::Relaxed); - Ok(()) } @@ -190,11 +189,14 @@ impl LocalDarkIRC { MSG_FETCHEVENTS.encode_async(writer).await?; fetchevs.encode_async(writer).await?; + self.is_connected.store(true, Ordering::Relaxed); + Ok(()) } async fn send_msg(&self, timestamp: u64, msg: Privmsg) -> Result<()> { if !self.is_connected.load(Ordering::Relaxed) { + debug!(target: "darkirc", "send_msg: not connected, reconnecting..."); self.reconnect().await?; } @@ -203,7 +205,10 @@ impl LocalDarkIRC { MSG_SENDEVENT.encode_async(writer).await?; timestamp.encode_async(writer).await?; - msg.encode_async(writer).await?; + + let content: Vec = serialize_async(&msg).await; + content.encode_async(writer).await?; + Ok(()) } diff --git a/script/evgrd/bin/evgrd.rs b/script/evgrd/bin/evgrd.rs index 7e229919a..be6c24700 100644 --- a/script/evgrd/bin/evgrd.rs +++ b/script/evgrd/bin/evgrd.rs @@ -18,7 +18,7 @@ use darkfi::{ async_daemonize, cli_desc, - event_graph::{proto::ProtocolEventGraph, Event, EventGraph, EventGraphPtr}, + event_graph::{proto::{ProtocolEventGraph, EventPut}, Event, EventGraph, EventGraphPtr}, net::{ session::SESSION_DEFAULT, settings::SettingsOpt as NetSettingsOpt, @@ -197,7 +197,7 @@ async fn fetch_events(stream: &mut Box, daemon: &Daemon) -> Result info!(target: "evgrd", "Fetching events {fetchevs:?}"); let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?; - info!("fetched {events:?}"); + //info!("fetched {events:?}"); for event in events { MSG_EVENT.encode_async(stream).await?; @@ -209,11 +209,13 @@ async fn fetch_events(stream: &mut Box, daemon: &Daemon) -> Result async fn send_event(stream: &mut Box, daemon: &Daemon) -> Result<()> { let timestamp = u64::decode_async(stream).await?; let content = Vec::::decode_async(stream).await?; - info!("send_event: {timestamp}, {content:?}"); + info!(target: "evgrd", "send_event: {timestamp}, {content:?}"); let event = Event::with_timestamp(timestamp, content, &daemon.event_graph).await; daemon.event_graph.dag_insert(&[event.clone()]).await.unwrap(); + daemon.p2p.broadcast(&EventPut(event)).await; + Ok(()) } diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index b0d5a04c0..c0a7bb935 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -60,12 +60,7 @@ impl Event { /// Same as `Event::new()` but allows specifying the timestamp explicitly. pub async fn with_timestamp(timestamp: u64, data: Vec, event_graph: &EventGraph) -> Self { let (layer, parents) = event_graph.get_next_layer_with_parents().await; - Self { - timestamp, - content: data, - parents, - layer, - } + Self { timestamp, content: data, parents, layer } } /// Hash the [`Event`] to retrieve its ID