From cc9fd99460ebd89d61b233edec4ba0dc28b43a71 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sun, 31 Jul 2022 23:05:35 +0400 Subject: [PATCH] darkwiki: WIP: POC Operational transformation and raft --- script/research/darkwiki/Cargo.toml | 16 ++ script/research/darkwiki/darkwiki.toml | 30 +++ script/research/darkwiki/src/error.rs | 25 +++ script/research/darkwiki/src/jsonrpc.rs | 55 ++++++ script/research/darkwiki/src/main.rs | 206 ++++++++++++++++++++- script/research/darkwiki/src/sequence.rs | 195 +++++++++++++++++++ script/research/darkwiki/src/update_cli.rs | 38 ++++ 7 files changed, 563 insertions(+), 2 deletions(-) create mode 100644 script/research/darkwiki/darkwiki.toml create mode 100644 script/research/darkwiki/src/error.rs create mode 100644 script/research/darkwiki/src/jsonrpc.rs create mode 100644 script/research/darkwiki/src/sequence.rs create mode 100644 script/research/darkwiki/src/update_cli.rs diff --git a/script/research/darkwiki/Cargo.toml b/script/research/darkwiki/Cargo.toml index b9ee705ad..fa004fe6b 100644 --- a/script/research/darkwiki/Cargo.toml +++ b/script/research/darkwiki/Cargo.toml @@ -32,5 +32,21 @@ fxhash = "0.2.1" serde = {version = "1.0.138", features = ["derive"]} serde_json = "1.0.82" structopt = "0.3.26" +structopt-toml = "0.5.0" +unicode-segmentation = "1.9.0" +crypto_box = {version = "0.7.2", features = ["std"]} +hex = "0.4.3" +bs58 = "0.4.0" + [workspace] + +[[bin]] +name = "darkwikid" +path = "src/main.rs" + +[[bin]] +name = "darkwikiupdate" +path = "src/update_cli.rs" + + diff --git a/script/research/darkwiki/darkwiki.toml b/script/research/darkwiki/darkwiki.toml new file mode 100644 index 000000000..f8e856854 --- /dev/null +++ b/script/research/darkwiki/darkwiki.toml @@ -0,0 +1,30 @@ +## JSON-RPC listen URL +#rpc_listen="tcp://127.0.0.1:13055" + +## Sets Datastore Path +#datastore="~/.config/darkfi/darkwiki" + +## Raft net settings +[net] +## P2P accept address +inbound="tcp://127.0.0.1:13001" + +## Connection slots +#outbound_connections=5 + +## P2P external address +external_addr="tls://127.0.0.1:13001" + +## Peers to connect to +#peers=["tls://127.0.0.1:13003"] + +## Seed nodes to connect to +#seeds=[] + +## these are the default configuration for the p2p network +#manual_attempt_limit=0 +#seed_query_timeout_seconds=8 +#connect_timeout_seconds=10 +#channel_handshake_seconds=4 +#channel_heartbeat_seconds=10 + diff --git a/script/research/darkwiki/src/error.rs b/script/research/darkwiki/src/error.rs new file mode 100644 index 000000000..7a9cfab97 --- /dev/null +++ b/script/research/darkwiki/src/error.rs @@ -0,0 +1,25 @@ +#[derive(thiserror::Error, Debug)] +pub enum DarkWikiError { + #[error("Add Operation failed")] + AddOperationFailed, + #[error("Encryption error: `{0}`")] + EncryptionError(String), + #[error("Json serialization error: `{0}`")] + SerdeJsonError(String), + #[error("InternalError")] + Darkfi(#[from] darkfi::error::Error), +} + +pub type DarkWikiResult = std::result::Result; + +impl From for DarkWikiError { + fn from(err: serde_json::Error) -> DarkWikiError { + DarkWikiError::SerdeJsonError(err.to_string()) + } +} + +impl From for DarkWikiError { + fn from(err: crypto_box::aead::Error) -> DarkWikiError { + DarkWikiError::EncryptionError(err.to_string()) + } +} diff --git a/script/research/darkwiki/src/jsonrpc.rs b/script/research/darkwiki/src/jsonrpc.rs new file mode 100644 index 000000000..6bfe528ab --- /dev/null +++ b/script/research/darkwiki/src/jsonrpc.rs @@ -0,0 +1,55 @@ +use async_trait::async_trait; + +use log::error; +use serde_json::{json, Value}; + +use darkfi::{ + rpc::{ + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, + server::RequestHandler, + }, + Error, +}; + +pub struct JsonRpcInterface { + update_notifier: async_channel::Sender<()>, +} + +#[async_trait] +impl RequestHandler for JsonRpcInterface { + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + if !req.params.is_array() { + return JsonError::new(ErrorCode::InvalidParams, None, req.id).into() + } + + let params = req.params.as_array().unwrap(); + + let rep = match req.method.as_str() { + Some("update") => self.update(req.id, params).await, + Some(_) | None => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + }; + + rep + } +} + +impl JsonRpcInterface { + pub fn new(update_notifier: async_channel::Sender<()>) -> Self { + Self { update_notifier } + } + + // RPCAPI: + // Update files in ~/darkwiki + // --> {"jsonrpc": "2.0", "method": "update", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": true, "id": 1} + async fn update(&self, id: Value, _params: &[Value]) -> JsonResult { + let res = self.update_notifier.send(()).await.map_err(Error::from); + + if let Err(e) = res { + error!("Failed to update: {}", e); + return JsonError::new(ErrorCode::InternalError, None, id).into() + } + + JsonResponse::new(json!(true), id).into() + } +} diff --git a/script/research/darkwiki/src/main.rs b/script/research/darkwiki/src/main.rs index e7a11a969..4b735c3e3 100644 --- a/script/research/darkwiki/src/main.rs +++ b/script/research/darkwiki/src/main.rs @@ -1,3 +1,205 @@ -fn main() { - println!("Hello, world!"); +use async_std::sync::{Arc, Mutex}; +use std::{ + fs::{create_dir_all, read_dir}, + path::{Path, PathBuf}, +}; + +use async_executor::Executor; +use futures::{select, FutureExt}; +use fxhash::FxHashMap; +use log::{error, warn}; +use serde::Deserialize; +use smol::future; +use structopt::StructOpt; +use structopt_toml::StructOptToml; +use url::Url; + +use darkfi::{ + async_daemonize, + net::{self, settings::SettingsOpt}, + raft::{NetMsg, ProtocolRaft, Raft, RaftSettings}, + rpc::server::listen_and_serve, + util::{ + cli::{get_log_config, get_log_level, spawn_config}, + expand_path, + file::{load_file, load_json_file, save_file, save_json_file}, + gen_id, + path::get_config_path, + }, + Error, Result, +}; + +mod error; +mod jsonrpc; +mod sequence; + +use error::DarkWikiResult; +use jsonrpc::JsonRpcInterface; +use sequence::{Operation, Sequence}; + +pub const CONFIG_FILE: &str = "darkwiki.toml"; +pub const CONFIG_FILE_CONTENTS: &str = include_str!("../darkwiki.toml"); +pub const DOCS_PATH: &str = "~/darkwiki"; + +/// darkwiki cli +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "darkwiki")] +pub struct Args { + /// Sets a custom config file + #[structopt(long)] + pub config: Option, + /// Sets Datastore Path + #[structopt(long, default_value = "~/.config/darkfi/darkwiki")] + pub datastore: String, + /// JSON-RPC listen URL + #[structopt(long = "rpc", default_value = "tcp://127.0.0.1:13055")] + pub rpc_listen: Url, + #[structopt(flatten)] + pub net: SettingsOpt, + /// Increase verbosity + #[structopt(short, parse(from_occurrences))] + pub verbose: u8, +} + +fn on_receive_operation(op: Operation, datastore_path: &Path) -> DarkWikiResult<()> { + let json_files_path = datastore_path.join("files"); + let docs_path = PathBuf::from(expand_path(&DOCS_PATH)?); + + let id_path = json_files_path.join(op.id()); + + let json_file = load_json_file::(&id_path); + let mut seq: Sequence = if let Ok(file) = json_file { file } else { Sequence::new(&op.id()) }; + + seq.add_op(&op)?; + save_json_file::(&id_path, &seq)?; + + //let st = seq.apply(); + //save_file(&docs_path.join(op.id()), &st)?; + + Ok(()) +} + +fn on_receive_update(datastore_path: &Path) -> DarkWikiResult> { + let ret = vec![]; + + let json_files_path = datastore_path.join("files"); + let docs_path = PathBuf::from(expand_path(&DOCS_PATH)?); + + let files = read_dir(&docs_path).unwrap(); + + for file in files { + let file_path = file.unwrap().path(); + let path = docs_path.join(&file_path); + let edit = load_file(&path)?; + + if let Ok(seq) = load_json_file::(&json_files_path.join(&file_path)) { + // + // TODO the transformation should happen here + // + } else { + let mut seq = Sequence::new(&gen_id(30)); + //seq.insert(0, &edit)?; + save_json_file(&json_files_path.join(file_path), &seq)?; + } + } + + Ok(ret) +} + +async fn start( + update_notifier_rv: async_channel::Receiver<()>, + raft_sender: async_channel::Sender, + raft_receiver: async_channel::Receiver, + datastore_path: PathBuf, +) -> DarkWikiResult<()> { + loop { + select! { + _ = update_notifier_rv.recv().fuse() => { + let ops = on_receive_update(&datastore_path)?; + for op in ops { + raft_sender.send(op).await.map_err(Error::from)?; + } + } + op = raft_receiver.recv().fuse() => { + let op = op.map_err(Error::from)?; + on_receive_operation(op, &datastore_path)?; + } + + } + } +} + +async_daemonize!(realmain); +async fn realmain(settings: Args, executor: Arc>) -> Result<()> { + let datastore_path = expand_path(&settings.datastore)?; + + create_dir_all(expand_path(&DOCS_PATH)?)?; + create_dir_all(datastore_path.join("files"))?; + + let (update_notifier_sx, update_notifier_rv) = async_channel::unbounded::<()>(); + + // + // RPC + // + let rpc_interface = Arc::new(JsonRpcInterface::new(update_notifier_sx)); + executor.spawn(listen_and_serve(settings.rpc_listen.clone(), rpc_interface)).detach(); + + // + // Raft + // + let net_settings = settings.net; + let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default())); + + let datastore_raft = datastore_path.join("darkwiki.db"); + let raft_settings = RaftSettings { datastore_path: datastore_raft, ..RaftSettings::default() }; + + let mut raft = Raft::::new(raft_settings, seen_net_msgs.clone())?; + + executor + .spawn(start(update_notifier_rv, raft.sender(), raft.receiver(), datastore_path.clone())) + .detach(); + + // + // P2p setup + // + let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::(); + + let p2p = net::P2p::new(net_settings.into()).await; + let p2p = p2p.clone(); + + let registry = p2p.protocol_registry(); + + let raft_node_id = raft.id(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let raft_node_id = raft_node_id.clone(); + let sender = p2p_send_channel.clone(); + let seen_net_msgs_cloned = seen_net_msgs.clone(); + async move { + ProtocolRaft::init(raft_node_id, channel, sender, p2p, seen_net_msgs_cloned).await + } + }) + .await; + + p2p.clone().start(executor.clone()).await?; + + executor.spawn(p2p.clone().run(executor.clone())).detach(); + + // + // Waiting Exit signal + // + let (signal, shutdown) = async_channel::bounded::<()>(1); + ctrlc_async::set_async_handler(async move { + warn!(target: "darkwiki", "Catch exit signal"); + // cleaning up tasks running in the background + if let Err(e) = signal.send(()).await { + error!("Error on sending exit signal: {}", e); + } + }) + .unwrap(); + + raft.run(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?; + + Ok(()) } diff --git a/script/research/darkwiki/src/sequence.rs b/script/research/darkwiki/src/sequence.rs new file mode 100644 index 000000000..6e35444fd --- /dev/null +++ b/script/research/darkwiki/src/sequence.rs @@ -0,0 +1,195 @@ +use std::{io, result::Result}; + +use serde::{Deserialize, Serialize}; +use unicode_segmentation::UnicodeSegmentation; + +use darkfi::util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable}; + +use crate::error::DarkWikiError; + +#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)] +pub enum OperationMethod { + Delete(u64), + Insert(String), + Retain(u64), +} + +#[derive(PartialEq, Serialize, Deserialize, SerialEncodable, SerialDecodable, Clone, Debug)] +pub struct Operation { + id: String, + method: OperationMethod, +} + +impl Operation { + pub fn id(&self) -> String { + self.id.clone() + } +} + +#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)] +pub struct Sequence { + id: String, + operations: Vec, + len: u64, +} + +impl Sequence { + pub fn new(id: &str) -> Self { + Self { id: id.to_string(), operations: vec![], len: 0 } + } + /// + /// Apply all operations to the provided &str + /// Return the final String + /// + pub fn apply(&self, s: &str) -> String { + let mut st = vec![]; + let mut chars = s.graphemes(true).collect::>(); + for op in &self.operations { + match op { + OperationMethod::Retain(n) => { + st.extend(chars[..(*n as usize)].to_vec()); + } + OperationMethod::Delete(n) => { + chars.drain(0..(*n as usize)); + } + OperationMethod::Insert(insert) => { + st.extend(insert.graphemes(true).collect::>()); + } + } + } + + st.join("") + } + + /// + /// Add new operation + /// Return AddOperationFailed error if failed + /// + pub fn add_op(&mut self, op: &Operation) -> Result<(), DarkWikiError> { + match &op.method { + OperationMethod::Delete(n) => { + if *n == 0 { + return Ok(()) + } + } + OperationMethod::Insert(insert) => { + if insert.is_empty() { + return Ok(()) + } + } + OperationMethod::Retain(n) => { + if *n == 0 { + return Ok(()) + } + } + } + + self.operations.push(op.method.clone()); + Ok(()) + } + + /// + /// Insert string at `n` position with Insert Operation + /// Return AddOperationFailed if failed + /// + pub fn insert(&mut self, st: &str) -> Result { + let method = OperationMethod::Insert(st.into()); + let op = Operation { id: self.id.clone(), method }; + self.add_op(&op)?; + Ok(op) + } + + /// + /// Move the position of cursor + /// Return AddOperationFailed if failed + /// + pub fn retain(&mut self, n: u64) -> Result { + let method = OperationMethod::Retain(n); + let op = Operation { id: self.id.clone(), method }; + self.add_op(&op)?; + Ok(op) + } + + /// + /// Delete string at `n` position with Delete Operation + /// Return AddOperationFailed if failed + /// + pub fn delete(&mut self, n: u64) -> Result { + let method = OperationMethod::Delete(n); + let op = Operation { id: self.id.clone(), method }; + self.add_op(&op)?; + Ok(op) + } +} + +impl Encodable for OperationMethod { + fn encode(&self, mut s: S) -> darkfi::Result { + let len: usize = match self { + Self::Delete(i) => (0 as u8).encode(&mut s)? + i.encode(&mut s)?, + Self::Insert(t) => (1 as u8).encode(&mut s)? + t.encode(&mut s)?, + Self::Retain(i) => (2 as u8).encode(&mut s)? + i.encode(&mut s)?, + }; + Ok(len) + } +} + +impl Decodable for OperationMethod { + fn decode(mut d: D) -> darkfi::Result { + let com: u8 = Decodable::decode(&mut d)?; + match com { + 0 => { + let i: u64 = Decodable::decode(&mut d)?; + Ok(Self::Delete(i)) + } + 1 => { + let t: String = Decodable::decode(d)?; + Ok(Self::Insert(t)) + } + 2 => { + let i: u64 = Decodable::decode(&mut d)?; + Ok(Self::Retain(i)) + } + _ => Err(darkfi::Error::ParseFailed("Parse OperationMethod failed")), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use darkfi::util::{ + gen_id, + serial::{deserialize, serialize}, + }; + + #[test] + fn test_seq() { + // + // English + // + let _t = "this is the first paragraph"; + let mut seq = Sequence::new(&gen_id(30)); + + // + // Korean + // + let _t = "안녕하십니까"; + let mut seq = Sequence::new(&gen_id(30)); + + // + // Arabic + // + let _t = "عربي"; + let mut seq = Sequence::new(&gen_id(30)); + } + + #[test] + fn test_serialize() { + let op_method = OperationMethod::Delete(3); + + let op_method_ser = serialize(&op_method); + let op_method_deser = deserialize(&op_method_ser).unwrap(); + + assert_eq!(op_method, op_method_deser); + } +} diff --git a/script/research/darkwiki/src/update_cli.rs b/script/research/darkwiki/src/update_cli.rs new file mode 100644 index 000000000..2e0114dba --- /dev/null +++ b/script/research/darkwiki/src/update_cli.rs @@ -0,0 +1,38 @@ +use serde_json::json; +use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use structopt::StructOpt; +use url::Url; + +use darkfi::{ + rpc::{client::RpcClient, jsonrpc::JsonRequest}, + util::cli::{get_log_config, get_log_level}, + Result, +}; + +#[derive(Clone, Debug, StructOpt)] +#[structopt(name = "darkwikiupdate")] +struct Args { + #[structopt(short, parse(from_occurrences))] + /// Increase verbosity (-vvv supported) + verbose: u8, + + #[structopt(short, long, default_value = "tcp://127.0.0.1:13055")] + /// darkfid JSON-RPC endpoint + endpoint: Url, +} + +#[async_std::main] +async fn main() -> Result<()> { + let args = Args::from_args(); + + let log_level = get_log_level(args.verbose.into()); + let log_config = get_log_config(); + TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?; + + let rpc_client = RpcClient::new(args.endpoint).await?; + + let req = JsonRequest::new("update", json!([])); + rpc_client.request(req).await?; + + rpc_client.close().await +}