script/research/crdt: implement base structs for p2p

This commit is contained in:
ghassmo
2022-03-23 10:18:34 +04:00
committed by parazyd
parent 53938b8e64
commit 0457fc65f8
8 changed files with 1441 additions and 127 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -5,8 +5,29 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.darkfi]
path = "../../../"
features = ["net"]
[dependencies]
serde = {version = "1.0.136", features = ["derive"]}
# Async
smol = "1.2.5"
futures = "0.3.21"
async-std = "1.10.0"
async-trait = "0.1.52"
async-channel = "1.6.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
# Crypto
rand = "0.8.5"
# Misc
clap = "3.1.6"
log = "0.4.14"
simplelog = "0.11.2"
[workspace]

View File

@@ -0,0 +1,30 @@
use std::cmp::Ordering;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd)]
pub struct Event<T: Clone> {
// the msg in the event
pub value: T,
// the counter for lamport clock
pub counter: u64,
// It might be necessary to attach the node's name to the timestamp
// so that it is possible to differentiate between events
pub name: String,
}
impl<T: Clone> Event<T> {
pub fn new(value: &T, counter: u64, name: String) -> Self {
Self { value: value.clone(), counter, name }
}
}
impl<T: Eq + PartialOrd + Clone> Ord for Event<T> {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.counter.cmp(&other.counter);
if ord == Ordering::Equal {
return self.name.cmp(&other.name)
}
ord
}
}

View File

@@ -1,67 +1,9 @@
use std::{
cmp::{max, Ordering},
collections::BTreeSet,
};
use std::collections::BTreeSet;
use serde::{Deserialize, Serialize};
// CRDT using gset and lamport clock
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct Node<T: Ord + Clone> {
// name to idnetifie the node
name: String,
// a grow-only set
gset: GSet<Event<T>>,
// a counter for the node
time: u64,
}
impl<T: Ord + Clone> Node<T> {
pub fn new(name: &str) -> Self {
Self { name: name.into(), gset: GSet::new(), time: 0 }
}
pub fn receive_event(&mut self, event: &Event<T>) {
self.time = max(self.time, event.counter) + 1;
self.gset.insert(event);
}
pub fn send_event(&mut self, value: &T) -> Event<T> {
self.time += 1;
let event = Event::new(value, self.time, self.name.clone());
self.gset.insert(&event);
event
}
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd)]
struct Event<T: Clone> {
// the msg in the event
value: T,
// the counter for lamport clock
counter: u64,
// It might be necessary to attach the node's name to the timestamp
// so that it is possible to differentiate between events
name: String,
}
impl<T: Clone> Event<T> {
pub fn new(value: &T, counter: u64, name: String) -> Self {
Self { value: value.clone(), counter, name }
}
}
impl<T: Eq + PartialOrd + Clone> Ord for Event<T> {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.counter.cmp(&other.counter);
if ord == Ordering::Equal {
return self.name.cmp(&other.name)
}
ord
}
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct GSet<T: Ord> {
set: BTreeSet<T>,
@@ -94,67 +36,3 @@ impl<T: Ord + Clone> Default for GSet<T> {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sync_simulation(
mut a: Node<String>,
mut b: Node<String>,
mut c: Node<String>,
) -> (Node<String>, Node<String>, Node<String>) {
a.gset.merge(&b.gset);
a.gset.merge(&c.gset);
b.gset.merge(&a.gset);
b.gset.merge(&c.gset);
c.gset.merge(&a.gset);
c.gset.merge(&b.gset);
(a, b, c)
}
#[test]
fn test_crdt_gset() {
let mut a: Node<String> = Node::new("Node A");
let mut b: Node<String> = Node::new("Node B");
let mut c: Node<String> = Node::new("Node C");
// node a
a.send_event(&"a_msg1".to_string());
a.send_event(&"a_msg2".to_string());
// node b
b.send_event(&"b_msg1".to_string());
// node c
c.send_event(&"c_msg1".to_string());
// node b
b.send_event(&"b_msg2".to_string());
let (a, mut b, mut c) = sync_simulation(a, b, c);
assert_eq!(a.gset.len(), 5);
assert_eq!(b.gset.len(), 5);
assert_eq!(c.gset.len(), 5);
// node c
c.send_event(&"c_msg2".to_string());
c.send_event(&"c_msg3".to_string());
c.send_event(&"c_msg4".to_string());
c.send_event(&"c_msg5".to_string());
// node b
b.send_event(&"b_msg3".to_string());
let (a, b, c) = sync_simulation(a, b, c);
assert_eq!(a.gset.len(), 10);
assert_eq!(b.gset.len(), 10);
assert_eq!(c.gset.len(), 10);
}
}

View File

@@ -1 +1,73 @@
pub mod event;
pub mod gset;
pub mod net;
pub mod node;
pub use event::Event;
pub use gset::GSet;
pub use net::CrdtP2p;
pub use node::Node;
#[cfg(test)]
mod tests {
use super::*;
fn sync_simulation(
mut a: Node<String>,
mut b: Node<String>,
mut c: Node<String>,
) -> (Node<String>, Node<String>, Node<String>) {
a.gset.merge(&b.gset);
a.gset.merge(&c.gset);
b.gset.merge(&a.gset);
b.gset.merge(&c.gset);
c.gset.merge(&a.gset);
c.gset.merge(&b.gset);
(a, b, c)
}
#[test]
fn test_crdt_gset() {
let mut a: Node<String> = Node::new("Node A");
let mut b: Node<String> = Node::new("Node B");
let mut c: Node<String> = Node::new("Node C");
// node a
a.send_event(&"a_msg1".to_string());
a.send_event(&"a_msg2".to_string());
// node b
b.send_event(&"b_msg1".to_string());
// node c
c.send_event(&"c_msg1".to_string());
// node b
b.send_event(&"b_msg2".to_string());
let (a, mut b, mut c) = sync_simulation(a, b, c);
assert_eq!(a.gset.len(), 5);
assert_eq!(b.gset.len(), 5);
assert_eq!(c.gset.len(), 5);
// node c
c.send_event(&"c_msg2".to_string());
c.send_event(&"c_msg3".to_string());
c.send_event(&"c_msg4".to_string());
c.send_event(&"c_msg5".to_string());
// node b
b.send_event(&"b_msg3".to_string());
let (a, b, c) = sync_simulation(a, b, c);
assert_eq!(a.gset.len(), 10);
assert_eq!(b.gset.len(), 10);
assert_eq!(c.gset.len(), 10);
}
}

View File

@@ -1,3 +1,42 @@
fn main() {
println!("Hello, world!");
use std::sync::Arc;
extern crate clap;
use async_executor::Executor;
use easy_parallel::Parallel;
use simplelog::{ColorChoice, Config, LevelFilter, TermLogger, TerminalMode};
use darkfi::Result;
use crdt::CrdtP2p;
fn main() -> Result<()> {
let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_channel::unbounded::<()>();
let ex2 = ex.clone();
TermLogger::init(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)?;
// let nthreads = num_cpus::get();
// debug!(target: "IRC DAEMON", "Run {} executor threads", nthreads);
let (sender, _) = async_channel::unbounded::<crdt::net::Event>();
let (_, result) = Parallel::new()
.each(0..4, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
CrdtP2p::start(ex2.clone(), sender).await?;
drop(signal);
Ok::<(), darkfi::Error>(())
})
});
result
}

View File

@@ -0,0 +1,122 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
net,
util::serial::{Decodable, Encodable},
Result,
};
use log::debug;
use std::{io, sync::Arc};
pub struct CrdtP2p {}
impl CrdtP2p {
pub async fn start(
executor: Arc<Executor<'_>>,
notify_queue_sender: async_channel::Sender<Event>,
) -> Result<()> {
let p2p = net::P2p::new(net::Settings::default()).await;
let registry = p2p.protocol_registry();
registry
.register(!net::SESSION_SEED, move |channel, p2p| {
let sender = notify_queue_sender.clone();
async move { ProtocolCrdt::init(channel, sender, p2p).await }
})
.await;
//
// p2p network main instance
//
// Performs seed session
p2p.clone().start(executor.clone()).await?;
// Actual main p2p session
p2p.run(executor).await
}
}
#[derive(Debug, Clone)]
pub struct Event {}
impl net::Message for Event {
fn name() -> &'static str {
"event"
}
}
impl Encodable for Event {
fn encode<S: io::Write>(&self, mut _s: S) -> Result<usize> {
let len = 0;
Ok(len)
}
}
impl Decodable for Event {
fn decode<D: io::Read>(mut _d: D) -> Result<Self> {
Ok(Self {})
}
}
struct ProtocolCrdt {
jobsman: net::ProtocolJobsManagerPtr,
notify_queue_sender: async_channel::Sender<Event>,
event_sub: net::MessageSubscription<Event>,
p2p: net::P2pPtr,
}
impl ProtocolCrdt {
pub async fn init(
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<Event>,
p2p: net::P2pPtr,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Event>().await;
let event_sub = channel.subscribe_msg::<Event>().await.expect("Missing Event dispatcher!");
Arc::new(Self {
notify_queue_sender,
event_sub,
jobsman: net::ProtocolJobsManager::new("ProtocolCrdt", channel),
p2p,
})
}
async fn handle_receive_event(self: Arc<Self>) -> Result<()> {
debug!(target: "crdt", "ProtocolCrdt::handle_receive_event() [START]");
loop {
let event = self.event_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolCrdt::handle_receive_event() received {:?}",
event
);
let event = (*event).clone();
self.p2p.broadcast(event.clone()).await?;
self.notify_queue_sender.send(event).await?;
}
}
}
#[async_trait]
impl net::ProtocolBase for ProtocolCrdt {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "crdt", "ProtocolCrdt::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_event(), executor.clone()).await;
debug!(target: "crdt", "ProtocolCrdt::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolCrdt"
}
}

View File

@@ -0,0 +1,33 @@
use std::cmp::max;
use serde::{Deserialize, Serialize};
use crate::{Event, GSet};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Node<T: Ord + Clone> {
// name to idnetifie the node
name: String,
// a grow-only set
pub(crate) gset: GSet<Event<T>>,
// a counter for the node
time: u64,
}
impl<T: Ord + Clone> Node<T> {
pub fn new(name: &str) -> Self {
Self { name: name.into(), gset: GSet::new(), time: 0 }
}
pub fn receive_event(&mut self, event: &Event<T>) {
self.time = max(self.time, event.counter) + 1;
self.gset.insert(event);
}
pub fn send_event(&mut self, value: &T) -> Event<T> {
self.time += 1;
let event = Event::new(value, self.time, self.name.clone());
self.gset.insert(&event);
event
}
}