darkwiki: WIP: POC Operational transformation and raft

This commit is contained in:
ghassmo
2022-07-31 23:05:35 +04:00
parent e521a8a05c
commit cc9fd99460
7 changed files with 563 additions and 2 deletions

View File

@@ -32,5 +32,21 @@ fxhash = "0.2.1"
serde = {version = "1.0.138", features = ["derive"]}
serde_json = "1.0.82"
structopt = "0.3.26"
structopt-toml = "0.5.0"
unicode-segmentation = "1.9.0"
crypto_box = {version = "0.7.2", features = ["std"]}
hex = "0.4.3"
bs58 = "0.4.0"
[workspace]
[[bin]]
name = "darkwikid"
path = "src/main.rs"
[[bin]]
name = "darkwikiupdate"
path = "src/update_cli.rs"

View File

@@ -0,0 +1,30 @@
## JSON-RPC listen URL
#rpc_listen="tcp://127.0.0.1:13055"
## Sets Datastore Path
#datastore="~/.config/darkfi/darkwiki"
## Raft net settings
[net]
## P2P accept address
inbound="tcp://127.0.0.1:13001"
## Connection slots
#outbound_connections=5
## P2P external address
external_addr="tls://127.0.0.1:13001"
## Peers to connect to
#peers=["tls://127.0.0.1:13003"]
## Seed nodes to connect to
#seeds=[]
## These are the default configuration values for the p2p network
#manual_attempt_limit=0
#seed_query_timeout_seconds=8
#connect_timeout_seconds=10
#channel_handshake_seconds=4
#channel_heartbeat_seconds=10

View File

@@ -0,0 +1,25 @@
#[derive(thiserror::Error, Debug)]
pub enum DarkWikiError {
#[error("Add Operation failed")]
AddOperationFailed,
#[error("Encryption error: `{0}`")]
EncryptionError(String),
#[error("Json serialization error: `{0}`")]
SerdeJsonError(String),
#[error("InternalError")]
Darkfi(#[from] darkfi::error::Error),
}
pub type DarkWikiResult<T> = std::result::Result<T, DarkWikiError>;
impl From<serde_json::Error> for DarkWikiError {
fn from(err: serde_json::Error) -> DarkWikiError {
DarkWikiError::SerdeJsonError(err.to_string())
}
}
impl From<crypto_box::aead::Error> for DarkWikiError {
fn from(err: crypto_box::aead::Error) -> DarkWikiError {
DarkWikiError::EncryptionError(err.to_string())
}
}

View File

@@ -0,0 +1,55 @@
use async_trait::async_trait;
use log::error;
use serde_json::{json, Value};
use darkfi::{
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
server::RequestHandler,
},
Error,
};
/// JSON-RPC request handler for the darkwiki daemon.
pub struct JsonRpcInterface {
    // Fired (unit message) whenever a client calls the "update" method.
    update_notifier: async_channel::Sender<()>,
}
#[async_trait]
impl RequestHandler for JsonRpcInterface {
    /// Dispatch an incoming JSON-RPC request to the matching method handler.
    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
        // Params must always be a JSON array, even when empty.
        let params = match req.params.as_array() {
            Some(params) => params,
            None => return JsonError::new(ErrorCode::InvalidParams, None, req.id).into(),
        };
        match req.method.as_str() {
            Some("update") => self.update(req.id, params).await,
            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
        }
    }
}
impl JsonRpcInterface {
pub fn new(update_notifier: async_channel::Sender<()>) -> Self {
Self { update_notifier }
}
// RPCAPI:
// Update files in ~/darkwiki
// --> {"jsonrpc": "2.0", "method": "update", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
async fn update(&self, id: Value, _params: &[Value]) -> JsonResult {
let res = self.update_notifier.send(()).await.map_err(Error::from);
if let Err(e) = res {
error!("Failed to update: {}", e);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
JsonResponse::new(json!(true), id).into()
}
}

View File

@@ -1,3 +1,205 @@
fn main() {
println!("Hello, world!");
use async_std::sync::{Arc, Mutex};
use std::{
fs::{create_dir_all, read_dir},
path::{Path, PathBuf},
};
use async_executor::Executor;
use futures::{select, FutureExt};
use fxhash::FxHashMap;
use log::{error, warn};
use serde::Deserialize;
use smol::future;
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use url::Url;
use darkfi::{
async_daemonize,
net::{self, settings::SettingsOpt},
raft::{NetMsg, ProtocolRaft, Raft, RaftSettings},
rpc::server::listen_and_serve,
util::{
cli::{get_log_config, get_log_level, spawn_config},
expand_path,
file::{load_file, load_json_file, save_file, save_json_file},
gen_id,
path::get_config_path,
},
Error, Result,
};
mod error;
mod jsonrpc;
mod sequence;
use error::DarkWikiResult;
use jsonrpc::JsonRpcInterface;
use sequence::{Operation, Sequence};
pub const CONFIG_FILE: &str = "darkwiki.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../darkwiki.toml");
pub const DOCS_PATH: &str = "~/darkwiki";
/// darkwiki cli
// Command-line / TOML options for the darkwikid daemon.
// NOTE(review): the `///` field comments double as structopt help text,
// i.e. they are part of the CLI's runtime output — do not edit casually.
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "darkwiki")]
pub struct Args {
    /// Sets a custom config file
    #[structopt(long)]
    pub config: Option<String>,
    /// Sets Datastore Path
    #[structopt(long, default_value = "~/.config/darkfi/darkwiki")]
    pub datastore: String,
    /// JSON-RPC listen URL
    #[structopt(long = "rpc", default_value = "tcp://127.0.0.1:13055")]
    pub rpc_listen: Url,
    // P2P network settings flattened into the top-level flag namespace.
    #[structopt(flatten)]
    pub net: SettingsOpt,
    /// Increase verbosity
    #[structopt(short, parse(from_occurrences))]
    pub verbose: u8,
}
/// Apply an operation received from the raft network to the local store.
///
/// Loads the sequence for the operation's document id from
/// `<datastore>/files/<id>` (falling back to a fresh sequence the first time
/// this id is seen), appends the operation and persists the result.
fn on_receive_operation(op: Operation, datastore_path: &Path) -> DarkWikiResult<()> {
    let json_files_path = datastore_path.join("files");
    let id_path = json_files_path.join(op.id());
    let mut seq: Sequence =
        load_json_file::<Sequence>(&id_path).unwrap_or_else(|_| Sequence::new(&op.id()));
    seq.add_op(&op)?;
    save_json_file::<Sequence>(&id_path, &seq)?;
    // TODO(WIP): materialize the document into DOCS_PATH once apply() is wired up.
    // (The docs_path binding was removed here — it was computed on every call
    // but only ever used by this commented-out code.)
    //let docs_path = PathBuf::from(expand_path(&DOCS_PATH)?);
    //let st = seq.apply();
    //save_file(&docs_path.join(op.id()), &st)?;
    Ok(())
}
/// Scan the local docs directory and produce operations to broadcast via raft.
///
/// For every file under DOCS_PATH: if a stored sequence already exists the
/// local edit should be transformed against it (TODO); otherwise a fresh
/// sequence is created and persisted under `<datastore>/files`.
///
/// NOTE(review): still WIP — `ret` is never populated and directory reads
/// use `unwrap()`; both need finishing before this is production-ready.
fn on_receive_update(datastore_path: &Path) -> DarkWikiResult<Vec<Operation>> {
    let ret = vec![];
    let json_files_path = datastore_path.join("files");
    let docs_path = PathBuf::from(expand_path(&DOCS_PATH)?);
    let files = read_dir(&docs_path).unwrap();
    for file in files {
        // DirEntry::path() is already prefixed with docs_path; the previous
        // `docs_path.join(&file_path)` doubled the prefix and produced wrong
        // paths for both the document and its stored sequence.
        let file_path = file.unwrap().path();
        let file_name = file_path.file_name().unwrap();
        let _edit = load_file(&file_path)?;
        let seq_path = json_files_path.join(file_name);
        if let Ok(_seq) = load_json_file::<Sequence>(&seq_path) {
            //
            // TODO the transformation should happen here
            //
        } else {
            let mut seq = Sequence::new(&gen_id(30));
            // TODO(WIP): record the initial file contents as an insert op:
            //seq.insert(&_edit)?;
            save_json_file(&seq_path, &seq)?;
        }
    }
    Ok(ret)
}
/// Main daemon event loop: bridges RPC update notifications and raft.
///
/// Runs forever; returns only when a channel or processing step errors.
/// - On an RPC "update" notification: scans the docs dir and broadcasts
///   the resulting operations through raft.
/// - On an operation arriving from raft: applies it to the local store.
async fn start(
    update_notifier_rv: async_channel::Receiver<()>,
    raft_sender: async_channel::Sender<Operation>,
    raft_receiver: async_channel::Receiver<Operation>,
    datastore_path: PathBuf,
) -> DarkWikiResult<()> {
    loop {
        select! {
            // Local edit signalled via JSON-RPC: hand ops to raft for consensus.
            _ = update_notifier_rv.recv().fuse() => {
                let ops = on_receive_update(&datastore_path)?;
                for op in ops {
                    raft_sender.send(op).await.map_err(Error::from)?;
                }
            }
            // Remote edit agreed by raft: persist it locally.
            op = raft_receiver.recv().fuse() => {
                let op = op.map_err(Error::from)?;
                on_receive_operation(op, &datastore_path)?;
            }
        }
    }
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let datastore_path = expand_path(&settings.datastore)?;
create_dir_all(expand_path(&DOCS_PATH)?)?;
create_dir_all(datastore_path.join("files"))?;
let (update_notifier_sx, update_notifier_rv) = async_channel::unbounded::<()>();
//
// RPC
//
let rpc_interface = Arc::new(JsonRpcInterface::new(update_notifier_sx));
executor.spawn(listen_and_serve(settings.rpc_listen.clone(), rpc_interface)).detach();
//
// Raft
//
let net_settings = settings.net;
let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default()));
let datastore_raft = datastore_path.join("darkwiki.db");
let raft_settings = RaftSettings { datastore_path: datastore_raft, ..RaftSettings::default() };
let mut raft = Raft::<Operation>::new(raft_settings, seen_net_msgs.clone())?;
executor
.spawn(start(update_notifier_rv, raft.sender(), raft.receiver(), datastore_path.clone()))
.detach();
//
// P2p setup
//
let (p2p_send_channel, p2p_recv_channel) = async_channel::unbounded::<NetMsg>();
let p2p = net::P2p::new(net_settings.into()).await;
let p2p = p2p.clone();
let registry = p2p.protocol_registry();
let raft_node_id = raft.id();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let raft_node_id = raft_node_id.clone();
let sender = p2p_send_channel.clone();
let seen_net_msgs_cloned = seen_net_msgs.clone();
async move {
ProtocolRaft::init(raft_node_id, channel, sender, p2p, seen_net_msgs_cloned).await
}
})
.await;
p2p.clone().start(executor.clone()).await?;
executor.spawn(p2p.clone().run(executor.clone())).detach();
//
// Waiting Exit signal
//
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "darkwiki", "Catch exit signal");
// cleaning up tasks running in the background
if let Err(e) = signal.send(()).await {
error!("Error on sending exit signal: {}", e);
}
})
.unwrap();
raft.run(p2p.clone(), p2p_recv_channel.clone(), executor.clone(), shutdown.clone()).await?;
Ok(())
}

View File

@@ -0,0 +1,195 @@
use std::{io, result::Result};
use serde::{Deserialize, Serialize};
use unicode_segmentation::UnicodeSegmentation;
use darkfi::util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable};
use crate::error::DarkWikiError;
/// A single edit primitive of the operational-transformation sequence.
// Wire-format tags (see the Encodable impl below): Delete=0, Insert=1, Retain=2.
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)]
pub enum OperationMethod {
    // Remove the next `n` graphemes at the cursor.
    Delete(u64),
    // Insert the given string at the cursor.
    Insert(String),
    // Keep the next `n` graphemes, moving the cursor forward
    // (per the `retain` method's doc: "Move the position of cursor").
    Retain(u64),
}
/// An OperationMethod tagged with the id of the document it applies to.
#[derive(PartialEq, Serialize, Deserialize, SerialEncodable, SerialDecodable, Clone, Debug)]
pub struct Operation {
    // Document/sequence id this operation belongs to.
    id: String,
    // The actual edit.
    method: OperationMethod,
}
impl Operation {
    /// The id of the document this operation targets (returned as an owned copy).
    pub fn id(&self) -> String {
        self.id.to_owned()
    }
}
/// An ordered list of operations forming a document's edit history.
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)]
pub struct Sequence {
    // Document id this sequence belongs to.
    id: String,
    // Operations in application order.
    operations: Vec<OperationMethod>,
    // NOTE(review): initialized to 0 and never updated anywhere in this file —
    // presumably the tracked document length; confirm intent before relying on it.
    len: u64,
}
impl Sequence {
    /// Create an empty sequence for the document `id`.
    pub fn new(id: &str) -> Self {
        Self { id: id.to_string(), operations: vec![], len: 0 }
    }
    ///
    /// Apply all operations to the provided &str
    /// Return the final String
    ///
    pub fn apply(&self, s: &str) -> String {
        let mut st = vec![];
        let mut chars = s.graphemes(true).collect::<Vec<&str>>();
        for op in &self.operations {
            match op {
                OperationMethod::Retain(n) => {
                    // Copy the next `n` graphemes AND consume them so the cursor
                    // advances. (Previously they were copied without being
                    // consumed, so every later Retain/Delete re-processed
                    // already-retained text.) Clamp to what is left to avoid
                    // a slice panic on malformed sequences.
                    let n = (*n as usize).min(chars.len());
                    st.extend(chars.drain(..n));
                }
                OperationMethod::Delete(n) => {
                    // Drop the next `n` graphemes, clamped to what is left.
                    let n = (*n as usize).min(chars.len());
                    chars.drain(..n);
                }
                OperationMethod::Insert(insert) => {
                    st.extend(insert.graphemes(true));
                }
            }
        }
        st.join("")
    }
    ///
    /// Add new operation
    /// Empty operations (Delete(0), Retain(0), Insert("")) are silently
    /// skipped; currently always returns Ok
    ///
    pub fn add_op(&mut self, op: &Operation) -> Result<(), DarkWikiError> {
        // Empty edits carry no information — do not record them.
        match &op.method {
            OperationMethod::Delete(0) | OperationMethod::Retain(0) => return Ok(()),
            OperationMethod::Insert(insert) if insert.is_empty() => return Ok(()),
            _ => {}
        }
        self.operations.push(op.method.clone());
        Ok(())
    }
    ///
    /// Record an Insert operation carrying the given string
    /// Return the created Operation (for broadcasting)
    ///
    pub fn insert(&mut self, st: &str) -> Result<Operation, DarkWikiError> {
        let method = OperationMethod::Insert(st.into());
        let op = Operation { id: self.id.clone(), method };
        self.add_op(&op)?;
        Ok(op)
    }
    ///
    /// Record a Retain operation: move the cursor forward by `n` graphemes
    /// Return the created Operation (for broadcasting)
    ///
    pub fn retain(&mut self, n: u64) -> Result<Operation, DarkWikiError> {
        let method = OperationMethod::Retain(n);
        let op = Operation { id: self.id.clone(), method };
        self.add_op(&op)?;
        Ok(op)
    }
    ///
    /// Record a Delete operation: remove `n` graphemes at the cursor
    /// Return the created Operation (for broadcasting)
    ///
    pub fn delete(&mut self, n: u64) -> Result<Operation, DarkWikiError> {
        let method = OperationMethod::Delete(n);
        let op = Operation { id: self.id.clone(), method };
        self.add_op(&op)?;
        Ok(op)
    }
}
impl Encodable for OperationMethod {
    /// Serialize as a one-byte variant tag followed by the payload.
    /// Tags: Delete=0, Insert=1, Retain=2 — must mirror `Decodable` below.
    fn encode<S: io::Write>(&self, mut s: S) -> darkfi::Result<usize> {
        let len: usize = match self {
            // `0u8` literal suffix replaces the unidiomatic `(0 as u8)` casts.
            Self::Delete(i) => 0u8.encode(&mut s)? + i.encode(&mut s)?,
            Self::Insert(t) => 1u8.encode(&mut s)? + t.encode(&mut s)?,
            Self::Retain(i) => 2u8.encode(&mut s)? + i.encode(&mut s)?,
        };
        Ok(len)
    }
}
impl Decodable for OperationMethod {
    /// Inverse of `Encodable`: read the one-byte tag, then the payload.
    fn decode<D: io::Read>(mut d: D) -> darkfi::Result<Self> {
        let com: u8 = Decodable::decode(&mut d)?;
        match com {
            0 => {
                let i: u64 = Decodable::decode(&mut d)?;
                Ok(Self::Delete(i))
            }
            1 => {
                // Borrow `d` like the other arms instead of moving it.
                let t: String = Decodable::decode(&mut d)?;
                Ok(Self::Insert(t))
            }
            2 => {
                let i: u64 = Decodable::decode(&mut d)?;
                Ok(Self::Retain(i))
            }
            _ => Err(darkfi::Error::ParseFailed("Parse OperationMethod failed")),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use darkfi::util::{
        gen_id,
        serial::{deserialize, serialize},
    };
    /// Insert-only sequences must reproduce the inserted text verbatim,
    /// across several scripts (exercises grapheme handling). The previous
    /// version only shadowed unused bindings and asserted nothing.
    #[test]
    fn test_seq() {
        for text in ["this is the first paragraph", "안녕하십니까", "عربي"] {
            let mut seq = Sequence::new(&gen_id(30));
            seq.insert(text).unwrap();
            assert_eq!(seq.apply(""), text);
        }
    }
    /// Round-trip an OperationMethod through the custom wire format.
    #[test]
    fn test_serialize() {
        let op_method = OperationMethod::Delete(3);
        let op_method_ser = serialize(&op_method);
        let op_method_deser = deserialize(&op_method_ser).unwrap();
        assert_eq!(op_method, op_method_deser);
    }
}

View File

@@ -0,0 +1,38 @@
use serde_json::json;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use structopt::StructOpt;
use url::Url;
use darkfi::{
rpc::{client::RpcClient, jsonrpc::JsonRequest},
util::cli::{get_log_config, get_log_level},
Result,
};
// CLI arguments for the darkwikiupdate helper binary.
// NOTE(review): the `///` field comments double as structopt help text,
// i.e. they are part of the CLI's runtime output.
#[derive(Clone, Debug, StructOpt)]
#[structopt(name = "darkwikiupdate")]
struct Args {
    #[structopt(short, parse(from_occurrences))]
    /// Increase verbosity (-vvv supported)
    verbose: u8,
    #[structopt(short, long, default_value = "tcp://127.0.0.1:13055")]
    /// darkfid JSON-RPC endpoint
    endpoint: Url,
}
#[async_std::main]
async fn main() -> Result<()> {
    // Entry point: send a single JSON-RPC "update" request to the daemon.
    let args = Args::from_args();
    let log_level = get_log_level(args.verbose.into());
    let log_config = get_log_config();
    TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
    let rpc_client = RpcClient::new(args.endpoint).await?;
    // Params are an empty array; the daemon's "update" handler ignores them.
    let req = JsonRequest::new("update", json!([]));
    rpc_client.request(req).await?;
    // Propagate the close() result directly as this function's result.
    rpc_client.close().await
}