mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-09 14:48:08 -05:00
bin/taud: remove obsolete crdt and add raft
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6091,7 +6091,6 @@ dependencies = [
|
||||
"serde_json",
|
||||
"simplelog",
|
||||
"smol",
|
||||
"sqlx",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ edition = "2021"
|
||||
|
||||
[dependencies.darkfi]
|
||||
path = "../../"
|
||||
features = ["rpc", "net"]
|
||||
features = ["rpc", "net", "raft"]
|
||||
|
||||
[dependencies]
|
||||
# Async
|
||||
@@ -30,6 +30,4 @@ thiserror = "1.0.24"
|
||||
serde_json = "1.0.79"
|
||||
serde = {version = "1.0.136", features = ["derive"]}
|
||||
|
||||
# Database
|
||||
sqlx = { version = "0.5.11", features = [ "runtime-async-std-native-tls" ] }
|
||||
|
||||
|
||||
1
bin/taud/src/crdt/.gitignore
vendored
1
bin/taud/src/crdt/.gitignore
vendored
@@ -1 +0,0 @@
|
||||
target/
|
||||
@@ -1,83 +0,0 @@
|
||||
use std::{cmp::Ordering, io};
|
||||
|
||||
use darkfi::{
|
||||
net,
|
||||
util::serial::{
|
||||
deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
|
||||
pub enum EventCommand {
|
||||
Sync = 0,
|
||||
Update = 1,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, SerialEncodable, SerialDecodable)]
|
||||
pub struct Event {
|
||||
// the msg in the event
|
||||
pub value: Vec<u8>,
|
||||
// the counter for lamport clock
|
||||
pub counter: u64,
|
||||
// It might be necessary to attach the node's name to the timestamp
|
||||
// so that it is possible to differentiate between events
|
||||
pub name: String,
|
||||
pub command: EventCommand,
|
||||
}
|
||||
|
||||
impl Encodable for EventCommand {
|
||||
fn encode<S: io::Write>(&self, s: S) -> darkfi::Result<usize> {
|
||||
let mut len = 0;
|
||||
match self {
|
||||
Self::Sync => {
|
||||
len += (0 as u8).encode(s)?;
|
||||
}
|
||||
Self::Update => {
|
||||
len += (1 as u8).encode(s)?;
|
||||
}
|
||||
}
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for EventCommand {
|
||||
fn decode<D: io::Read>(d: D) -> darkfi::Result<Self> {
|
||||
let com: u8 = Decodable::decode(d)?;
|
||||
Ok(match com {
|
||||
0 => Self::Sync,
|
||||
_ => Self::Update,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn new_update_event<T: Encodable>(value: T, counter: u64, name: String) -> Self {
|
||||
Self { value: serialize(&value), counter, name, command: EventCommand::Update }
|
||||
}
|
||||
|
||||
pub fn new_sync_event(name: String) -> Self {
|
||||
Self { value: vec![], counter: 0, name, command: EventCommand::Sync }
|
||||
}
|
||||
|
||||
pub fn get_value<T: Decodable>(&self) -> Result<T> {
|
||||
deserialize(&self.value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Event {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
let ord = self.counter.cmp(&other.counter);
|
||||
if ord == Ordering::Equal {
|
||||
return self.name.cmp(&other.name)
|
||||
}
|
||||
ord
|
||||
}
|
||||
}
|
||||
|
||||
impl net::Message for Event {
|
||||
fn name() -> &'static str {
|
||||
"event"
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
use std::{collections::BTreeSet, fmt::Debug};
|
||||
|
||||
use log::debug;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct GSet<T: Ord> {
|
||||
set: BTreeSet<T>,
|
||||
}
|
||||
|
||||
impl<T: Ord + Clone + Debug> GSet<T> {
|
||||
pub fn new() -> Self {
|
||||
Self { set: BTreeSet::new() }
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, element: &T) {
|
||||
debug!(target: "crdt", "GSet insert an element: {:?}", element);
|
||||
self.set.insert(element.clone());
|
||||
}
|
||||
|
||||
pub fn contains(&self, element: &T) -> bool {
|
||||
self.set.contains(element)
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.set.len()
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, other: &Self) {
|
||||
debug!(target: "crdt", "GSet merge a set of len: {:?}", other.len());
|
||||
other.set.iter().for_each(|e| self.insert(e))
|
||||
}
|
||||
|
||||
pub fn get_set(&self) -> BTreeSet<T> {
|
||||
self.set.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Ord + Clone + Debug> Default for GSet<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
pub mod event;
|
||||
pub mod gset;
|
||||
pub mod net;
|
||||
pub mod node;
|
||||
|
||||
pub use event::{Event, EventCommand};
|
||||
pub use gset::GSet;
|
||||
pub use net::ProtocolCrdt;
|
||||
pub use node::Node;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
@@ -1,87 +0,0 @@
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
|
||||
use async_executor::Executor;
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
|
||||
use darkfi::{net, Result};
|
||||
|
||||
use super::{Event, EventCommand, GSet};
|
||||
|
||||
pub struct ProtocolCrdt {
|
||||
jobsman: net::ProtocolJobsManagerPtr,
|
||||
notify_queue_sender: async_channel::Sender<Event>,
|
||||
event_sub: net::MessageSubscription<Event>,
|
||||
p2p: net::P2pPtr,
|
||||
gset: Arc<Mutex<GSet<Event>>>,
|
||||
}
|
||||
|
||||
impl ProtocolCrdt {
|
||||
pub async fn init(
|
||||
channel: net::ChannelPtr,
|
||||
notify_queue_sender: async_channel::Sender<Event>,
|
||||
p2p: net::P2pPtr,
|
||||
gset: Arc<Mutex<GSet<Event>>>,
|
||||
) -> net::ProtocolBasePtr {
|
||||
let message_subsytem = channel.get_message_subsystem();
|
||||
message_subsytem.add_dispatch::<Event>().await;
|
||||
|
||||
let event_sub = channel.subscribe_msg::<Event>().await.expect("Missing Event dispatcher!");
|
||||
|
||||
Arc::new(Self {
|
||||
notify_queue_sender,
|
||||
event_sub,
|
||||
jobsman: net::ProtocolJobsManager::new("ProtocolCrdt", channel),
|
||||
p2p,
|
||||
gset,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_receive_event(self: Arc<Self>) -> Result<()> {
|
||||
debug!(target: "crdt", "ProtocolCrdt::handle_receive_event() [START]");
|
||||
loop {
|
||||
let event = self.event_sub.receive().await?;
|
||||
|
||||
debug!(
|
||||
target: "crdt",
|
||||
"ProtocolCrdt::handle_receive_event() received {:?}",
|
||||
event
|
||||
);
|
||||
|
||||
if event.command == EventCommand::Sync {
|
||||
let gset = self.gset.lock().await;
|
||||
for e in gset.get_set() {
|
||||
self.p2p.broadcast(e.clone()).await?;
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if self.gset.lock().await.contains(&event) {
|
||||
continue
|
||||
}
|
||||
|
||||
let event = (*event).clone();
|
||||
self.p2p.broadcast(event.clone()).await?;
|
||||
|
||||
self.notify_queue_sender.send(event).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl net::ProtocolBase for ProtocolCrdt {
|
||||
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
|
||||
/// protocol task manager, then queues the reply. Sends out a ping and
|
||||
/// waits for pong reply. Waits for ping and replies with a pong.
|
||||
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
debug!(target: "crdt", "ProtocolCrdt::start() [START]");
|
||||
self.jobsman.clone().start(executor.clone());
|
||||
self.jobsman.clone().spawn(self.clone().handle_receive_event(), executor.clone()).await;
|
||||
debug!(target: "crdt", "ProtocolCrdt::start() [END]");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"ProtocolCrdt"
|
||||
}
|
||||
}
|
||||
@@ -1,125 +0,0 @@
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use std::{cmp::max, fmt::Debug};
|
||||
|
||||
use async_executor::Executor;
|
||||
use log::debug;
|
||||
|
||||
use darkfi::{net, util::serial::Encodable, Result};
|
||||
|
||||
use super::{Event, GSet, ProtocolCrdt};
|
||||
|
||||
pub struct Node {
|
||||
// name to idnetifie the node
|
||||
name: String,
|
||||
// a grow-only set
|
||||
gset: Arc<Mutex<GSet<Event>>>,
|
||||
// a counter for the node
|
||||
time: Mutex<u64>,
|
||||
p2p: net::P2pPtr,
|
||||
notifier: async_channel::Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub async fn new(
|
||||
name: &str,
|
||||
net_settings: net::Settings,
|
||||
notifier: async_channel::Sender<Vec<u8>>,
|
||||
) -> Arc<Self> {
|
||||
debug!(target: "crdt", "Node::new() [BEGIN]");
|
||||
let p2p = net::P2p::new(net_settings).await;
|
||||
Arc::new(Self {
|
||||
name: name.into(),
|
||||
gset: Arc::new(Mutex::new(GSet::new())),
|
||||
time: Mutex::new(0),
|
||||
p2p,
|
||||
notifier,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
debug!(target: "crdt", "Node::start() [BEGIN]");
|
||||
let (snd, rcv) = async_channel::unbounded::<Event>();
|
||||
|
||||
let p2p = self.p2p.clone();
|
||||
|
||||
let registry = p2p.protocol_registry();
|
||||
|
||||
let gset = self.gset.clone();
|
||||
|
||||
registry
|
||||
.register(!net::SESSION_SEED, move |channel, p2p| {
|
||||
let sender = snd.clone();
|
||||
let gset = gset.clone();
|
||||
async move { ProtocolCrdt::init(channel, sender, p2p, gset).await }
|
||||
})
|
||||
.await;
|
||||
|
||||
//
|
||||
// p2p network main instance
|
||||
//
|
||||
// Performs seed session
|
||||
p2p.clone().start(executor.clone()).await?;
|
||||
// Actual main p2p session
|
||||
|
||||
let recv_task: smol::Task<Result<()>> = executor.spawn(async move {
|
||||
loop {
|
||||
if let Ok(event) = rcv.recv().await {
|
||||
self.clone().receive_event(&event).await;
|
||||
let payload = event.get_value()?;
|
||||
self.notifier.send(payload).await?;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
p2p.clone().run(executor.clone()).await?;
|
||||
|
||||
recv_task.cancel().await;
|
||||
|
||||
debug!(target: "crdt", "Node::start() [END]");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn receive_event(self: Arc<Self>, event: &Event) {
|
||||
debug!(target: "crdt", "Node receive an event: {:?}", event);
|
||||
|
||||
let mut time = self.time.lock().await;
|
||||
*time = max(*time, event.counter) + 1;
|
||||
|
||||
self.gset.lock().await.insert(event);
|
||||
}
|
||||
|
||||
pub async fn send_event<T: Encodable + Debug>(self: Arc<Self>, value: T) -> Result<()> {
|
||||
debug!(target: "crdt", "Node send an event: {:?}", value);
|
||||
|
||||
let event_time: u64;
|
||||
|
||||
{
|
||||
let mut time = self.time.lock().await;
|
||||
*time += 1;
|
||||
event_time = *time;
|
||||
}
|
||||
|
||||
let event = Event::new_update_event(value, event_time, self.name.clone());
|
||||
debug!(target: "crdt", "Node create new event: {:?}", event);
|
||||
|
||||
{
|
||||
self.gset.lock().await.insert(&event);
|
||||
}
|
||||
|
||||
debug!(target: "crdt", "Node broadcast the event: {:?}", event);
|
||||
self.p2p.broadcast(event).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync(self: Arc<Self>) -> Result<()> {
|
||||
debug!(target: "crdt", "Node create new sync event");
|
||||
let event = Event::new_sync_event(self.name.clone());
|
||||
|
||||
debug!(target: "crdt", "Node broadcast the sync event");
|
||||
self.p2p.broadcast(event).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,6 @@ use darkfi::{
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
mod crdt;
|
||||
mod error;
|
||||
mod jsonrpc;
|
||||
mod month_tasks;
|
||||
@@ -23,7 +22,6 @@ mod task_info;
|
||||
mod util;
|
||||
|
||||
use crate::{
|
||||
crdt::Node,
|
||||
jsonrpc::JsonRpcInterface,
|
||||
task_info::TaskInfo,
|
||||
util::{CliTaud, Settings, TauConfig, CONFIG_FILE_CONTENTS},
|
||||
@@ -42,19 +40,7 @@ async fn start(config: TauConfig, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
|
||||
let settings = Settings { dataset_path };
|
||||
|
||||
//
|
||||
// Crdt
|
||||
//
|
||||
|
||||
let p2p_settings = P2pSettings::default();
|
||||
|
||||
let (node_snd, node_rcv) = async_channel::unbounded::<Vec<u8>>();
|
||||
|
||||
let node = Node::new("node", p2p_settings, node_snd).await;
|
||||
|
||||
let ex2 = executor.clone();
|
||||
let node2 = node.clone();
|
||||
let crdt_task = executor.spawn(node2.start(ex2.clone()));
|
||||
let _p2p_settings = P2pSettings::default();
|
||||
|
||||
//
|
||||
// RPC
|
||||
@@ -71,26 +57,16 @@ async fn start(config: TauConfig, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
|
||||
let rpc_interface = Arc::new(JsonRpcInterface::new(snd, settings));
|
||||
|
||||
let node2 = node.clone();
|
||||
let recv_update_from_rpc: smol::Task<Result<()>> = executor.spawn(async move {
|
||||
loop {
|
||||
let task_info = rcv.recv().await?;
|
||||
node2.clone().send_event(task_info).await?;
|
||||
}
|
||||
});
|
||||
|
||||
let recv_update_from_node: smol::Task<Result<()>> = executor.spawn(async move {
|
||||
loop {
|
||||
let payload = node_rcv.recv().await?;
|
||||
let _task_info = rcv.recv().await?;
|
||||
// XXX
|
||||
}
|
||||
});
|
||||
|
||||
listen_and_serve(server_config, rpc_interface, executor).await?;
|
||||
|
||||
crdt_task.cancel().await;
|
||||
recv_update_from_rpc.cancel().await;
|
||||
recv_update_from_node.cancel().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user