wallet: structify the darkirc local backend so we can call .send(msg) on the stream it manages.

This commit is contained in:
darkfi
2024-09-22 12:32:03 +02:00
parent 63ea55236a
commit 5bce7a7500
5 changed files with 181 additions and 56 deletions

View File

@@ -29,6 +29,7 @@ use std::{
};
use crate::{
darkirc2::LocalDarkIRCPtr,
error::Error,
expr::Op,
gfx::{GraphicsEventPublisherPtr, RenderApiPtr, Vertex},
@@ -121,7 +122,7 @@ pub struct App {
pub render_api: RenderApiPtr,
pub event_pub: GraphicsEventPublisherPtr,
pub text_shaper: TextShaperPtr,
//pub(self) darkirc_backend: DarkIrcBackendPtr,
pub darkirc_evgr: SyncMutex<Option<LocalDarkIRCPtr>>,
pub tasks: SyncMutex<Vec<Task<()>>>,
pub ex: ExecutorPtr,
}
@@ -132,7 +133,6 @@ impl App {
render_api: RenderApiPtr,
event_pub: GraphicsEventPublisherPtr,
text_shaper: TextShaperPtr,
//darkirc_backend: DarkIrcBackendPtr,
ex: ExecutorPtr,
) -> Arc<Self> {
Arc::new(Self {
@@ -142,7 +142,7 @@ impl App {
render_api,
event_pub,
text_shaper,
//darkirc_backend,
darkirc_evgr: SyncMutex::new(None),
tasks: SyncMutex::new(vec![]),
})
}

View File

@@ -16,9 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_lock::Mutex as AsyncMutex;
use darkfi::{
event_graph::{self},
net::transport::Dialer,
net::transport::{Dialer, PtStream},
system::ExecutorPtr,
util::path::expand_path,
Error, Result,
@@ -27,10 +28,20 @@ use darkfi_serial::{
async_trait, deserialize_async_partial, AsyncDecodable, AsyncEncodable, Encodable,
SerialDecodable, SerialEncodable,
};
use evgrd::{FetchEventsMessage, LocalEventGraph, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
use evgrd::{
FetchEventsMessage, LocalEventGraph, LocalEventGraphPtr, VersionMessage, MSG_EVENT,
MSG_FETCHEVENTS, MSG_SENDEVENT,
};
use log::{error, info};
use sled_overlay::sled;
use smol::fs;
use smol::{
fs,
io::{ReadHalf, WriteHalf},
};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex as SyncMutex, Weak,
};
use url::Url;
use crate::scene::SceneNodePtr;
@@ -47,79 +58,182 @@ pub struct Privmsg {
pub msg: String,
}
pub async fn receive_msgs(sg_root: SceneNodePtr, ex: ExecutorPtr) -> Result<()> {
let chatview_node = sg_root.lookup_node("/window/view/chatty").ok_or(Error::ConnectFailed)?;
impl Privmsg {
pub fn new(channel: String, nick: String, msg: String) -> Self {
Self { channel, nick, msg }
}
}
info!(target: "darkirc", "Instantiating DarkIRC event DAG");
//let datastore = expand_path(EVGRDB_PATH)?;
//fs::create_dir_all(&datastore).await?;
let sled_db = sled::open(EVGRDB_PATH)?;
pub type LocalDarkIRCPtr = Arc<LocalDarkIRC>;
let evgr = LocalEventGraph::new(sled_db.clone(), "darkirc_dag", 1, ex.clone()).await?;
pub struct LocalDarkIRC {
is_connected: AtomicBool,
/// The reading half of the transport stream
reader: AsyncMutex<Option<ReadHalf<Box<dyn PtStream>>>>,
/// The writing half of the transport stream
writer: AsyncMutex<Option<WriteHalf<Box<dyn PtStream>>>>,
let endpoint = "tcp://127.0.0.1:5588";
let endpoint = "tcp://192.168.1.38:5588";
let endpoint = Url::parse(endpoint)?;
evgr: LocalEventGraphPtr,
receive_task: SyncMutex<Option<smol::Task<()>>>,
let dialer = Dialer::new(endpoint.clone(), None).await?;
let timeout = std::time::Duration::from_secs(60);
chatview_node: SceneNodePtr,
}
let mut stream = dialer.dial(Some(timeout)).await?;
info!(target: "darkirc", "Connected to the backend: {endpoint}");
impl LocalDarkIRC {
pub async fn new(sg_root: SceneNodePtr, ex: ExecutorPtr) -> Result<Arc<Self>> {
let chatview_node = sg_root.lookup_node("/window/view/chatty").unwrap();
let version = VersionMessage::new();
version.encode_async(&mut stream).await?;
info!(target: "darkirc", "Instantiating DarkIRC event DAG");
let datastore = expand_path(EVGRDB_PATH)?;
fs::create_dir_all(&datastore).await?;
let sled_db = sled::open(datastore)?;
let server_version = VersionMessage::decode_async(&mut stream).await?;
info!(target: "darkirc", "Backend server version: {}", server_version.protocol_version);
let evgr = LocalEventGraph::new(sled_db.clone(), "darkirc_dag", 1, ex.clone()).await?;
let unref_tips = evgr.unreferenced_tips.read().await.clone();
let fetchevs = FetchEventsMessage::new(unref_tips);
MSG_FETCHEVENTS.encode_async(&mut stream).await?;
fetchevs.encode_async(&mut stream).await?;
Ok(Arc::new(Self {
is_connected: AtomicBool::new(false),
reader: AsyncMutex::new(None),
writer: AsyncMutex::new(None),
loop {
let msg_type = u8::decode_async(&mut stream).await?;
evgr,
receive_task: SyncMutex::new(None),
chatview_node,
}))
}
async fn reconnect(&self) -> Result<()> {
let endpoint = "tcp://127.0.0.1:5588";
let endpoint = Url::parse(endpoint)?;
let dialer = Dialer::new(endpoint.clone(), None).await?;
let timeout = std::time::Duration::from_secs(60);
let stream = dialer.dial(Some(timeout)).await?;
info!(target: "darkirc", "Connected to the backend: {endpoint}");
let (reader, writer) = smol::io::split(stream);
*self.writer.lock().await = Some(writer);
*self.reader.lock().await = Some(reader);
Ok(())
}
pub async fn start(self: Arc<Self>, ex: ExecutorPtr) -> Result<()> {
debug!(target: "darkirc", "LocalDarkIRC::start()");
self.version_exchange().await?;
let me = Arc::downgrade(&self);
let task = ex.spawn(async move {
while let Some(self_) = me.upgrade() {
self_.receive_msg().await.unwrap();
}
error!(target: "darkirc", "Closing DarkIRC receive loop");
});
let mut receive_task = self.receive_task.lock().unwrap();
assert!(receive_task.is_none());
*receive_task = Some(task);
self.is_connected.store(true, Ordering::Relaxed);
Ok(())
}
async fn version_exchange(&self) -> Result<()> {
if !self.is_connected.load(Ordering::Relaxed) {
self.reconnect().await?;
}
let mut writer = self.writer.lock().await;
let mut reader = self.reader.lock().await;
let writer = writer.as_mut().unwrap();
let reader = reader.as_mut().unwrap();
let version = VersionMessage::new();
version.encode_async(writer).await?;
let server_version = VersionMessage::decode_async(reader).await?;
info!(target: "darkirc", "Backend server version: {}", server_version.protocol_version);
let unref_tips = self.evgr.unreferenced_tips.read().await.clone();
let fetchevs = FetchEventsMessage::new(unref_tips);
MSG_FETCHEVENTS.encode_async(writer).await?;
fetchevs.encode_async(writer).await?;
Ok(())
}
async fn send_msg(&self, timestamp: u64, msg: Privmsg) -> Result<()> {
if !self.is_connected.load(Ordering::Relaxed) {
self.reconnect().await?;
}
let mut writer = self.writer.lock().await;
let writer = writer.as_mut().unwrap();
MSG_SENDEVENT.encode_async(writer).await?;
timestamp.encode_async(writer).await?;
msg.encode_async(writer).await?;
Ok(())
}
async fn receive_msg(&self) -> Result<()> {
if !self.is_connected.load(Ordering::Relaxed) {
self.reconnect().await?;
}
debug!(target: "darkirc", "Receiving message...");
let mut reader = self.reader.lock().await;
let reader = reader.as_mut().unwrap();
let msg_type = u8::decode_async(reader).await?;
debug!(target: "darkirc", "Received: {msg_type:?}");
if msg_type != MSG_EVENT {
error!(target: "darkirc", "Received invalid msg_type: {msg_type}");
return Err(Error::MalformedPacket)
//return Err(Error::MalformedPacket)
return Ok(())
}
let ev = event_graph::Event::decode_async(&mut stream).await?;
let ev = event_graph::Event::decode_async(reader).await?;
let genesis_timestamp = evgr.current_genesis.read().await.clone().timestamp;
let genesis_timestamp = self.evgr.current_genesis.read().await.clone().timestamp;
let ev_id = ev.id();
if evgr.dag.contains_key(ev_id.as_bytes()).unwrap() ||
!ev.validate(&evgr.dag, genesis_timestamp, evgr.days_rotation, None).await?
if self.evgr.dag.contains_key(ev_id.as_bytes()).unwrap() ||
!ev.validate(&self.evgr.dag, genesis_timestamp, self.evgr.days_rotation, None)
.await?
{
error!(target: "darkirc", "Event is invalid! {ev:?}");
continue
return Ok(())
}
debug!(target: "darkirc", "got {ev:?}");
evgr.dag_insert(&[ev.clone()]).await.unwrap();
self.evgr.dag_insert(&[ev.clone()]).await.unwrap();
let privmsg: Privmsg = match deserialize_async_partial(ev.content()).await {
Ok((v, _)) => v,
Err(e) => {
error!(target: "darkirc", "Failed deserializing incoming Privmsg event: {e}");
continue
return Ok(())
}
};
debug!(target: "darkirc", "privmsg: {privmsg:?}");
if privmsg.channel != "#random" {
continue
return Ok(())
}
let mut arg_data = vec![];
ev.timestamp.encode(&mut arg_data).unwrap();
ev.id().as_bytes().encode(&mut arg_data).unwrap();
privmsg.nick.encode(&mut arg_data).unwrap();
privmsg.msg.encode(&mut arg_data).unwrap();
ev.timestamp.encode_async(&mut arg_data).await.unwrap();
ev.id().as_bytes().encode_async(&mut arg_data).await.unwrap();
privmsg.nick.encode_async(&mut arg_data).await.unwrap();
privmsg.msg.encode_async(&mut arg_data).await.unwrap();
chatview_node.call_method("insert_line", arg_data).await.unwrap();
self.chatview_node.call_method("insert_line", arg_data).await.unwrap();
Ok(())
}
}

View File

@@ -352,6 +352,18 @@ mod tests {
assert_eq!(code, code2);
}
#[test]
fn h_minus_1() {
let mut compiler = Compiler::new();
let code = compiler.compile("h - 1").unwrap();
#[rustfmt::skip]
let code2 = vec![Op::Sub((
Box::new(Op::LoadVar("h".to_string())),
Box::new(Op::ConstFloat32(1.))
))];
assert_eq!(code, code2);
}
#[test]
fn dosub() {
let mut compiler = Compiler::new();

View File

@@ -65,7 +65,11 @@ mod text;
mod ui;
mod util;
use crate::{net::ZeroMQAdapter, text::TextShaper};
use crate::{
darkirc2::{LocalDarkIRC, LocalDarkIRCPtr},
net::ZeroMQAdapter,
text::TextShaper,
};
pub type ExecutorPtr = Arc<smol::Executor<'static>>;
@@ -130,24 +134,19 @@ fn main() {
let text_shaper = TextShaper::new();
//let darkirc_backend = DarkIrcBackend::new();
let app = app::App::new(
sg_root,
render_api,
event_pub.clone(),
text_shaper,
//darkirc_backend,
ex.clone(),
);
let app = app::App::new(sg_root, render_api, event_pub.clone(), text_shaper, ex.clone());
let app_task = ex.spawn(app.clone().start());
async_runtime.push_task(app_task);
let app2 = app.clone();
let cv_started = app.is_started.clone();
let sg_root = app.sg_root.clone();
let ex2 = ex.clone();
let darkirc_task = ex.spawn(async move {
cv_started.wait().await;
if let Err(e) = darkirc2::receive_msgs(sg_root, ex2).await {
let darkirc_evgr = LocalDarkIRC::new(sg_root.clone(), ex2.clone()).await.unwrap();
*app2.darkirc_evgr.lock().unwrap() = Some(darkirc_evgr.clone());
if let Err(e) = darkirc_evgr.start(ex2).await {
error!("DarkIRC error: {e}")
}
});

View File

@@ -772,7 +772,7 @@ mod tests {
prop.set_expr(Role::App, 0, code).unwrap();
let val = prop.get_cached(0).unwrap();
assert!(val.is_null());
prop.set_cache_f32(0, 4.).unwrap();
prop.set_cache_f32(Role::App, 0, 4.).unwrap();
let val = prop.get_cached(0).unwrap();
assert_eq!(val.as_f32().unwrap(), 4.);
}