bin/taud: impl sync event for crdt

This commit is contained in:
ghassmo
2022-03-31 09:50:04 +04:00
parent 65e827df1e
commit 2dfafeef1d
6 changed files with 79 additions and 20 deletions

View File

@@ -2,11 +2,20 @@ use std::{cmp::Ordering, io};
use darkfi::{
net,
util::serial::{deserialize, serialize, Decodable, Encodable},
util::serial::{
deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable,
},
Result,
};
#[repr(u8)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
pub enum EventCommand {
Sync = 0,
Update = 1,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, SerialEncodable, SerialDecodable)]
pub struct Event {
// the msg in the event
pub value: Vec<u8>,
@@ -15,31 +24,41 @@ pub struct Event {
// It might be necessary to attach the node's name to the timestamp
// so that it is possible to differentiate between events
pub name: String,
pub command: EventCommand,
}
impl Encodable for Event {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
impl Encodable for EventCommand {
fn encode<S: io::Write>(&self, s: S) -> darkfi::Result<usize> {
let mut len = 0;
len += self.value.encode(&mut s)?;
len += self.counter.encode(&mut s)?;
len += self.name.encode(&mut s)?;
match self {
Self::Sync => {
len += (0 as u8).encode(s)?;
}
Self::Update => {
len += (1 as u8).encode(s)?;
}
}
Ok(len)
}
}
impl Decodable for Event {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
Ok(Self {
value: Decodable::decode(&mut d)?,
counter: Decodable::decode(&mut d)?,
name: Decodable::decode(&mut d)?,
impl Decodable for EventCommand {
fn decode<D: io::Read>(d: D) -> darkfi::Result<Self> {
let com: u8 = Decodable::decode(d)?;
Ok(match com {
0 => Self::Sync,
_ => Self::Update,
})
}
}
impl Event {
pub fn new<T: Encodable>(value: T, counter: u64, name: String) -> Self {
Self { value: serialize(&value), counter, name }
pub fn new_update_event<T: Encodable>(value: T, counter: u64, name: String) -> Self {
Self { value: serialize(&value), counter, name, command: EventCommand::Update }
}
pub fn new_sync_event(name: String) -> Self {
Self { value: vec![], counter: 0, name, command: EventCommand::Sync }
}
pub fn get_value<T: Decodable>(&self) -> Result<T> {

View File

@@ -29,6 +29,10 @@ impl<T: Ord + Clone + Debug> GSet<T> {
debug!(target: "crdt", "GSet merge a set of len: {:?}", other.len());
other.set.iter().for_each(|e| self.insert(e))
}
pub fn get_set(&self) -> BTreeSet<T> {
self.set.clone()
}
}
impl<T: Ord + Clone + Debug> Default for GSet<T> {

View File

@@ -3,7 +3,7 @@ pub mod gset;
pub mod net;
pub mod node;
pub use event::Event;
pub use event::{Event, EventCommand};
pub use gset::GSet;
pub use net::ProtocolCrdt;
pub use node::Node;

View File

@@ -6,7 +6,7 @@ use log::debug;
use darkfi::{net, Result};
use super::{Event, GSet};
use super::{Event, EventCommand, GSet};
pub struct ProtocolCrdt {
jobsman: net::ProtocolJobsManagerPtr,
@@ -48,6 +48,14 @@ impl ProtocolCrdt {
event
);
if event.command == EventCommand::Sync {
let gset = self.gset.lock().await;
for e in gset.get_set() {
self.p2p.broadcast(e.clone()).await?;
}
continue
}
if self.gset.lock().await.contains(&event) {
continue
}

View File

@@ -16,10 +16,15 @@ pub struct Node {
// a counter for the node
time: Mutex<u64>,
p2p: net::P2pPtr,
notifier: async_channel::Sender<Vec<u8>>,
}
impl Node {
pub async fn new(name: &str, net_settings: net::Settings) -> Arc<Self> {
pub async fn new(
name: &str,
net_settings: net::Settings,
notifier: async_channel::Sender<Vec<u8>>,
) -> Arc<Self> {
debug!(target: "crdt", "Node::new() [BEGIN]");
let p2p = net::P2p::new(net_settings).await;
Arc::new(Self {
@@ -27,6 +32,7 @@ impl Node {
gset: Arc::new(Mutex::new(GSet::new())),
time: Mutex::new(0),
p2p,
notifier,
})
}
@@ -55,10 +61,12 @@ impl Node {
p2p.clone().start(executor.clone()).await?;
// Actual main p2p session
let recv_task = executor.spawn(async move {
let recv_task: smol::Task<Result<()>> = executor.spawn(async move {
loop {
if let Ok(event) = rcv.recv().await {
self.clone().receive_event(&event).await;
let payload = event.get_value()?;
self.notifier.send(payload).await?;
}
}
});
@@ -92,7 +100,7 @@ impl Node {
event_time = *time;
}
let event = Event::new(value, event_time, self.name.clone());
let event = Event::new_update_event(value, event_time, self.name.clone());
debug!(target: "crdt", "Node create new event: {:?}", event);
{
@@ -104,4 +112,14 @@ impl Node {
Ok(())
}
pub async fn sync(self: Arc<Self>) -> Result<()> {
debug!(target: "crdt", "Node create new sync event");
let event = Event::new_sync_event(self.name.clone());
debug!(target: "crdt", "Node broadcast the sync event");
self.p2p.broadcast(event).await?;
Ok(())
}
}

View File

@@ -48,7 +48,9 @@ async fn start(config: TauConfig, executor: Arc<Executor<'_>>) -> Result<()> {
let p2p_settings = P2pSettings::default();
let node = Node::new("node", p2p_settings).await;
let (node_snd, node_rcv) = async_channel::unbounded::<Vec<u8>>();
let node = Node::new("node", p2p_settings, node_snd).await;
let ex2 = executor.clone();
let node2 = node.clone();
@@ -77,10 +79,18 @@ async fn start(config: TauConfig, executor: Arc<Executor<'_>>) -> Result<()> {
}
});
let recv_update_from_node: smol::Task<Result<()>> = executor.spawn(async move {
loop {
let payload = node_rcv.recv().await?;
// XXX
}
});
listen_and_serve(server_config, rpc_interface, executor).await?;
crdt_task.cancel().await;
recv_update_from_rpc.cancel().await;
recv_update_from_node.cancel().await;
Ok(())
}