diff --git a/script/evgrd/Makefile b/script/evgrd/Makefile index c8d971971..06c0165d1 100644 --- a/script/evgrd/Makefile +++ b/script/evgrd/Makefile @@ -1,3 +1,3 @@ evgrd: - cargo run --bin evgrd --features=build-daemon + LOG_TARGETS=evgrd cargo run --bin evgrd --features=build-daemon diff --git a/script/evgrd/bin/evgrd.rs b/script/evgrd/bin/evgrd.rs index 124f35573..7e229919a 100644 --- a/script/evgrd/bin/evgrd.rs +++ b/script/evgrd/bin/evgrd.rs @@ -42,7 +42,7 @@ use std::{collections::HashSet, path::PathBuf, sync::Arc}; use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; use url::Url; -use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS}; +use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS, MSG_SENDEVENT}; mod rpc; @@ -149,7 +149,7 @@ async fn rpc_serve( match listener.next().await { Ok((stream, url)) => { info!(target: "evgrd", "Accepted connection from {url}"); - ex.spawn(handle_connect(stream, daemon.clone(), ex.clone())).detach(); + ex.spawn(handle_connect(stream, daemon.clone())).detach(); } // Errors we didn't handle above: @@ -164,11 +164,7 @@ async fn rpc_serve( } } -async fn handle_connect( - mut stream: Box, - daemon: Arc, - _ex: Arc>, -) -> Result<()> { +async fn handle_connect(mut stream: Box, daemon: Arc) -> Result<()> { let client_version = VersionMessage::decode_async(&mut stream).await?; info!(target: "evgrd", "Client version: {}", client_version.protocol_version); @@ -184,27 +180,43 @@ async fn handle_connect( ev.encode_async(&mut stream).await?; } msg_type = u8::decode_async(&mut stream).fuse() => { + debug!(target: "evgrd", "Received msg_type: {msg_type:?}"); let msg_type = msg_type?; - if msg_type != MSG_FETCHEVENTS { - error!(target: "evgrd", "Connection received invalid msg_type: {msg_type}"); - return Err(Error::MalformedPacket) - } - - let fetchevs = FetchEventsMessage::decode_async(&mut stream).await?; - info!(target: "evgrd", "Fetching events {fetchevs:?}"); - let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?; - - info!("fetched {events:?}"); - - for event in events { - MSG_EVENT.encode_async(&mut stream).await?; - event.encode_async(&mut stream).await?; + match msg_type { + MSG_FETCHEVENTS => fetch_events(&mut stream, &daemon).await?, + MSG_SENDEVENT => send_event(&mut stream, &daemon).await?, + _ => error!("Skipping unhandled msg_type: {msg_type}") } } } } } +async fn fetch_events(stream: &mut Box, daemon: &Daemon) -> Result<()> { + let fetchevs = FetchEventsMessage::decode_async(stream).await?; + info!(target: "evgrd", "Fetching events {fetchevs:?}"); + let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?; + + info!("fetched {events:?}"); + + for event in events { + MSG_EVENT.encode_async(stream).await?; + event.encode_async(stream).await?; + } + Ok(()) +} + +async fn send_event(stream: &mut Box, daemon: &Daemon) -> Result<()> { + let timestamp = u64::decode_async(stream).await?; + let content = Vec::::decode_async(stream).await?; + info!("send_event: {timestamp}, {content:?}"); + + let event = Event::with_timestamp(timestamp, content, &daemon.event_graph).await; + daemon.event_graph.dag_insert(&[event.clone()]).await.unwrap(); + + Ok(()) +} + async_daemonize!(realmain); async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!("Starting evgrd node"); diff --git a/script/evgrd/src/lib.rs b/script/evgrd/src/lib.rs index 2a53f6ed3..50a7b7546 100644 --- a/script/evgrd/src/lib.rs +++ b/script/evgrd/src/lib.rs @@ -379,3 +379,4 @@ impl FetchEventsMessage { pub const MSG_EVENT: u8 = 1; pub const MSG_FETCHEVENTS: u8 = 2; +pub const MSG_SENDEVENT: u8 = 3;