diff --git a/script/research/validatord/Cargo.toml b/script/research/validatord/Cargo.toml index a5bdee6bf..ab2ea4e3b 100644 --- a/script/research/validatord/Cargo.toml +++ b/script/research/validatord/Cargo.toml @@ -17,6 +17,10 @@ async-channel = "1.6.1" async-executor = "1.4.1" easy-parallel = "3.2.0" +# Crypto +rand = "0.8.5" +blake3 = "1.3.1" + # Storage sled = "0.34.7" @@ -29,6 +33,7 @@ structopt-toml = "0.5.0" toml = "0.5.9" # Misc +chrono = "0.4.19" log = "0.4.16" num_cpus = "1.13.1" simplelog = "0.12.0" diff --git a/src/consensus/block.rs b/script/research/validatord/src/consensus/block.rs similarity index 99% rename from src/consensus/block.rs rename to script/research/validatord/src/consensus/block.rs index f10a20f13..abf22c2ea 100644 --- a/src/consensus/block.rs +++ b/script/research/validatord/src/consensus/block.rs @@ -1,6 +1,6 @@ use std::io; -use crate::{ +use darkfi::{ crypto::{keypair::PublicKey, schnorr::Signature}, impl_vec, net, util::serial::{ diff --git a/src/consensus/blockchain.rs b/script/research/validatord/src/consensus/blockchain.rs similarity index 99% rename from src/consensus/blockchain.rs rename to script/research/validatord/src/consensus/blockchain.rs index 06c1a9a1e..3afdb8408 100644 --- a/src/consensus/blockchain.rs +++ b/script/research/validatord/src/consensus/blockchain.rs @@ -2,7 +2,7 @@ use std::io; use log::debug; -use crate::{ +use darkfi::{ impl_vec, util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Result, diff --git a/src/consensus/metadata.rs b/script/research/validatord/src/consensus/metadata.rs similarity index 99% rename from src/consensus/metadata.rs rename to script/research/validatord/src/consensus/metadata.rs index 99064c79a..a179d4873 100644 --- a/src/consensus/metadata.rs +++ b/script/research/validatord/src/consensus/metadata.rs @@ -1,4 +1,4 @@ -use crate::{ +use darkfi::{ util::serial::{deserialize, serialize, SerialDecodable, SerialEncodable}, Result, }; diff --git a/src/consensus/mod.rs b/script/research/validatord/src/consensus/mod.rs similarity index 100% rename from src/consensus/mod.rs rename to script/research/validatord/src/consensus/mod.rs diff --git a/src/consensus/participant.rs b/script/research/validatord/src/consensus/participant.rs similarity index 99% rename from src/consensus/participant.rs rename to script/research/validatord/src/consensus/participant.rs index 895aa19d3..7273e0c85 100644 --- a/src/consensus/participant.rs +++ b/script/research/validatord/src/consensus/participant.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, io}; -use crate::{ +use darkfi::{ impl_vec, net, util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Result, diff --git a/src/consensus/state.rs b/script/research/validatord/src/consensus/state.rs similarity index 84% rename from src/consensus/state.rs rename to script/research/validatord/src/consensus/state.rs index 441bbfced..a691124d2 100644 --- a/src/consensus/state.rs +++ b/script/research/validatord/src/consensus/state.rs @@ -1,5 +1,5 @@ use chrono::{NaiveDateTime, Utc}; -use log::{debug, error}; +use log::{debug, error, warn}; use rand::rngs::OsRng; use std::{ collections::{hash_map::DefaultHasher, BTreeMap}, @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use crate::{ +use darkfi::{ crypto::{ keypair::{PublicKey, SecretKey}, schnorr::{SchnorrPublic, SchnorrSecret}, @@ -41,10 +41,14 @@ pub struct ConsensusState { pub proposals: Vec, /// Orphan votes pool, in case a vote reaches a node before the corresponding block pub orphan_votes: Vec, - /// Validators currently participating in the concensus + /// Node participation identity + pub participant: Option, + /// Validators currently participating in the consensus pub participants: BTreeMap, - /// Validators to be added on next epoch as participants + /// Validators to be added on the next epoch as participants pub pending_participants: Vec, + /// Last slot participants where refreshed + pub refreshed: u64, } impl ConsensusState { @@ -57,8 +61,10 @@ impl ConsensusState { genesis: Timestamp(genesis), proposals: Vec::new(), orphan_votes: Vec::new(), + participant: None, participants: BTreeMap::new(), - pending_participants: Vec::new(), + pending_participants: vec![], + refreshed: 0, }; let serialized = serialize(&consensus); tree.insert(id.to_ne_bytes(), serialized)?; @@ -172,14 +178,40 @@ impl ValidatorState { pub fn current_epoch(&self) -> u64 { self.consensus.genesis.clone().elapsed() / (2 * DELTA) } + + /// Finds the last epoch a proposal or block was generated. + pub fn last_epoch(&self) -> Result { + let mut epoch = 0; + for chain in &self.consensus.proposals { + for proposal in &chain.proposals { + if proposal.block.sl > epoch { + epoch = proposal.block.sl; + } + } + } + + // We return here in case proposals exist, + // so we don't query the sled database. + if epoch > 0 { + return Ok(epoch) + } + + let (last_sl, _) = self.blockchain.last()?.unwrap(); + Ok(last_sl) + } /// Node finds epochs leader, using a simple hash method. /// Leader calculation is based on how many nodes are participating in the network. pub fn epoch_leader(&mut self) -> u64 { let epoch = self.current_epoch(); + // DefaultHasher is used to hash the epoch number + // because it produces a number string which then can be modulated by the len. + // blake3 produces alphanumeric let mut hasher = DefaultHasher::new(); epoch.hash(&mut hasher); let pos = hasher.finish() % (self.consensus.participants.len() as u64); + // Since BTreeMap orders by key in asceding order, each node will have + // the same key in calculated position. self.consensus.participants.iter().nth(pos as usize).unwrap().1.id } @@ -275,6 +307,9 @@ impl ValidatorState { if !self.participating { return Ok(None) } + + // Node refreshes participants records + self.refresh_participants()?; let leader = self.epoch_leader(); if leader != proposal.id { @@ -416,6 +451,9 @@ impl ValidatorState { debug!("Voter signature couldn't be verified. Voter: {:?}", vote.id); return Ok((false, None)) } + + // Node refreshes participants records + self.refresh_participants()?; let nodes_count = self.consensus.participants.len(); // Checking that the voter can actually vote. @@ -565,6 +603,12 @@ impl ValidatorState { Ok(to_broadcast) } + + /// Append node participant identity to the pending participants list. + pub fn append_self_participant(&mut self, participant: Participant) { + self.consensus.participant = Some(participant.clone()); + self.append_participant(participant); + } /// Node retreives a new participant and appends it to the pending participants list. pub fn append_participant(&mut self, participant: Participant) -> bool { @@ -575,40 +619,81 @@ impl ValidatorState { true } - /// Node refreshes participants map, to retain only the active ones. - /// Active nodes are considered those who joined or voted on previous epoch. - pub fn refresh_participants(&mut self) { - // adding pending participants + /// Refresh the participants map, to retain only the active ones. + /// Active nodes are considered those that on the epoch the last proposal + /// was generated, either voted or joined the previous epoch. + /// That ensures we cover the case of chosen leader beign inactive. + pub fn refresh_participants(&mut self) -> Result<()> { + // Node checks if it should refresh its participants list + let epoch = self.current_epoch(); + if epoch <= self.consensus.refreshed { + debug!("refresh_participants(): Participants have been refreshed this epoch."); + return Ok(()) + } + + debug!("refresh_participants(): Adding pending participants"); for participant in &self.consensus.pending_participants { self.consensus.participants.insert(participant.id, participant.clone()); } - self.consensus.pending_participants = Vec::new(); + + if self.consensus.participants.is_empty() { + debug!( + "refresh_participants(): Didn't manage to add any participant, pending were empty." + ); + } + + self.consensus.pending_participants = vec![]; let mut inactive = Vec::new(); - let previous_epoch = self.current_epoch() - 1; + let mut last_epoch = self.last_epoch()?; + + // This check ensures that we don't chech the current epoch, + // as a node might receive the proposal of current epoch before + // starting refreshing participants, so the last_epoch will be + // the current one. + if last_epoch >= epoch { + last_epoch = epoch - 1; + } + + let previous_epoch = last_epoch - 1; + + error!( + "refresh_participants(): Checking epochs: previous - {:?}, last - {:?}", + previous_epoch, last_epoch + ); + for (index, participant) in self.consensus.participants.clone().iter() { match participant.voted { Some(epoch) => { - if epoch < previous_epoch { - inactive.push(index.clone()); + if epoch < last_epoch { + warn!("refresh_participants(): Inactive participant: {:?}", participant); + inactive.push(*index); } } None => { if participant.joined < previous_epoch { - inactive.push(index.clone()); + warn!("refresh_participants(): Inactive participant: {:?}", participant); + inactive.push(*index); } } } } + for index in inactive { self.consensus.participants.remove(&index); } if self.consensus.participants.is_empty() { // If no nodes are active, node becomes a single node network. - let participant = Participant::new(self.id, self.current_epoch()); + let mut participant = self.consensus.participant.clone().unwrap(); + participant.joined = epoch; + self.consensus.participant = Some(participant.clone()); self.consensus.participants.insert(participant.id, participant.clone()); } + + self.consensus.refreshed = epoch; + + Ok(()) } /// Util function to save the current consensus state to provided file path. @@ -628,8 +713,10 @@ impl ValidatorState { genesis, proposals: Vec::new(), orphan_votes: Vec::new(), + participant: None, participants: BTreeMap::new(), - pending_participants: Vec::new(), + pending_participants: vec![], + refreshed: 0, }; self.consensus = consensus; diff --git a/src/consensus/tx.rs b/script/research/validatord/src/consensus/tx.rs similarity index 99% rename from src/consensus/tx.rs rename to script/research/validatord/src/consensus/tx.rs index a816c03e4..1c0a87d96 100644 --- a/src/consensus/tx.rs +++ b/script/research/validatord/src/consensus/tx.rs @@ -1,6 +1,6 @@ use std::io; -use crate::{ +use darkfi::{ impl_vec, net, util::serial::{ deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt, diff --git a/src/consensus/util.rs b/script/research/validatord/src/consensus/util.rs similarity index 93% rename from src/consensus/util.rs rename to script/research/validatord/src/consensus/util.rs index 785fb2f74..76372619d 100644 --- a/src/consensus/util.rs +++ b/script/research/validatord/src/consensus/util.rs @@ -1,6 +1,6 @@ use chrono::{NaiveDateTime, Utc}; -use crate::util::serial::{SerialDecodable, SerialEncodable}; +use darkfi::util::serial::{SerialDecodable, SerialEncodable}; /// Serialized blake3 hash bytes for character "⊥" pub const EMPTY_HASH_BYTES: [u8; 32] = [ diff --git a/src/consensus/vote.rs b/script/research/validatord/src/consensus/vote.rs similarity index 98% rename from src/consensus/vote.rs rename to script/research/validatord/src/consensus/vote.rs index 21e8ecf67..cdb031264 100644 --- a/src/consensus/vote.rs +++ b/script/research/validatord/src/consensus/vote.rs @@ -1,6 +1,6 @@ use std::io; -use crate::{ +use darkfi::{ crypto::{keypair::PublicKey, schnorr::Signature}, impl_vec, net, util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, diff --git a/script/research/validatord/src/lib.rs b/script/research/validatord/src/lib.rs deleted file mode 100644 index 4cfd65d53..000000000 --- a/script/research/validatord/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod protocols; diff --git a/script/research/validatord/src/main.rs b/script/research/validatord/src/main.rs index a13d4c84f..685d43bd1 100644 --- a/script/research/validatord/src/main.rs +++ b/script/research/validatord/src/main.rs @@ -11,11 +11,12 @@ use structopt::StructOpt; use structopt_toml::StructOptToml; use darkfi::{ - consensus::{ + consensus2::{ block::{BlockOrder, BlockResponse}, participant::Participant, state::{ConsensusRequest, ConsensusResponse, ValidatorState, ValidatorStatePtr}, tx::Tx, + proto::{ProtocolSync, ProtocolTx, ProtocolVote, ProtocolProposal, ProtocolParticipant, ProtocolSyncConsensus} }, net, rpc::{ @@ -35,12 +36,6 @@ use darkfi::{ Result, }; -use validatord::protocols::{ - protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal, - protocol_sync::ProtocolSync, protocol_sync_consensus::ProtocolSyncConsensus, - protocol_tx::ProtocolTx, protocol_vote::ProtocolVote, -}; - const CONFIG_FILE: &str = r"validatord_config.toml"; const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../validatord_config.toml"); @@ -124,7 +119,7 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> // Nodes sends the last known block hash of the canonical blockchain // and loops until the respond is the same block (used to utilize batch requests) - let mut last = state.read().unwrap().blockchain.last()?.unwrap(); + let mut last = state.read().await.blockchain.last()?.unwrap(); info!("Last known block: {:?} - {:?}", last.0, last.1); loop { // Node creates a BlockOrder and sends it @@ -134,9 +129,9 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> // Node stores responce data. Extra validations can be added here. let response = response_sub.receive().await?; for info in &response.blocks { - state.write().unwrap().blockchain.add_by_info(info.clone())?; + state.write().await.blockchain.add_by_info(info.clone())?; } - let last_received = state.read().unwrap().blockchain.last()?.unwrap(); + let last_received = state.read().await.blockchain.last()?.unwrap(); info!("Last received block: {:?} - {:?}", last_received.0, last_received.1); if last == last_received { break @@ -167,15 +162,15 @@ async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> R .expect("Missing ConsensusResponse dispatcher!"); // Node creates a ConsensusRequest and sends it - let request = ConsensusRequest { id: state.read().unwrap().id }; + let request = ConsensusRequest { id: state.read().await.id }; channel.send(request).await?; // Node stores responce data. Extra validations can be added here. let response = response_sub.receive().await?; - state.write().unwrap().consensus = response.consensus.clone(); + state.write().await.consensus = response.consensus.clone(); } else { info!("Node is not connected to other nodes, resetting consensus state."); - state.write().unwrap().reset_consensus_state()?; + state.write().await.reset_consensus_state()?; } info!("Node synced!"); @@ -185,7 +180,7 @@ async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> R async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { // Node waits just before the current or next epoch end, // so it can start syncing latest state. - let mut seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + let mut seconds_until_next_epoch = state.read().await.next_epoch_start(); let one_sec = Duration::new(1, 0); loop { if seconds_until_next_epoch > one_sec { @@ -194,7 +189,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { } info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); - seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + seconds_until_next_epoch = state.read().await.next_epoch_start(); } info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); @@ -208,8 +203,8 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { // Node signals the network that it will start participating let participant = - Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch()); - state.write().unwrap().append_participant(participant.clone()); + Participant::new(state.read().await.id, state.read().await.current_epoch()); + state.write().await.append_self_participant(participant.clone()); let result = p2p.broadcast(participant.clone()).await; match result { Ok(()) => info!("Participation message broadcasted successfuly."), @@ -217,20 +212,20 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { } // After initialization node waits for next epoch to start participating - let seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + let seconds_until_next_epoch = state.read().await.next_epoch_start(); info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); // Node modifies its participating flag to true - state.write().unwrap().participating = true; + state.write().await.participating = true; loop { // Node refreshes participants records - state.write().unwrap().refresh_participants(); + state.write().await.refresh_participants(); // Node checks if its the epoch leader to generate a new proposal for that epoch - let result = if state.write().unwrap().is_epoch_leader() { - state.read().unwrap().propose() + let result = if state.write().await.is_epoch_leader() { + state.read().await.propose() } else { Ok(None) }; @@ -242,14 +237,14 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { // Leader creates a vote for the proposal and broadcasts them both let unwrapped = proposal.unwrap(); info!("Node is the epoch leader. Proposed block: {:?}", unwrapped); - let vote = state.write().unwrap().receive_proposal(&unwrapped); + let vote = state.write().await.receive_proposal(&unwrapped); match vote { Ok(x) => { if x.is_none() { error!("Node did not vote for the proposed block."); } else { let vote = x.unwrap(); - let result = state.write().unwrap().receive_vote(&vote); + let result = state.write().await.receive_vote(&vote); match result { Ok(_) => info!("Vote saved successfuly."), Err(e) => error!("Vote save failed. Error: {:?}", e), @@ -278,7 +273,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { } // Current node state is flushed to sled database - let result = state.read().unwrap().save_consensus_state(); + let result = state.read().await.save_consensus_state(); match result { Ok(()) => (), Err(e) => { @@ -287,7 +282,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { }; // Node waits until next epoch - let seconds_until_next_epoch = state.read().unwrap().next_epoch_start(); + let seconds_until_next_epoch = state.read().await.next_epoch_start(); info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch); thread::sleep(seconds_until_next_epoch); } @@ -491,7 +486,7 @@ impl JsonRpcInterface { let payload = String::from(args[0].as_str().unwrap()); let tx = Tx { payload }; - self.state.write().unwrap().append_tx(tx.clone()); + self.state.write().await.append_tx(tx.clone()); let result = self.p2p.broadcast(tx).await; match result { diff --git a/script/research/validatord/src/protocols/protocol_participant.rs b/script/research/validatord/src/protocols/protocol_participant.rs index 1d7adfd1b..92137e049 100644 --- a/script/research/validatord/src/protocols/protocol_participant.rs +++ b/script/research/validatord/src/protocols/protocol_participant.rs @@ -1,8 +1,8 @@ use async_executor::Executor; use async_trait::async_trait; -use darkfi::{ - consensus::{participant::Participant, state::ValidatorStatePtr}, +use darkfi::{ + consensus::{participant::Participant, state::ValidatorStatePtr}, net::{ ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, diff --git a/src/consensus2/task/proposal.rs b/src/consensus2/task/proposal.rs index f98c2d389..0614be0f1 100644 --- a/src/consensus2/task/proposal.rs +++ b/src/consensus2/task/proposal.rs @@ -39,7 +39,6 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) { // Node signals the network that it will start participating let participant = Participant::new(state.read().await.id, state.read().await.current_epoch()); state.write().await.append_self_participant(participant.clone()); - state.write().await.append_participant(participant.clone()); match p2p.broadcast(participant).await { Ok(()) => info!("Consensus participation message broadcasted successfully."),