evgrd: test client and evgrd server skeletons

This commit is contained in:
darkfi
2024-09-10 10:16:43 +02:00
parent 407b32eb25
commit f3937d2f0a
4 changed files with 295 additions and 46 deletions

View File

@@ -10,12 +10,17 @@ repository = "https://codeberg.org/darkrenaissance/darkfi"
[[bin]]
name = "evgrd"
path = "src/main.rs"
path = "bin/evgrd.rs"
[[bin]]
name = "test"
path = "bin/test.rs"
[dependencies]
darkfi = {path = "../../", features = ["async-daemonize", "event-graph", "rpc", "zk"]}
darkfi-sdk = {path = "../../src/sdk", features = ["async"]}
darkfi-serial = {path = "../../src/serial", features = ["async"]}
futures-lite = "2.3.0"
libc = "0.2.158"
# Event Graph DB

View File

@@ -16,29 +16,41 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::{Arc, Mutex as SyncMutex}, path::PathBuf};
use darkfi::{
async_daemonize, cli_desc,
event_graph::{proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
net::{session::SESSION_DEFAULT, settings::SettingsOpt as NetSettingsOpt, P2p, P2pPtr},
event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
net::{
session::SESSION_DEFAULT,
settings::SettingsOpt as NetSettingsOpt,
transport::{Listener, PtListener, PtStream},
P2p, P2pPtr,
},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, StoppableTaskPtr},
system::{sleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr},
util::path::{expand_path, get_config_path},
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, Encodable, SerialDecodable, SerialEncodable,
async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable,
SerialDecodable, SerialEncodable,
};
use futures::FutureExt;
use log::{debug, error, info};
use rand::rngs::OsRng;
use sled_overlay::sled;
use smol::{fs, lock::Mutex, stream::StreamExt, Executor};
use std::{
path::PathBuf,
sync::{Arc, Mutex as SyncMutex},
};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
const CONFIG_FILE: &str = "evgrd.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml");
@@ -58,7 +70,7 @@ struct Args {
/// Set log file output
log: Option<String>,
#[structopt(long, default_value = "tcp://127.0.0.1:26880")]
#[structopt(long, default_value = "tcp://127.0.0.1:5588")]
/// RPC server listen address
rpc_listen: Url,
@@ -92,47 +104,121 @@ struct Args {
}
pub struct Daemon {
/// P2P network pointer
p2p: P2pPtr,
/// Sled DB (also used in event_graph and for RLN)
sled: sled::Db,
/// Event Graph instance
event_graph: EventGraphPtr,
/// JSON-RPC connection tracker
///// P2P network pointer
//p2p: P2pPtr,
///// Sled DB (also used in event_graph and for RLN)
//sled: sled::Db,
///// Event Graph instance
//event_graph: EventGraphPtr,
///// JSON-RPC connection tracker
//rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// dnet JSON-RPC subscriber
dnet_sub: JsonSubscriber,
/// deg JSON-RPC subscriber
deg_sub: JsonSubscriber,
/// Replay logs (DB) path
replay_datastore: PathBuf,
///// dnet JSON-RPC subscriber
//dnet_sub: JsonSubscriber,
///// deg JSON-RPC subscriber
//deg_sub: JsonSubscriber,
///// Replay logs (DB) path
//replay_datastore: PathBuf,
/// New events publisher
events_pub: PublisherPtr<event_graph::Event>,
}
impl Daemon {
fn new(
p2p: P2pPtr,
sled: sled::Db,
event_graph: EventGraphPtr,
dnet_sub: JsonSubscriber,
deg_sub: JsonSubscriber,
replay_datastore: PathBuf,
//p2p: P2pPtr,
//sled: sled::Db,
//event_graph: EventGraphPtr,
//dnet_sub: JsonSubscriber,
//deg_sub: JsonSubscriber,
//replay_datastore: PathBuf,
events_pub: PublisherPtr<event_graph::Event>,
) -> Self {
Self {
p2p,
sled,
event_graph,
//p2p,
//sled,
//event_graph,
//rpc_connections: Mutex::new(HashSet::new()),
dnet_sub,
deg_sub,
replay_datastore,
//dnet_sub,
//deg_sub,
//replay_datastore,
events_pub,
}
}
}
async fn rpc_serve(
listener: Box<dyn PtListener>,
daemon: Arc<Daemon>,
ex: Arc<Executor<'_>>,
) -> Result<()> {
loop {
match listener.next().await {
Ok((stream, url)) => {
info!(target: "evgrd", "Accepted connection from {url}");
ex.spawn(handle_connect(stream, daemon.clone(), ex.clone())).detach();
}
// Errors we didn't handle above:
Err(e) => {
error!(
target: "evgrd",
"Unhandled listener.next() error: {}", e,
);
continue
}
}
}
Ok(())
}
async fn handle_connect(
mut stream: Box<dyn PtStream>,
daemon: Arc<Daemon>,
ex: Arc<Executor<'_>>,
) -> Result<()> {
let client_version = VersionMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Client version: {}", client_version.protocol_version);
let version = VersionMessage::new();
version.encode_async(&mut stream).await?;
let event_sub = daemon.events_pub.clone().subscribe().await;
loop {
futures::select! {
ev = event_sub.receive().fuse() => {
MSG_EVENT.encode_async(&mut stream).await?;
ev.encode_async(&mut stream).await?;
}
msg_type = u8::decode_async(&mut stream).fuse() => {
let msg_type = msg_type?;
if msg_type != MSG_FETCHEVENTS {
error!(target: "evgrd", "Connection received invalid msg_type: {msg_type}");
return Err(Error::MalformedPacket)
}
let fetchevs = FetchEventsMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Fetching events {fetchevs:?}");
// Now do your thing with the daemon and get missing tips
// Then send them like this:
// for ev in evs {
// MSG_EVENT.encode_async(&mut stream).await?;
// ev.encode_async(&mut stream).await?;
// }
}
}
}
Ok(())
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Starting evgrd node");
/*
// Create datastore path if not there already.
let datastore = expand_path(&args.datastore)?;
fs::create_dir_all(&datastore).await?;
@@ -215,38 +301,48 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
Error::DetachedTaskStopped,
ex.clone(),
);
*/
// New events are published here
let events_pub = Publisher::new();
info!("Starting JSON-RPC server");
let daemon = Arc::new(Daemon::new(
p2p.clone(),
sled_db.clone(),
event_graph.clone(),
dnet_sub,
deg_sub,
replay_datastore.clone(),
//p2p.clone(),
//sled_db.clone(),
//event_graph.clone(),
//dnet_sub,
//deg_sub,
//replay_datastore.clone(),
events_pub,
));
let daemon_ = Arc::clone(&daemon);
/*
let listener = Listener::new(args.rpc_listen, None).await?;
let ptlistener = listener.listen().await?;
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, daemon.clone(), None, ex.clone()),
rpc_serve(ptlistener, daemon.clone(), ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
Ok(()) => panic!("Acceptor task should never complete without error status"),
//Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
Err(e) => error!("Failed stopping RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
*/
/*
info!("Starting P2P network");
p2p.clone().start().await?;
*/
info!("Waiting for some P2P connections...");
sleep(5).await;
/*
// We'll attempt to sync {sync_attempts} times
if !args.skip_dag_sync {
for i in 1..=args.sync_attempts {
@@ -270,17 +366,21 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
} else {
*event_graph.synced.write().await = true;
}
*/
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!("Caught termination signal, cleaning up and exiting...");
/*
info!("Stopping P2P network");
p2p.stop().await;
*/
info!("Stopping JSON-RPC server");
//rpc_task.stop().await;
info!("Stopping RPC server");
rpc_task.stop().await;
/*
dnet_task.stop().await;
deg_task.stop().await;
@@ -292,5 +392,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Flushed {} bytes", flushed_bytes);
info!("Shut down successfully");
*/
Ok(())
}

83
script/evgrd/bin/test.rs Normal file
View File

@@ -0,0 +1,83 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{
async_daemonize, cli_desc,
event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
net::{
session::SESSION_DEFAULT,
settings::SettingsOpt as NetSettingsOpt,
transport::{Dialer, Listener, PtListener, PtStream},
P2p, P2pPtr,
},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, StoppableTaskPtr},
util::path::{expand_path, get_config_path},
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable,
SerialDecodable, SerialEncodable,
};
use log::{debug, error, info, warn};
use url::Url;
use evgrd::{FetchEventsMessage, LocalEventGraph, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
async fn amain() -> Result<()> {
let evgr = LocalEventGraph::new();
let endpoint = "tcp://127.0.0.1:5588";
let endpoint = Url::parse(endpoint)?;
let dialer = Dialer::new(endpoint, None).await?;
let timeout = std::time::Duration::from_secs(60);
info!("Connecting...");
let mut stream = dialer.dial(Some(timeout)).await?;
info!("Connected!");
let version = VersionMessage::new();
version.encode_async(&mut stream).await?;
let server_version = VersionMessage::decode_async(&mut stream).await?;
info!("Server version: {}", server_version.protocol_version);
let fetchevs = FetchEventsMessage::new(evgr.unref_tips.clone());
MSG_FETCHEVENTS.encode_async(&mut stream).await?;
fetchevs.encode_async(&mut stream).await?;
loop {
let msg_type = u8::decode_async(&mut stream).await?;
if msg_type != MSG_EVENT {
error!("Received invalid msg_type: {msg_type}");
return Err(Error::MalformedPacket)
}
let ev = event_graph::Event::decode_async(&mut stream).await?;
}
Ok(())
}
fn main() {
smol::block_on(amain());
}

60
script/evgrd/src/lib.rs Normal file
View File

@@ -0,0 +1,60 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::event_graph::Event;
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, Encodable, SerialDecodable, SerialEncodable,
};
use std::sync::Arc;
pub const PROTOCOL_VERSION: u32 = 1;
pub struct LocalEventGraph {
pub unref_tips: Vec<(u64, blake3::Hash)>,
}
impl LocalEventGraph {
pub fn new() -> Self {
Self { unref_tips: vec![] }
}
}
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct VersionMessage {
pub protocol_version: u32,
}
impl VersionMessage {
pub fn new() -> Self {
Self { protocol_version: PROTOCOL_VERSION }
}
}
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FetchEventsMessage {
unref_tips: Vec<(u64, blake3::Hash)>,
}
impl FetchEventsMessage {
pub fn new(unref_tips: Vec<(u64, blake3::Hash)>) -> Self {
Self { unref_tips }
}
}
pub const MSG_EVENT: u8 = 1;
pub const MSG_FETCHEVENTS: u8 = 2;