evgrd: add SENTEVENT to protocol

This commit is contained in:
darkfi
2024-09-22 12:29:30 +02:00
parent 7c11ee6cc9
commit ac53cae94a
3 changed files with 35 additions and 22 deletions

View File

@@ -1,3 +1,3 @@
evgrd:
cargo run --bin evgrd --features=build-daemon
LOG_TARGETS=evgrd cargo run --bin evgrd --features=build-daemon

View File

@@ -42,7 +42,7 @@ use std::{collections::HashSet, path::PathBuf, sync::Arc};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS, MSG_SENDEVENT};
mod rpc;
@@ -149,7 +149,7 @@ async fn rpc_serve(
match listener.next().await {
Ok((stream, url)) => {
info!(target: "evgrd", "Accepted connection from {url}");
ex.spawn(handle_connect(stream, daemon.clone(), ex.clone())).detach();
ex.spawn(handle_connect(stream, daemon.clone())).detach();
}
// Errors we didn't handle above:
@@ -164,11 +164,7 @@ async fn rpc_serve(
}
}
async fn handle_connect(
mut stream: Box<dyn PtStream>,
daemon: Arc<Daemon>,
_ex: Arc<Executor<'_>>,
) -> Result<()> {
async fn handle_connect(mut stream: Box<dyn PtStream>, daemon: Arc<Daemon>) -> Result<()> {
let client_version = VersionMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Client version: {}", client_version.protocol_version);
@@ -184,27 +180,43 @@ async fn handle_connect(
ev.encode_async(&mut stream).await?;
}
msg_type = u8::decode_async(&mut stream).fuse() => {
debug!(target: "evgrd", "Received msg_type: {msg_type:?}");
let msg_type = msg_type?;
if msg_type != MSG_FETCHEVENTS {
error!(target: "evgrd", "Connection received invalid msg_type: {msg_type}");
return Err(Error::MalformedPacket)
}
let fetchevs = FetchEventsMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Fetching events {fetchevs:?}");
let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?;
info!("fetched {events:?}");
for event in events {
MSG_EVENT.encode_async(&mut stream).await?;
event.encode_async(&mut stream).await?;
match msg_type {
MSG_FETCHEVENTS => fetch_events(&mut stream, &daemon).await?,
MSG_SENDEVENT => send_event(&mut stream, &daemon).await?,
_ => error!("Skipping unhandled msg_type: {msg_type}")
}
}
}
}
}
async fn fetch_events(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> {
let fetchevs = FetchEventsMessage::decode_async(stream).await?;
info!(target: "evgrd", "Fetching events {fetchevs:?}");
let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?;
info!("fetched {events:?}");
for event in events {
MSG_EVENT.encode_async(stream).await?;
event.encode_async(stream).await?;
}
Ok(())
}
async fn send_event(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> {
let timestamp = u64::decode_async(stream).await?;
let content = Vec::<u8>::decode_async(stream).await?;
info!("send_event: {timestamp}, {content:?}");
let event = Event::with_timestamp(timestamp, content, &daemon.event_graph).await;
daemon.event_graph.dag_insert(&[event.clone()]).await.unwrap();
Ok(())
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Starting evgrd node");

View File

@@ -379,3 +379,4 @@ impl FetchEventsMessage {
pub const MSG_EVENT: u8 = 1;
pub const MSG_FETCHEVENTS: u8 = 2;
pub const MSG_SENDEVENT: u8 = 3;