bin/genev: testing some generic event utilization of event_graph

This commit is contained in:
Dastan-glitch
2023-02-23 03:16:13 +03:00
parent 5ffc4bab9a
commit b5acec7e82
11 changed files with 562 additions and 18 deletions

2
.gitignore vendored
View File

@@ -20,6 +20,8 @@
/faucetd
/fu
/fud
/genevd
/genev
/ircd
/ircd2
/tau

71
Cargo.lock generated
View File

@@ -599,9 +599,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.1.4"
version = "4.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76"
checksum = "ec0b0588d44d4d63a87dbd75c136c166bbfd9a86a31cb89e09906521c7d3f5e3"
dependencies = [
"bitflags",
"clap_derive",
@@ -618,7 +618,7 @@ version = "4.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd125be87bf4c255ebc50de0b7f4d2a6201e8ac3dc86e39c0ad081dc5e7236fe"
dependencies = [
"clap 4.1.4",
"clap 4.1.6",
]
[[package]]
@@ -1049,12 +1049,12 @@ dependencies = [
[[package]]
name = "ctrlc"
version = "3.2.4"
version = "3.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1631ca6e3c59112501a9d87fd86f21591ff77acd31331e8a73f8d80a65bbdd71"
checksum = "bbcf33c2a618cbe41ee43ae6e9f2e48368cd9f9db2896f10167d8d762679f639"
dependencies = [
"nix",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -1124,7 +1124,7 @@ dependencies = [
"blake3",
"bs58",
"chrono",
"clap 4.1.4",
"clap 4.1.6",
"crypto_api_chachapoly",
"darkfi-derive",
"darkfi-derive-internal",
@@ -1505,7 +1505,7 @@ version = "0.4.0"
dependencies = [
"async-channel",
"async-std",
"clap 4.1.4",
"clap 4.1.6",
"darkfi",
"easy-parallel",
"hex",
@@ -1536,7 +1536,7 @@ dependencies = [
"async-std",
"blake3",
"bs58",
"clap 4.1.4",
"clap 4.1.6",
"clap_complete",
"darkfi",
"darkfi-dao-contract",
@@ -1891,7 +1891,7 @@ name = "fu"
version = "0.4.0"
dependencies = [
"async-std",
"clap 4.1.4",
"clap 4.1.6",
"darkfi",
"log",
"serde_json",
@@ -2071,6 +2071,41 @@ dependencies = [
"version_check",
]
[[package]]
name = "genev"
version = "0.1.0"
dependencies = [
"async-std",
"clap 4.1.6",
"darkfi",
"darkfi-serial",
"log",
"serde",
"serde_json",
"simplelog",
"url",
]
[[package]]
name = "genevd"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"ctrlc",
"darkfi",
"darkfi-serial",
"easy-parallel",
"log",
"serde",
"serde_json",
"simplelog",
"smol",
"structopt",
"structopt-toml",
"url",
]
[[package]]
name = "getrandom"
version = "0.2.8"
@@ -2391,7 +2426,7 @@ dependencies = [
"async-trait",
"bs58",
"chrono",
"clap 4.1.4",
"clap 4.1.6",
"crypto_box",
"ctrlc",
"darkfi",
@@ -2422,7 +2457,7 @@ dependencies = [
"async-trait",
"bs58",
"chrono",
"clap 4.1.4",
"clap 4.1.6",
"crypto_box",
"ctrlc",
"darkfi",
@@ -3624,9 +3659,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
"itoa 1.0.5",
"ryu",
@@ -4062,7 +4097,7 @@ version = "0.4.0"
dependencies = [
"async-std",
"chrono",
"clap 4.1.4",
"clap 4.1.6",
"colored",
"darkfi",
"log",
@@ -4533,7 +4568,7 @@ name = "vanityaddr"
version = "0.4.0"
dependencies = [
"bs58",
"clap 4.1.4",
"clap 4.1.6",
"ctrlc",
"darkfi",
"darkfi-sdk",
@@ -5161,7 +5196,7 @@ dependencies = [
name = "zkas"
version = "0.4.0"
dependencies = [
"clap 4.1.4",
"clap 4.1.6",
"darkfi",
]
@@ -5169,7 +5204,7 @@ dependencies = [
name = "zktool"
version = "0.4.0"
dependencies = [
"clap 4.1.4",
"clap 4.1.6",
"darkfi",
"darkfi-sdk",
]

View File

@@ -26,6 +26,8 @@ members = [
"bin/faucetd",
"bin/fud/fu",
"bin/fud/fud",
"bin/genev/genevd",
"bin/genev/genev-cli",
"bin/ircd",
"bin/ircd2",
"bin/dnetview",

View File

@@ -0,0 +1,22 @@
[package]
name = "genev"
description = "Generic Event example CLI"
version = "0.1.0"
edition = "2021"
authors = ["Dyne.org foundation <foundation@dyne.org>"]
license = "AGPL-3.0-only"
homepage = "https://dark.fi"
repository = "https://github.com/darkrenaissance/darkfi"
[dependencies]
darkfi = {path = "../../../", features = ["event-graph", "rpc", "bs58"]}
darkfi-serial = {path = "../../../src/serial"}
async-std = {version = "1.12.0", features = ["attributes"]}
clap = {version = "4.1.6", features = ["derive"]}
log = "0.4.17"
simplelog = "0.12.0"
serde = {version = "1.0.152", features = ["derive"]}
serde_json = "1.0.93"
url = "2.3.1"

View File

@@ -0,0 +1,85 @@
use clap::{Parser, Subcommand};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use serde::Serialize;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use url::Url;
use darkfi::{
rpc::client::RpcClient,
util::cli::{get_log_config, get_log_level},
Result,
};
use crate::rpc::Gen;
mod rpc;
#[derive(SerialEncodable, SerialDecodable, Debug, Serialize)]
pub struct BaseEvent {
pub nick: String,
pub title: String,
pub text: String,
}
#[derive(Parser)]
#[clap(name = "genev", version)]
struct Args {
#[arg(short, action = clap::ArgAction::Count)]
/// Increase verbosity (-vvv supported)
verbose: u8,
#[clap(short, long, default_value = "tcp://127.0.0.1:28880")]
/// JSON-RPC endpoint
endpoint: Url,
#[clap(subcommand)]
command: Option<SubCmd>,
}
#[derive(Subcommand)]
enum SubCmd {
Add { values: Vec<String> },
List,
}
#[async_std::main]
async fn main() -> Result<()> {
let args = Args::parse();
let log_level = get_log_level(args.verbose.into());
let log_config = get_log_config();
TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
let rpc_client = RpcClient::new(args.endpoint).await?;
let gen = Gen { rpc_client };
match args.command {
Some(subcmd) => match subcmd {
SubCmd::Add { values } => {
let event = BaseEvent {
nick: values[0].clone(),
title: values[1].clone(),
text: values[2..].join(" "),
};
return gen.add(event).await
}
SubCmd::List => {
let events = gen.list().await?;
for event in events {
println!("=============================");
println!(
"- nickname: {}, title: {}, text: {}",
event.action.nick, event.action.title, event.action.text
);
}
}
},
None => println!("none"),
}
gen.close_connection().await?;
Ok(())
}

View File

@@ -0,0 +1,42 @@
use log::debug;
use serde_json::json;
use darkfi::{
event_graph::model::Event,
rpc::{client::RpcClient, jsonrpc::JsonRequest},
Result,
};
use crate::BaseEvent;
pub struct Gen {
pub rpc_client: RpcClient,
}
impl Gen {
pub async fn close_connection(&self) -> Result<()> {
self.rpc_client.close().await
}
/// Add a new task.
pub async fn add(&self, event: BaseEvent) -> Result<()> {
let req = JsonRequest::new("add", json!([event]));
let rep = self.rpc_client.request(req).await?;
debug!("Got reply: {:?}", rep);
Ok(())
}
/// Get current open tasks ids.
pub async fn list(&self) -> Result<Vec<Event<BaseEvent>>> {
let req = JsonRequest::new("list", json!([]));
let rep = self.rpc_client.request(req).await?;
debug!("reply: {:?}", rep);
let bytes: Vec<u8> = serde_json::from_value(rep)?;
let events: Vec<Event<BaseEvent>> = darkfi_serial::deserialize(&bytes)?;
Ok(events)
}
}

View File

@@ -0,0 +1,34 @@
## JSON-RPC listen URL
#rpc_listen="tcp://127.0.0.1:28880"
## Current display name
#nickname="NICKNAME"
## net settings
[net]
## P2P accept addresses
#inbound = ["tcp://127.0.0.1:28881"]
## Connection slots
outbound_connections=8
## P2P external addresses
#external_addr = ["tls://127.0.0.1:28881"]
## Peers to connect to
#peers = ["tls://127.0.0.1:28881"]
## Seed nodes to connect to
seeds=["tls://lilith0.dark.fi:28881", "tls://lilith1.dark.fi:28881"]
# Prefered transports for outbound connections
#transports = ["tls", "tcp"]
## these are the default configuration for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10

View File

@@ -0,0 +1,28 @@
[package]
name = "genevd"
description = "Generic Event example daemon"
version = "0.1.0"
edition = "2021"
authors = ["Dyne.org foundation <foundation@dyne.org>"]
license = "AGPL-3.0-only"
homepage = "https://dark.fi"
repository = "https://github.com/darkrenaissance/darkfi"
[dependencies]
darkfi = {path = "../../../", features = ["event-graph", "rpc", "bs58", "util"]}
darkfi-serial = {path = "../../../src/serial"}
async-std = {version = "1.12.0", features = ["attributes"]}
async-trait = "0.1.64"
ctrlc = { version = "3.2.5", features = ["termination"] }
easy-parallel = "3.2.0"
simplelog = "0.12.0"
smol = "1.3.0"
log = "0.4.17"
url = "2.3.1"
# Argument parsing
serde = {version = "1.0.152", features = ["derive"]}
serde_json = "1.0.93"
structopt = "0.3.26"
structopt-toml = "0.5.1"

View File

@@ -0,0 +1,19 @@
use darkfi::event_graph::EventMsg;
use darkfi_serial::{SerialDecodable, SerialEncodable};
#[derive(SerialEncodable, SerialDecodable, Clone, Debug)]
pub struct GenEvent {
pub nick: String,
pub title: String,
pub text: String,
}
impl EventMsg for GenEvent {
fn new() -> Self {
Self {
nick: "groot".to_string(),
title: "I am groot".to_string(),
text: "I am groot!!".to_string(),
}
}
}

View File

@@ -0,0 +1,155 @@
use async_std::sync::{Arc, Mutex};
use log::{info, warn};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use darkfi::{
async_daemonize, cli_desc,
event_graph::{
events_queue::EventsQueue,
model::{Event, EventId, Model},
protocol_event::{ProtocolEvent, Seen, SeenPtr, UnreadEvents},
view::{View, ViewPtr},
},
net::{self, settings::SettingsOpt},
rpc::server::listen_and_serve,
Result,
};
mod genevent;
mod rpc;
use genevent::GenEvent;
use url::Url;
use crate::rpc::JsonRpcInterface;
const CONFIG_FILE: &str = "genev_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../../genev_config.toml");
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "genev", about = cli_desc!())]
struct Args {
#[structopt(short, long)]
/// Configuration file to use
config: Option<String>,
/// JSON-RPC listen URL
#[structopt(long = "rpc", default_value = "tcp://127.0.0.1:28880")]
pub rpc_listen: Url,
#[structopt(flatten)]
pub net: SettingsOpt,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
}
async fn start_sync_loop(
view: ViewPtr<GenEvent>,
seen: SeenPtr<EventId>,
missed_events: Arc<Mutex<Vec<Event<GenEvent>>>>,
) -> Result<()> {
loop {
let event = view.lock().await.process().await?;
if !seen.push(&event.hash()).await {
continue
}
info!("new event: {:?}", event);
missed_events.lock().await.push(event.clone());
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, executor: Arc<smol::Executor<'_>>) -> Result<()> {
////////////////////
// Initialize the base structures
////////////////////
let events_queue = EventsQueue::<GenEvent>::new();
let model = Arc::new(Mutex::new(Model::new(events_queue.clone())));
let view = Arc::new(Mutex::new(View::new(events_queue)));
let model_clone = model.clone();
////////////////////
// P2p setup
////////////////////
// Buffers
let seen_event = Seen::new();
let seen_inv = Seen::new();
let unread_events = UnreadEvents::new();
let unread_events_clone = unread_events.clone();
// Check the version
let mut net_settings = args.net.clone();
net_settings.app_version = Some(option_env!("CARGO_PKG_VERSION").unwrap_or("").to_string());
// New p2p
let p2p = net::P2p::new(net_settings.into()).await;
let p2p2 = p2p.clone();
// Register the protocol_event
let registry = p2p.protocol_registry();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let seen_event = seen_event.clone();
let seen_inv = seen_inv.clone();
let model = model.clone();
let unread_events = unread_events.clone();
async move {
ProtocolEvent::init(channel, p2p, model, seen_event, seen_inv, unread_events).await
}
})
.await;
// Start
p2p.clone().start(executor.clone()).await?;
// Run
let executor_cloned = executor.clone();
executor_cloned.spawn(p2p.clone().run(executor.clone())).detach();
////////////////////
// Listner
////////////////////
let seen_ids = Seen::new();
let missed_events = Arc::new(Mutex::new(vec![]));
executor.spawn(start_sync_loop(view, seen_ids.clone(), missed_events.clone())).detach();
//
// RPC interface
//
let rpc_interface = Arc::new(JsonRpcInterface::new(
"Alolymous".to_string(),
unread_events_clone,
missed_events.clone(),
model_clone,
seen_ids.clone(),
p2p.clone(),
));
let _ex = executor.clone();
executor.spawn(listen_and_serve(args.rpc_listen.clone(), rpc_interface, _ex)).detach();
////////////////////
// Wait for SIGINT
////////////////////
let (signal, shutdown) = smol::channel::bounded::<()>(1);
ctrlc::set_handler(move || {
warn!(target: "ircd", "ircd start Exit Signal");
// cleaning up tasks running in the background
async_std::task::block_on(signal.send(())).unwrap();
})
.unwrap();
shutdown.recv().await?;
print!("\r");
info!("Caught termination signal, cleaning up and exiting...");
// stop p2p
p2p2.stop().await;
Ok(())
}

120
bin/genev/genevd/src/rpc.rs Normal file
View File

@@ -0,0 +1,120 @@
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use log::debug;
use serde_json::{json, Value};
use darkfi::{
event_graph::{
get_current_time,
model::{Event, EventId, ModelPtr},
protocol_event::{SeenPtr, UnreadEvents},
},
net,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
server::RequestHandler,
},
};
use crate::genevent::GenEvent;
pub struct JsonRpcInterface {
_nickname: String,
unread_events: Arc<Mutex<UnreadEvents<GenEvent>>>,
missed_events: Arc<Mutex<Vec<Event<GenEvent>>>>,
model: ModelPtr<GenEvent>,
seen: SeenPtr<EventId>,
p2p: net::P2pPtr,
}
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if !req.params.is_array() {
return JsonError::new(ErrorCode::InvalidParams, None, req.id).into()
}
match req.method.as_str() {
Some("add") => self.add(req.id, req.params).await,
Some("list") => self.list(req.id, req.params).await,
Some("ping") => self.pong(req.id, req.params).await,
Some("get_info") => self.get_info(req.id, req.params).await,
Some(_) | None => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
}
impl JsonRpcInterface {
pub fn new(
_nickname: String,
unread_events: Arc<Mutex<UnreadEvents<GenEvent>>>,
missed_events: Arc<Mutex<Vec<Event<GenEvent>>>>,
model: ModelPtr<GenEvent>,
seen: SeenPtr<EventId>,
p2p: net::P2pPtr,
) -> Self {
Self { _nickname, unread_events, missed_events, model, seen, p2p }
}
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
async fn pong(&self, id: Value, _params: Value) -> JsonResult {
JsonResponse::new(json!("pong"), id).into()
}
// RPCAPI:
// Retrieves P2P network information.
// --> {"jsonrpc": "2.0", "method": "get_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
async fn get_info(&self, id: Value, _params: Value) -> JsonResult {
let resp = self.p2p.get_info().await;
JsonResponse::new(resp, id).into()
}
// RPCAPI:
// Add a new event
// --> {"jsonrpc": "2.0", "method": "add", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [nickname, ...], "id": 1}
async fn add(&self, id: Value, params: Value) -> JsonResult {
let genevent = GenEvent {
nick: params[0].get("nick").unwrap().to_string(),
title: params[0].get("title").unwrap().to_string(),
text: params[0].get("text").unwrap().to_string(),
};
let event = Event {
previous_event_hash: self.model.lock().await.get_head_hash(),
action: genevent,
timestamp: get_current_time(),
read_confirms: 0,
};
if !self.seen.push(&event.hash()).await {
let json = json!(false);
return JsonResponse::new(json, id).into()
}
self.unread_events.lock().await.insert(&event);
self.p2p.broadcast(event).await.unwrap();
let json = json!(true);
JsonResponse::new(json, id).into()
}
// RPCAPI:
// List events
// --> {"jsonrpc": "2.0", "method": "list", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
async fn list(&self, id: Value, _params: Value) -> JsonResult {
debug!("fetching all events");
let msd = self.missed_events.lock().await.clone();
let ser = darkfi_serial::serialize(&msd);
let json = json!(ser);
JsonResponse::new(json, id).into()
}
}