script/research/raft_p2p: remove borsh crate

This commit is contained in:
ghassmo
2022-04-07 10:57:49 +04:00
parent 10de8ad3e2
commit 0b75044e0a
4 changed files with 68 additions and 127 deletions

View File

@@ -2,17 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "anyhow"
version = "1.0.56"
@@ -309,51 +298,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "borsh"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15bf3650200d8bffa99015595e10f1fbd17de07abbc25bb067da79e769939bfa"
dependencies = [
"borsh-derive",
"hashbrown",
]
[[package]]
name = "borsh-derive"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6441c552f230375d18e3cc377677914d2ca2b0d36e52129fe15450a2dce46775"
dependencies = [
"borsh-derive-internal",
"borsh-schema-derive-internal",
"proc-macro-crate 0.1.5",
"proc-macro2",
"syn",
]
[[package]]
name = "borsh-derive-internal"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5449c28a7b352f2d1e592a8a28bf139bc71afb0764a14f3c02500935d8c44065"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "borsh-schema-derive-internal"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdbd5696d8bfa21d53d9fe39a714a18538bad11492a42d066dbbc395fb1951c0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "bumpalo"
version = "3.9.1"
@@ -562,7 +506,7 @@ name = "darkfi-derive"
version = "0.3.0"
dependencies = [
"darkfi-derive-internal",
"proc-macro-crate 1.1.3",
"proc-macro-crate",
"proc-macro2",
"syn",
]
@@ -853,9 +797,6 @@ name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "heck"
@@ -1200,15 +1141,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785"
dependencies = [
"toml",
]
[[package]]
name = "proc-macro-crate"
version = "1.1.3"
@@ -1270,7 +1202,6 @@ dependencies = [
"async-std",
"async-trait",
"blake3",
"borsh",
"clap",
"darkfi",
"easy-parallel",

View File

@@ -19,7 +19,6 @@ async-executor = "1.4.1"
async-trait = "0.1.53"
# Misc
borsh = "0.9.3"
log = "0.4.16"
rand = "0.8.5"

View File

@@ -1,8 +1,8 @@
use std::{io, net::SocketAddr};
use borsh::{BorshDeserialize, BorshSerialize};
use darkfi::util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable};
use darkfi::util::serial::{
serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt,
};
pub mod datastore;
pub mod p2p;
@@ -18,7 +18,7 @@ pub enum Role {
Leader,
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct VoteRequest {
node_id: NodeId,
current_term: u64,
@@ -26,24 +26,24 @@ pub struct VoteRequest {
last_term: u64,
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct VoteResponse {
node_id: NodeId,
current_term: u64,
ok: bool,
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct LogRequest {
leader_id: NodeId,
current_term: u64,
prefix_len: u64,
prefix_term: u64,
commit_length: u64,
suffix: VecR<Log>,
suffix: Logs,
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
pub struct LogResponse {
node_id: NodeId,
current_term: u64,
@@ -57,23 +57,13 @@ impl VoteResponse {
}
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, SerialDecodable, SerialEncodable)]
#[derive(Clone, Debug, SerialDecodable, SerialEncodable)]
pub struct Log {
term: u64,
msg: Vec<u8>,
}
#[derive(
BorshSerialize,
BorshDeserialize,
Clone,
Debug,
Eq,
PartialEq,
Hash,
SerialDecodable,
SerialEncodable,
)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, SerialDecodable, SerialEncodable)]
pub struct NodeId(pub Vec<u8>);
impl From<SocketAddr> for NodeId {
@@ -84,14 +74,14 @@ impl From<SocketAddr> for NodeId {
}
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
pub struct VecR<T: BorshSerialize + BorshDeserialize>(pub Vec<T>);
#[derive(Clone, Debug)]
pub struct Logs(pub Vec<Log>);
impl<T: BorshSerialize + BorshDeserialize + Clone> VecR<T> {
impl Logs {
pub fn len(&self) -> u64 {
self.0.len() as u64
}
pub fn push(&mut self, d: &T) {
pub fn push(&mut self, d: &Log) {
self.0.push(d.clone());
}
@@ -103,18 +93,16 @@ impl<T: BorshSerialize + BorshDeserialize + Clone> VecR<T> {
Self(self.0[..end as usize].to_vec())
}
pub fn get(&self, index: u64) -> T {
pub fn get(&self, index: u64) -> Log {
self.0[index as usize].clone()
}
pub fn to_vec(&self) -> Vec<T> {
pub fn to_vec(&self) -> Vec<Log> {
self.0.clone()
}
}
#[derive(
BorshSerialize, BorshDeserialize, SerialDecodable, SerialEncodable, Clone, Debug, PartialEq, Eq,
)]
#[derive(SerialDecodable, SerialEncodable, Clone, Debug, PartialEq, Eq)]
pub struct NetMsg {
id: u64,
recipient_id: Option<NodeId>,
@@ -122,7 +110,7 @@ pub struct NetMsg {
payload: Vec<u8>,
}
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum NetMsgMethod {
LogResponse = 0,
@@ -155,10 +143,34 @@ impl Decodable for NetMsgMethod {
}
}
pub fn try_from_slice_unchecked<T: BorshDeserialize>(data: &[u8]) -> Result<T, io::Error> {
let mut data_mut = data;
let result = T::deserialize(&mut data_mut)?;
Ok(result)
impl Encodable for Logs {
fn encode<S: io::Write>(&self, s: S) -> darkfi::Result<usize> {
encode_vec(&self.0, s)
}
}
impl Decodable for Logs {
fn decode<D: io::Read>(d: D) -> darkfi::Result<Self> {
Ok(Self(decode_vec(d)?))
}
}
fn encode_vec<T: Encodable, S: io::Write>(vec: &[T], mut s: S) -> darkfi::Result<usize> {
let mut len = 0;
len += VarInt(vec.len() as u64).encode(&mut s)?;
for c in vec.iter() {
len += c.encode(&mut s)?;
}
Ok(len)
}
fn decode_vec<T: Decodable, D: io::Read>(mut d: D) -> darkfi::Result<Vec<T>> {
let len = VarInt::decode(&mut d)?.0;
let mut ret = Vec::with_capacity(len as usize);
for _ in 0..len {
ret.push(Decodable::decode(&mut d)?);
}
Ok(ret)
}
#[cfg(test)]

View File

@@ -5,7 +5,6 @@ use async_std::{
use std::{cmp::min, collections::HashMap, net::SocketAddr, path::PathBuf, time::Duration};
use async_executor::Executor;
use borsh::BorshSerialize;
use futures::{select, FutureExt};
use log::error;
use rand::Rng;
@@ -17,8 +16,8 @@ use darkfi::{
};
use crate::{
try_from_slice_unchecked, DataStore, Log, LogRequest, LogResponse, NetMsg, NetMsgMethod,
NodeId, ProtocolRaft, Role, VecR, VoteRequest, VoteResponse,
DataStore, Log, LogRequest, LogResponse, Logs, NetMsg, NetMsgMethod, NodeId, ProtocolRaft,
Role, VoteRequest, VoteResponse,
};
const HEARTBEATTIMEOUT: u64 = 100;
@@ -35,7 +34,7 @@ pub struct Raft<T> {
// these five vars should be on local storage
current_term: u64,
voted_for: Option<NodeId>,
logs: VecR<Log>,
logs: Logs,
commit_length: u64,
// the log will be added to this vector if it's committed by the majority of nodes
commits: Arc<Mutex<Vec<T>>>,
@@ -44,7 +43,7 @@ pub struct Raft<T> {
current_leader: Option<NodeId>,
votes_received: VecR<NodeId>,
votes_received: Vec<NodeId>,
sent_length: HashMap<NodeId, u64>,
acked_length: HashMap<NodeId, u64>,
@@ -71,7 +70,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut current_term = 0;
let mut voted_for = None;
let mut logs = VecR(vec![]);
let mut logs = Logs(vec![]);
let mut commit_length = 0;
let mut commits = Arc::new(Mutex::new(vec![]));
@@ -79,7 +78,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let datastore = DataStore::new(db_path_str)?;
current_term = datastore.current_term.get_last()?.unwrap_or(0);
voted_for = datastore.voted_for.get_last()?.flatten();
logs = VecR(datastore.logs.get_all()?);
logs = Logs(datastore.logs.get_all()?);
commit_length = datastore.commits_length.get_last()?.unwrap_or(0);
commits = Arc::new(Mutex::new(datastore.commits.get_all()?));
datastore
@@ -99,7 +98,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
commits,
role: Role::Follower,
current_leader: None,
votes_received: VecR(vec![]),
votes_received: vec![],
sent_length: HashMap::new(),
acked_length: HashMap::new(),
nodes: Arc::new(Mutex::new(HashMap::new())),
@@ -212,19 +211,19 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn handle_method(&mut self, msg: NetMsg) -> Result<()> {
match msg.method {
NetMsgMethod::LogResponse => {
let lr: LogResponse = try_from_slice_unchecked(&msg.payload)?;
let lr: LogResponse = deserialize(&msg.payload)?;
self.receive_log_response(lr).await;
}
NetMsgMethod::LogRequest => {
let lr: LogRequest = try_from_slice_unchecked(&msg.payload)?;
let lr: LogRequest = deserialize(&msg.payload)?;
self.receive_log_request(lr).await;
}
NetMsgMethod::VoteResponse => {
let vr: VoteResponse = try_from_slice_unchecked(&msg.payload)?;
let vr: VoteResponse = deserialize(&msg.payload)?;
self.receive_vote_response(vr).await;
}
NetMsgMethod::VoteRequest => {
let vr: VoteRequest = try_from_slice_unchecked(&msg.payload)?;
let vr: VoteRequest = deserialize(&msg.payload)?;
self.receive_vote_request(vr).await;
}
}
@@ -249,7 +248,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.set_current_term(&(self.current_term + 1)).unwrap();
self.role = Role::Candidate;
self.set_voted_for(&Some(self.id.clone())).unwrap();
self.votes_received.push(&self.id);
self.votes_received.push(self.id.clone());
self.reset_last_term();
@@ -260,7 +259,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
last_term: self.last_term,
};
let payload = request.try_to_vec().unwrap();
let payload = serialize(&request);
self.send(None, &payload, NetMsgMethod::VoteRequest).await;
}
@@ -292,16 +291,16 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
response.set_ok(true);
}
let payload = response.try_to_vec().unwrap();
let payload = serialize(&response);
self.send(Some(vr.node_id), &payload, NetMsgMethod::VoteResponse).await;
}
async fn receive_vote_response(&mut self, vr: VoteResponse) {
if self.role == Role::Candidate && vr.current_term == self.current_term && vr.ok {
self.votes_received.push(&vr.node_id);
self.votes_received.push(vr.node_id);
let nodes = self.nodes.lock().await;
if self.votes_received.len() >= (((nodes.len() + 1) / 2) as u64) {
if self.votes_received.len() >= ((nodes.len() + 1) / 2) {
self.role = Role::Leader;
self.current_leader = Some(self.id.clone());
for node in nodes.iter() {
@@ -320,7 +319,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn update_logs(&self, node_id: &NodeId) {
let prefix_len = self.sent_length[node_id];
let suffix: VecR<Log> = self.logs.slice_from(prefix_len);
let suffix: Logs = self.logs.slice_from(prefix_len);
let mut prefix_term = 0;
if prefix_len > 0 {
@@ -336,7 +335,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
suffix,
};
let payload = request.try_to_vec().unwrap();
let payload = serialize(&request);
self.send(Some(node_id.clone()), &payload, NetMsgMethod::LogRequest).await;
}
@@ -367,7 +366,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
};
let payload = response.try_to_vec().unwrap();
let payload = serialize(&response);
self.send(Some(lr.leader_id.clone()), &payload, NetMsgMethod::LogResponse).await;
}
@@ -430,7 +429,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
}
async fn append_log(&mut self, prefix_len: u64, leader_commit: u64, suffix: &VecR<Log>) {
async fn append_log(&mut self, prefix_len: u64, leader_commit: u64, suffix: &Logs) {
if suffix.len() > 0 && self.logs.len() > prefix_len {
let index = min(self.logs.len(), prefix_len + suffix.len()) - 1;
if self.logs.get(index).term != suffix.get(index - prefix_len).term {
@@ -473,7 +472,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.logs.push(i);
self.datastore.logs.insert(i)
}
fn push_logs(&mut self, i: &VecR<Log>) -> Result<()> {
fn push_logs(&mut self, i: &Logs) -> Result<()> {
self.logs = i.clone();
self.datastore.logs.wipe_insert_all(&i.to_vec())
}