diff --git a/bin/taud/src/crdt/event.rs b/bin/taud/src/crdt/event.rs index 98fc406a9..ad2eb03b7 100644 --- a/bin/taud/src/crdt/event.rs +++ b/bin/taud/src/crdt/event.rs @@ -2,11 +2,20 @@ use std::{cmp::Ordering, io}; use darkfi::{ net, - util::serial::{deserialize, serialize, Decodable, Encodable}, + util::serial::{ + deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, + }, Result, }; +#[repr(u8)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd)] +pub enum EventCommand { + Sync = 0, + Update = 1, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, SerialEncodable, SerialDecodable)] pub struct Event { // the msg in the event pub value: Vec, @@ -15,31 +24,41 @@ pub struct Event { // It might be necessary to attach the node's name to the timestamp // so that it is possible to differentiate between events pub name: String, + pub command: EventCommand, } -impl Encodable for Event { - fn encode(&self, mut s: S) -> Result { +impl Encodable for EventCommand { + fn encode(&self, s: S) -> darkfi::Result { let mut len = 0; - len += self.value.encode(&mut s)?; - len += self.counter.encode(&mut s)?; - len += self.name.encode(&mut s)?; + match self { + Self::Sync => { + len += (0 as u8).encode(s)?; + } + Self::Update => { + len += (1 as u8).encode(s)?; + } + } Ok(len) } } -impl Decodable for Event { - fn decode(mut d: D) -> Result { - Ok(Self { - value: Decodable::decode(&mut d)?, - counter: Decodable::decode(&mut d)?, - name: Decodable::decode(&mut d)?, +impl Decodable for EventCommand { + fn decode(d: D) -> darkfi::Result { + let com: u8 = Decodable::decode(d)?; + Ok(match com { + 0 => Self::Sync, + _ => Self::Update, }) } } impl Event { - pub fn new(value: T, counter: u64, name: String) -> Self { - Self { value: serialize(&value), counter, name } + pub fn new_update_event(value: T, counter: u64, name: String) -> Self { + Self { value: serialize(&value), counter, name, command: EventCommand::Update } + } + + pub fn new_sync_event(name: String) -> Self { + Self { value: vec![], counter: 0, name, command: EventCommand::Sync } } pub fn get_value(&self) -> Result { diff --git a/bin/taud/src/crdt/gset.rs b/bin/taud/src/crdt/gset.rs index 2185ba9ee..159669a97 100644 --- a/bin/taud/src/crdt/gset.rs +++ b/bin/taud/src/crdt/gset.rs @@ -29,6 +29,10 @@ impl GSet { debug!(target: "crdt", "GSet merge a set of len: {:?}", other.len()); other.set.iter().for_each(|e| self.insert(e)) } + + pub fn get_set(&self) -> BTreeSet { + self.set.clone() + } } impl Default for GSet { diff --git a/bin/taud/src/crdt/mod.rs b/bin/taud/src/crdt/mod.rs index a45113867..7ae4b2a35 100644 --- a/bin/taud/src/crdt/mod.rs +++ b/bin/taud/src/crdt/mod.rs @@ -3,7 +3,7 @@ pub mod gset; pub mod net; pub mod node; -pub use event::Event; +pub use event::{Event, EventCommand}; pub use gset::GSet; pub use net::ProtocolCrdt; pub use node::Node; diff --git a/bin/taud/src/crdt/net.rs b/bin/taud/src/crdt/net.rs index a597578be..5219f81c4 100644 --- a/bin/taud/src/crdt/net.rs +++ b/bin/taud/src/crdt/net.rs @@ -6,7 +6,7 @@ use log::debug; use darkfi::{net, Result}; -use super::{Event, GSet}; +use super::{Event, EventCommand, GSet}; pub struct ProtocolCrdt { jobsman: net::ProtocolJobsManagerPtr, @@ -48,6 +48,14 @@ impl ProtocolCrdt { event ); + if event.command == EventCommand::Sync { + let gset = self.gset.lock().await; + for e in gset.get_set() { + self.p2p.broadcast(e.clone()).await?; + } + continue + } + if self.gset.lock().await.contains(&event) { continue } diff --git a/bin/taud/src/crdt/node.rs b/bin/taud/src/crdt/node.rs index 6d42596c6..caa5d9524 100644 --- a/bin/taud/src/crdt/node.rs +++ b/bin/taud/src/crdt/node.rs @@ -16,10 +16,15 @@ pub struct Node { // a counter for the node time: Mutex, p2p: net::P2pPtr, + notifier: async_channel::Sender>, } impl Node { - pub async fn new(name: &str, net_settings: net::Settings) -> Arc { + pub async fn new( + name: &str, + net_settings: net::Settings, + notifier: async_channel::Sender>, + ) -> Arc { debug!(target: "crdt", "Node::new() [BEGIN]"); let p2p = net::P2p::new(net_settings).await; Arc::new(Self { @@ -27,6 +32,7 @@ impl Node { gset: Arc::new(Mutex::new(GSet::new())), time: Mutex::new(0), p2p, + notifier, }) } @@ -55,10 +61,12 @@ impl Node { p2p.clone().start(executor.clone()).await?; // Actual main p2p session - let recv_task = executor.spawn(async move { + let recv_task: smol::Task> = executor.spawn(async move { loop { if let Ok(event) = rcv.recv().await { self.clone().receive_event(&event).await; + let payload = event.get_value()?; + self.notifier.send(payload).await?; } } }); @@ -92,7 +100,7 @@ impl Node { event_time = *time; } - let event = Event::new(value, event_time, self.name.clone()); + let event = Event::new_update_event(value, event_time, self.name.clone()); debug!(target: "crdt", "Node create new event: {:?}", event); { @@ -104,4 +112,14 @@ impl Node { Ok(()) } + + pub async fn sync(self: Arc) -> Result<()> { + debug!(target: "crdt", "Node create new sync event"); + let event = Event::new_sync_event(self.name.clone()); + + debug!(target: "crdt", "Node broadcast the sync event"); + self.p2p.broadcast(event).await?; + + Ok(()) + } } diff --git a/bin/taud/src/main.rs b/bin/taud/src/main.rs index 7481cf6aa..4d768127f 100644 --- a/bin/taud/src/main.rs +++ b/bin/taud/src/main.rs @@ -48,7 +48,9 @@ async fn start(config: TauConfig, executor: Arc>) -> Result<()> { let p2p_settings = P2pSettings::default(); - let node = Node::new("node", p2p_settings).await; + let (node_snd, node_rcv) = async_channel::unbounded::>(); + + let node = Node::new("node", p2p_settings, node_snd).await; let ex2 = executor.clone(); let node2 = node.clone(); @@ -77,10 +79,18 @@ async fn start(config: TauConfig, executor: Arc>) -> Result<()> { } }); + let recv_update_from_node: smol::Task> = executor.spawn(async move { + loop { + let payload = node_rcv.recv().await?; + // XXX + } + }); + listen_and_serve(server_config, rpc_interface, executor).await?; crdt_task.cancel().await; recv_update_from_rpc.cancel().await; + recv_update_from_node.cancel().await; Ok(()) }