From 5bce7a7500474ffe5511a911ab35b08a80afbbbf Mon Sep 17 00:00:00 2001 From: darkfi Date: Sun, 22 Sep 2024 12:32:03 +0200 Subject: [PATCH] wallet: structify the darkirc local backend so we can call .send(msg) on the stream it manages. --- bin/darkwallet/src/app/mod.rs | 6 +- bin/darkwallet/src/darkirc2.rs | 196 +++++++++++++++++++++++------ bin/darkwallet/src/expr/compile.rs | 12 ++ bin/darkwallet/src/main.rs | 21 ++-- bin/darkwallet/src/prop/mod.rs | 2 +- 5 files changed, 181 insertions(+), 56 deletions(-) diff --git a/bin/darkwallet/src/app/mod.rs b/bin/darkwallet/src/app/mod.rs index 711263f36..69aa2e2b3 100644 --- a/bin/darkwallet/src/app/mod.rs +++ b/bin/darkwallet/src/app/mod.rs @@ -29,6 +29,7 @@ use std::{ }; use crate::{ + darkirc2::LocalDarkIRCPtr, error::Error, expr::Op, gfx::{GraphicsEventPublisherPtr, RenderApiPtr, Vertex}, @@ -121,7 +122,7 @@ pub struct App { pub render_api: RenderApiPtr, pub event_pub: GraphicsEventPublisherPtr, pub text_shaper: TextShaperPtr, - //pub(self) darkirc_backend: DarkIrcBackendPtr, + pub darkirc_evgr: SyncMutex>, pub tasks: SyncMutex>>, pub ex: ExecutorPtr, } @@ -132,7 +133,6 @@ impl App { render_api: RenderApiPtr, event_pub: GraphicsEventPublisherPtr, text_shaper: TextShaperPtr, - //darkirc_backend: DarkIrcBackendPtr, ex: ExecutorPtr, ) -> Arc { Arc::new(Self { @@ -142,7 +142,7 @@ impl App { render_api, event_pub, text_shaper, - //darkirc_backend, + darkirc_evgr: SyncMutex::new(None), tasks: SyncMutex::new(vec![]), }) } diff --git a/bin/darkwallet/src/darkirc2.rs b/bin/darkwallet/src/darkirc2.rs index 55dd23780..f7ec83097 100644 --- a/bin/darkwallet/src/darkirc2.rs +++ b/bin/darkwallet/src/darkirc2.rs @@ -16,9 +16,10 @@ * along with this program. If not, see . */ +use async_lock::Mutex as AsyncMutex; use darkfi::{ event_graph::{self}, - net::transport::Dialer, + net::transport::{Dialer, PtStream}, system::ExecutorPtr, util::path::expand_path, Error, Result, @@ -27,10 +28,20 @@ use darkfi_serial::{ async_trait, deserialize_async_partial, AsyncDecodable, AsyncEncodable, Encodable, SerialDecodable, SerialEncodable, }; -use evgrd::{FetchEventsMessage, LocalEventGraph, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS}; +use evgrd::{ + FetchEventsMessage, LocalEventGraph, LocalEventGraphPtr, VersionMessage, MSG_EVENT, + MSG_FETCHEVENTS, MSG_SENDEVENT, +}; use log::{error, info}; use sled_overlay::sled; -use smol::fs; +use smol::{ + fs, + io::{ReadHalf, WriteHalf}, +}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex as SyncMutex, Weak, +}; use url::Url; use crate::scene::SceneNodePtr; @@ -47,79 +58,182 @@ pub struct Privmsg { pub msg: String, } -pub async fn receive_msgs(sg_root: SceneNodePtr, ex: ExecutorPtr) -> Result<()> { - let chatview_node = sg_root.lookup_node("/window/view/chatty").ok_or(Error::ConnectFailed)?; +impl Privmsg { + pub fn new(channel: String, nick: String, msg: String) -> Self { + Self { channel, nick, msg } + } +} - info!(target: "darkirc", "Instantiating DarkIRC event DAG"); - //let datastore = expand_path(EVGRDB_PATH)?; - //fs::create_dir_all(&datastore).await?; - let sled_db = sled::open(EVGRDB_PATH)?; +pub type LocalDarkIRCPtr = Arc; - let evgr = LocalEventGraph::new(sled_db.clone(), "darkirc_dag", 1, ex.clone()).await?; +pub struct LocalDarkIRC { + is_connected: AtomicBool, + /// The reading half of the transport stream + reader: AsyncMutex>>>, + /// The writing half of the transport stream + writer: AsyncMutex>>>, - let endpoint = "tcp://127.0.0.1:5588"; - let endpoint = "tcp://192.168.1.38:5588"; - let endpoint = Url::parse(endpoint)?; + evgr: LocalEventGraphPtr, + receive_task: SyncMutex>>, - let dialer = Dialer::new(endpoint.clone(), None).await?; - let timeout = std::time::Duration::from_secs(60); + chatview_node: SceneNodePtr, +} - let mut stream = dialer.dial(Some(timeout)).await?; - info!(target: "darkirc", "Connected to the backend: {endpoint}"); +impl LocalDarkIRC { + pub async fn new(sg_root: SceneNodePtr, ex: ExecutorPtr) -> Result> { + let chatview_node = sg_root.lookup_node("/window/view/chatty").unwrap(); - let version = VersionMessage::new(); - version.encode_async(&mut stream).await?; + info!(target: "darkirc", "Instantiating DarkIRC event DAG"); + let datastore = expand_path(EVGRDB_PATH)?; + fs::create_dir_all(&datastore).await?; + let sled_db = sled::open(datastore)?; - let server_version = VersionMessage::decode_async(&mut stream).await?; - info!(target: "darkirc", "Backend server version: {}", server_version.protocol_version); + let evgr = LocalEventGraph::new(sled_db.clone(), "darkirc_dag", 1, ex.clone()).await?; - let unref_tips = evgr.unreferenced_tips.read().await.clone(); - let fetchevs = FetchEventsMessage::new(unref_tips); - MSG_FETCHEVENTS.encode_async(&mut stream).await?; - fetchevs.encode_async(&mut stream).await?; + Ok(Arc::new(Self { + is_connected: AtomicBool::new(false), + reader: AsyncMutex::new(None), + writer: AsyncMutex::new(None), - loop { - let msg_type = u8::decode_async(&mut stream).await?; + evgr, + receive_task: SyncMutex::new(None), + + chatview_node, + })) + } + + async fn reconnect(&self) -> Result<()> { + let endpoint = "tcp://127.0.0.1:5588"; + let endpoint = Url::parse(endpoint)?; + + let dialer = Dialer::new(endpoint.clone(), None).await?; + let timeout = std::time::Duration::from_secs(60); + + let stream = dialer.dial(Some(timeout)).await?; + info!(target: "darkirc", "Connected to the backend: {endpoint}"); + + let (reader, writer) = smol::io::split(stream); + *self.writer.lock().await = Some(writer); + *self.reader.lock().await = Some(reader); + + Ok(()) + } + + pub async fn start(self: Arc, ex: ExecutorPtr) -> Result<()> { + debug!(target: "darkirc", "LocalDarkIRC::start()"); + + self.version_exchange().await?; + + let me = Arc::downgrade(&self); + let task = ex.spawn(async move { + while let Some(self_) = me.upgrade() { + self_.receive_msg().await.unwrap(); + } + error!(target: "darkirc", "Closing DarkIRC receive loop"); + }); + + let mut receive_task = self.receive_task.lock().unwrap(); + assert!(receive_task.is_none()); + *receive_task = Some(task); + + self.is_connected.store(true, Ordering::Relaxed); + + Ok(()) + } + + async fn version_exchange(&self) -> Result<()> { + if !self.is_connected.load(Ordering::Relaxed) { + self.reconnect().await?; + } + + let mut writer = self.writer.lock().await; + let mut reader = self.reader.lock().await; + let writer = writer.as_mut().unwrap(); + let reader = reader.as_mut().unwrap(); + + let version = VersionMessage::new(); + version.encode_async(writer).await?; + + let server_version = VersionMessage::decode_async(reader).await?; + info!(target: "darkirc", "Backend server version: {}", server_version.protocol_version); + + let unref_tips = self.evgr.unreferenced_tips.read().await.clone(); + let fetchevs = FetchEventsMessage::new(unref_tips); + MSG_FETCHEVENTS.encode_async(writer).await?; + fetchevs.encode_async(writer).await?; + + Ok(()) + } + + async fn send_msg(&self, timestamp: u64, msg: Privmsg) -> Result<()> { + if !self.is_connected.load(Ordering::Relaxed) { + self.reconnect().await?; + } + + let mut writer = self.writer.lock().await; + let writer = writer.as_mut().unwrap(); + + MSG_SENDEVENT.encode_async(writer).await?; + timestamp.encode_async(writer).await?; + msg.encode_async(writer).await?; + Ok(()) + } + + async fn receive_msg(&self) -> Result<()> { + if !self.is_connected.load(Ordering::Relaxed) { + self.reconnect().await?; + } + + debug!(target: "darkirc", "Receiving message..."); + + let mut reader = self.reader.lock().await; + let reader = reader.as_mut().unwrap(); + + let msg_type = u8::decode_async(reader).await?; debug!(target: "darkirc", "Received: {msg_type:?}"); if msg_type != MSG_EVENT { error!(target: "darkirc", "Received invalid msg_type: {msg_type}"); - return Err(Error::MalformedPacket) + //return Err(Error::MalformedPacket) + return Ok(()) } - let ev = event_graph::Event::decode_async(&mut stream).await?; + let ev = event_graph::Event::decode_async(reader).await?; - let genesis_timestamp = evgr.current_genesis.read().await.clone().timestamp; + let genesis_timestamp = self.evgr.current_genesis.read().await.clone().timestamp; let ev_id = ev.id(); - if evgr.dag.contains_key(ev_id.as_bytes()).unwrap() || - !ev.validate(&evgr.dag, genesis_timestamp, evgr.days_rotation, None).await? + if self.evgr.dag.contains_key(ev_id.as_bytes()).unwrap() || + !ev.validate(&self.evgr.dag, genesis_timestamp, self.evgr.days_rotation, None) + .await? { error!(target: "darkirc", "Event is invalid! {ev:?}"); - continue + return Ok(()) } debug!(target: "darkirc", "got {ev:?}"); - evgr.dag_insert(&[ev.clone()]).await.unwrap(); + self.evgr.dag_insert(&[ev.clone()]).await.unwrap(); let privmsg: Privmsg = match deserialize_async_partial(ev.content()).await { Ok((v, _)) => v, Err(e) => { error!(target: "darkirc", "Failed deserializing incoming Privmsg event: {e}"); - continue + return Ok(()) } }; debug!(target: "darkirc", "privmsg: {privmsg:?}"); if privmsg.channel != "#random" { - continue + return Ok(()) } let mut arg_data = vec![]; - ev.timestamp.encode(&mut arg_data).unwrap(); - ev.id().as_bytes().encode(&mut arg_data).unwrap(); - privmsg.nick.encode(&mut arg_data).unwrap(); - privmsg.msg.encode(&mut arg_data).unwrap(); + ev.timestamp.encode_async(&mut arg_data).await.unwrap(); + ev.id().as_bytes().encode_async(&mut arg_data).await.unwrap(); + privmsg.nick.encode_async(&mut arg_data).await.unwrap(); + privmsg.msg.encode_async(&mut arg_data).await.unwrap(); - chatview_node.call_method("insert_line", arg_data).await.unwrap(); + self.chatview_node.call_method("insert_line", arg_data).await.unwrap(); + + Ok(()) } } diff --git a/bin/darkwallet/src/expr/compile.rs b/bin/darkwallet/src/expr/compile.rs index 9355f37e5..1d5bd1a8d 100644 --- a/bin/darkwallet/src/expr/compile.rs +++ b/bin/darkwallet/src/expr/compile.rs @@ -352,6 +352,18 @@ mod tests { assert_eq!(code, code2); } + #[test] + fn h_minus_1() { + let mut compiler = Compiler::new(); + let code = compiler.compile("h - 1").unwrap(); + #[rustfmt::skip] + let code2 = vec![Op::Sub(( + Box::new(Op::LoadVar("h".to_string())), + Box::new(Op::ConstFloat32(1.)) + ))]; + assert_eq!(code, code2); + } + #[test] fn dosub() { let mut compiler = Compiler::new(); diff --git a/bin/darkwallet/src/main.rs b/bin/darkwallet/src/main.rs index 286d30d47..027264adc 100644 --- a/bin/darkwallet/src/main.rs +++ b/bin/darkwallet/src/main.rs @@ -65,7 +65,11 @@ mod text; mod ui; mod util; -use crate::{net::ZeroMQAdapter, text::TextShaper}; +use crate::{ + darkirc2::{LocalDarkIRC, LocalDarkIRCPtr}, + net::ZeroMQAdapter, + text::TextShaper, +}; pub type ExecutorPtr = Arc>; @@ -130,24 +134,19 @@ fn main() { let text_shaper = TextShaper::new(); - //let darkirc_backend = DarkIrcBackend::new(); - let app = app::App::new( - sg_root, - render_api, - event_pub.clone(), - text_shaper, - //darkirc_backend, - ex.clone(), - ); + let app = app::App::new(sg_root, render_api, event_pub.clone(), text_shaper, ex.clone()); let app_task = ex.spawn(app.clone().start()); async_runtime.push_task(app_task); + let app2 = app.clone(); let cv_started = app.is_started.clone(); let sg_root = app.sg_root.clone(); let ex2 = ex.clone(); let darkirc_task = ex.spawn(async move { cv_started.wait().await; - if let Err(e) = darkirc2::receive_msgs(sg_root, ex2).await { + let darkirc_evgr = LocalDarkIRC::new(sg_root.clone(), ex2.clone()).await.unwrap(); + *app2.darkirc_evgr.lock().unwrap() = Some(darkirc_evgr.clone()); + if let Err(e) = darkirc_evgr.start(ex2).await { error!("DarkIRC error: {e}") } }); diff --git a/bin/darkwallet/src/prop/mod.rs b/bin/darkwallet/src/prop/mod.rs index c5ff02b90..1d867c728 100644 --- a/bin/darkwallet/src/prop/mod.rs +++ b/bin/darkwallet/src/prop/mod.rs @@ -772,7 +772,7 @@ mod tests { prop.set_expr(Role::App, 0, code).unwrap(); let val = prop.get_cached(0).unwrap(); assert!(val.is_null()); - prop.set_cache_f32(0, 4.).unwrap(); + prop.set_cache_f32(Role::App, 0, 4.).unwrap(); let val = prop.get_cached(0).unwrap(); assert_eq!(val.as_f32().unwrap(), 4.); }