validatord: participants logic implemented

This commit is contained in:
aggstam
2022-04-01 23:33:07 +03:00
parent 56436d0ab9
commit bc49396a64
27 changed files with 320 additions and 102 deletions

View File

@@ -16,7 +16,7 @@ done
pids=()
# Starting node 0 (seed) in background
cargo run -- --nodes $nodes &
cargo run -- &
pids[${#pids[@]}]=$!
# Waiting for seed to setup
@@ -26,7 +26,7 @@ sleep 2
bound=$(($nodes-2))
for i in $(eval echo "{1..$bound}")
do
cargo run -- --accept 0.0.0.0:1100$i --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$i --external 127.0.0.1:1100$i --id $i --state ~/.config/darkfi/validatord_state_$i --nodes $nodes &
cargo run -- --accept 0.0.0.0:1100$i --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$i --external 127.0.0.1:1100$i --id $i --state ~/.config/darkfi/validatord_state_$i &
pids[${#pids[@]}]=$!
# waiting for node to setup
sleep 2
@@ -45,7 +45,7 @@ function ctrl_c() {
bound=$(($nodes-1))
# Starting last node
cargo run -- --accept 0.0.0.0:1100$bound --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$bound --external 127.0.0.1:1100$bound --id 3 --state ~/.config/darkfi/validatord_state_$bound --nodes $nodes
cargo run -- --accept 0.0.0.0:1100$bound --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$bound --external 127.0.0.1:1100$bound --id $bound --state ~/.config/darkfi/validatord_state_$bound
# Node states are flushed on each node state file at epoch end (every 2 minutes).
# To sugmit a TX, telnet to a node and push the json as per following example:

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -16,7 +16,7 @@ done
pids=()
# Starting node 0 (seed) in background
cargo run -- --nodes $nodes &
cargo run -- &
pids[${#pids[@]}]=$!
# Waiting for seed to setup
@@ -26,7 +26,7 @@ sleep 2
bound=$(($nodes-2))
for i in $(eval echo "{1..$bound}")
do
cargo run -- --accept 0.0.0.0:1100$i --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$i --external 127.0.0.1:1100$i --id $i --state ~/.config/darkfi/validatord_state_$i --nodes $nodes &
cargo run -- --accept 0.0.0.0:1100$i --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$i --external 127.0.0.1:1100$i --id $i --state ~/.config/darkfi/validatord_state_$i &
pids[${#pids[@]}]=$!
# waiting for node to setup
sleep 2
@@ -45,7 +45,7 @@ function ctrl_c() {
bound=$(($nodes-1))
# Starting last node
cargo run -- --accept 0.0.0.0:1100$bound --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$bound --external 127.0.0.1:1100$bound --id 3 --state ~/.config/darkfi/validatord_state_$bound --nodes $nodes
cargo run -- --accept 0.0.0.0:1100$bound --seeds 127.0.0.1:11000 --rpc 127.0.0.1:666$bound --external 127.0.0.1:1100$bound --id $bound --state ~/.config/darkfi/validatord_state_$bound
# Node states are flushed on each node state file at epoch end (every 2 minutes).
# To sugmit a TX, telnet to a node and push the json as per following example:

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -18,7 +18,8 @@
"sm": {
"votes": [],
"notarized": true,
"finalized": true
"finalized": true,
"participants": []
},
"timestamp": 1648383795
}
@@ -27,5 +28,7 @@
},
"node_blockchains": [],
"unconfirmed_txs": [],
"orphan_votes": []
"orphan_votes": [],
"participants": {},
"pending_participants": []
}

View File

@@ -13,6 +13,7 @@ use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
participant::Participant,
state::{State, StatePtr},
tx::Tx,
},
@@ -35,7 +36,8 @@ use darkfi::{
};
use validatord::protocols::{
protocol_proposal::ProtocolProposal, protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal,
protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
};
const CONFIG_FILE: &str = r"validatord_config.toml";
@@ -83,9 +85,6 @@ struct Opt {
#[structopt(long, default_value = "0")]
/// Node ID, used only for testing
id: u64,
#[structopt(long, default_value = "1")]
/// Nodes count, used only for testing
nodes: u64,
#[structopt(short, long, default_value = "0")]
/// How many threads to utilize
threads: usize,
@@ -94,15 +93,26 @@ struct Opt {
verbose: u8,
}
// TODO: 1. Nodes count retrieval.
async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf, nodes_count: u64) {
async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf) {
// Node signals the network that it starts participating
let participant =
Participant::new(state.read().unwrap().id, state.read().unwrap().get_current_epoch());
state.write().unwrap().append_participant(participant.clone());
let result = p2p.broadcast(participant).await;
match result {
Ok(()) => info!("Participation message broadcasted successfuly."),
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
// After initialization node should wait for next epoch
let seconds_until_next_epoch = state.read().unwrap().get_seconds_until_next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
loop {
let result = if state.read().unwrap().check_if_epoch_leader(nodes_count) {
state.write().unwrap().refresh_participants();
let result = if state.write().unwrap().check_if_epoch_leader() {
state.read().unwrap().propose_block()
} else {
Ok(None)
@@ -114,18 +124,14 @@ async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf,
} else {
let unwrapped = proposal.unwrap();
info!("Node is the epoch leader. Proposed block: {:?}", unwrapped);
let vote = state.write().unwrap().receive_proposed_block(
&unwrapped,
nodes_count,
true,
);
let vote = state.write().unwrap().receive_proposed_block(&unwrapped, true);
match vote {
Ok(x) => {
if x.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
state.write().unwrap().receive_vote(&vote, nodes_count as usize);
state.write().unwrap().receive_vote(&vote);
// Broadcasting block
let result = p2p.broadcast(unwrapped).await;
match result {
@@ -149,9 +155,6 @@ async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf,
Err(e) => error!("Broadcast failed. Error: {:?}", e),
}
let seconds_until_next_epoch = state.read().unwrap().get_seconds_until_next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
let result = state.read().unwrap().save(state_path);
match result {
Ok(()) => (),
@@ -159,6 +162,9 @@ async fn proposal_task(p2p: net::P2pPtr, state: StatePtr, state_path: &PathBuf,
error!("State could not be flushed: {:?}", e)
}
};
let seconds_until_next_epoch = state.read().unwrap().get_seconds_until_next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
}
}
@@ -182,7 +188,6 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
// State setup
let state_path = expand_path(&opts.state).unwrap();
let id = opts.id.clone();
let nodes_count = opts.nodes.clone();
let state = State::load_current_state(id, &state_path).unwrap();
// P2P registry setup
@@ -200,23 +205,28 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
// Adding PropotolVote to the registry
let state2 = state.clone();
let nodes_count2 = nodes_count.clone() as usize;
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = state2.clone();
let nodes_count = nodes_count2.clone();
async move { ProtocolVote::init(channel, state, p2p, nodes_count).await }
async move { ProtocolVote::init(channel, state, p2p).await }
})
.await;
// Adding ProtocolProposal to the registry
let state2 = state.clone();
let nodes_count2 = nodes_count.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = state2.clone();
let nodes_count = nodes_count2.clone();
async move { ProtocolProposal::init(channel, state, p2p, nodes_count).await }
async move { ProtocolProposal::init(channel, state, p2p).await }
})
.await;
// Adding ProtocolParticipant to the registry
let state2 = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = state2.clone();
async move { ProtocolParticipant::init(channel, state, p2p).await }
})
.await;
@@ -245,7 +255,7 @@ async fn start(executor: Arc<Executor<'_>>, opts: &Opt) -> Result<()> {
.spawn(async move { listen_and_serve(rpc_server_config, rpc_interface, ex3).await })
.detach();
proposal_task(p2p, state, &state_path, nodes_count).await;
proposal_task(p2p, state, &state_path).await;
Ok(())
}

View File

@@ -1,7 +1,9 @@
pub mod protocol_participant;
pub mod protocol_proposal;
pub mod protocol_tx;
pub mod protocol_vote;
pub use protocol_participant::ProtocolParticipant;
pub use protocol_proposal::ProtocolProposal;
pub use protocol_tx::ProtocolTx;
pub use protocol_vote::ProtocolVote;

View File

@@ -0,0 +1,74 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{participant::Participant, state::StatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
use log::debug;
use std::sync::Arc;
pub struct ProtocolParticipant {
participant_sub: MessageSubscription<Participant>,
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
}
impl ProtocolParticipant {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Participant>().await;
let participant_sub =
channel.subscribe_msg::<Participant>().await.expect("Missing Participant dispatcher!");
Arc::new(Self {
participant_sub,
jobsman: ProtocolJobsManager::new("ParticipantProtocol", channel),
state,
p2p,
})
}
async fn handle_receive_participant(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolParticipant::handle_receive_participant() [START]");
loop {
let participant = self.participant_sub.receive().await?;
debug!(
target: "ircd",
"ProtocolParticipant::handle_receive_participant() received {:?}",
participant
);
if self.state.write().unwrap().append_participant((*participant).clone()) {
let pending_participants = self.state.read().unwrap().pending_participants.clone();
for pending_participant in pending_participants {
self.p2p.broadcast(pending_participant.clone()).await?;
}
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolParticipant {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolParticipant::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman
.clone()
.spawn(self.clone().handle_receive_participant(), executor.clone())
.await;
debug!(target: "ircd", "ProtocolParticipant::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolParticipant"
}
}

View File

@@ -17,16 +17,10 @@ pub struct ProtocolProposal {
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
nodes_count: u64,
}
impl ProtocolProposal {
pub async fn init(
channel: ChannelPtr,
state: StatePtr,
p2p: P2pPtr,
nodes_count: u64,
) -> ProtocolBasePtr {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<BlockProposal>().await;
@@ -38,11 +32,9 @@ impl ProtocolProposal {
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel),
state,
p2p,
nodes_count,
})
}
// TODO: 1. Nodes count retrieval.
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolBlock::handle_receive_proposal() [START]");
loop {
@@ -54,18 +46,14 @@ impl ProtocolProposal {
proposal
);
let proposal_copy = (*proposal).clone();
let vote = self.state.write().unwrap().receive_proposed_block(
&proposal_copy,
self.nodes_count,
false,
);
let vote = self.state.write().unwrap().receive_proposed_block(&proposal_copy, false);
match vote {
Ok(x) => {
if x.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
self.state.write().unwrap().receive_vote(&vote, self.nodes_count as usize);
self.state.write().unwrap().receive_vote(&vote);
// Broadcasting block to rest nodes
self.p2p.broadcast(proposal_copy).await?;
// Broadcasting vote

View File

@@ -17,16 +17,10 @@ pub struct ProtocolVote {
jobsman: ProtocolJobsManagerPtr,
state: StatePtr,
p2p: P2pPtr,
nodes_count: usize,
}
impl ProtocolVote {
pub async fn init(
channel: ChannelPtr,
state: StatePtr,
p2p: P2pPtr,
nodes_count: usize,
) -> ProtocolBasePtr {
pub async fn init(channel: ChannelPtr, state: StatePtr, p2p: P2pPtr) -> ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Vote>().await;
@@ -37,11 +31,9 @@ impl ProtocolVote {
jobsman: ProtocolJobsManager::new("VoteProtocol", channel),
state,
p2p,
nodes_count,
})
}
// TODO: 1. Nodes count retrieval.
async fn handle_receive_vote(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolVote::handle_receive_vote() [START]");
loop {
@@ -53,7 +45,7 @@ impl ProtocolVote {
vote
);
let vote_copy = (*vote).clone();
if self.state.write().unwrap().receive_vote(&vote_copy, self.nodes_count) {
if self.state.write().unwrap().receive_vote(&vote_copy) {
self.p2p.broadcast(vote_copy).await?;
};
}

View File

@@ -43,7 +43,3 @@ state = "~/.config/darkfi/validatord_state_0"
# Node ID, used only for testing
id = 0
# Nodes count, used only for testing
nodes = 1

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use std::hash::{Hash, Hasher};
use super::{metadata::Metadata, tx::Tx};
use super::{metadata::Metadata, participant::Participant, tx::Tx};
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
@@ -24,8 +24,16 @@ pub struct Block {
}
impl Block {
pub fn new(st: String, sl: u64, txs: Vec<Tx>, proof: String, r: String, s: String) -> Block {
Block { st, sl, txs, metadata: Metadata::new(proof, r, s) }
pub fn new(
st: String,
sl: u64,
txs: Vec<Tx>,
proof: String,
r: String,
s: String,
participants: Vec<Participant>,
) -> Block {
Block { st, sl, txs, metadata: Metadata::new(proof, r, s, participants) }
}
}

View File

@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use super::{
participant::Participant,
util::{get_current_time, Timestamp},
vote::Vote,
};
@@ -19,10 +20,10 @@ pub struct Metadata {
}
impl Metadata {
pub fn new(proof: String, r: String, s: String) -> Metadata {
pub fn new(proof: String, r: String, s: String, participants: Vec<Participant>) -> Metadata {
Metadata {
om: OuroborosMetadata::new(proof, r, s),
sm: StreamletMetadata::new(),
sm: StreamletMetadata::new(participants),
timestamp: get_current_time(),
}
}
@@ -54,10 +55,12 @@ pub struct StreamletMetadata {
pub notarized: bool,
/// Block finalization flag
pub finalized: bool,
/// Nodes participated in the voting process
pub participants: Vec<Participant>,
}
impl StreamletMetadata {
pub fn new() -> StreamletMetadata {
StreamletMetadata { votes: Vec::new(), notarized: false, finalized: false }
pub fn new(participants: Vec<Participant>) -> StreamletMetadata {
StreamletMetadata { votes: Vec::new(), notarized: false, finalized: false, participants }
}
}

View File

@@ -1,6 +1,7 @@
pub mod block;
pub mod blockchain;
pub mod metadata;
pub mod participant;
pub mod state;
pub mod tx;
pub mod util;
@@ -9,6 +10,7 @@ pub mod vote;
pub use block::{proposal_eq_block, Block, BlockProposal};
pub use blockchain::Blockchain;
pub use metadata::Metadata;
pub use participant::Participant;
pub use state::State;
pub use tx::Tx;
pub use vote::Vote;

View File

@@ -0,0 +1,33 @@
use serde::{Deserialize, Serialize};
use std::io;
use crate::{
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a tuple of the form (node_id, epoch_joined, last_epoch_voted).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, SerialEncodable, SerialDecodable)]
pub struct Participant {
/// Node id
pub id: u64,
/// Epoch node joined the network
pub joined: u64,
/// Last epoch node voted
pub voted: Option<u64>,
}
impl Participant {
pub fn new(id: u64, joined: u64) -> Participant {
Participant { id, joined, voted: None }
}
}
impl net::Message for Participant {
fn name() -> &'static str {
"participant"
}
}
impl_vec!(Participant);

View File

@@ -2,7 +2,7 @@ use chrono::{NaiveDateTime, Utc};
use log::{debug, error};
use serde::{Deserialize, Serialize};
use std::{
collections::hash_map::DefaultHasher,
collections::{hash_map::DefaultHasher, BTreeMap},
hash::{Hash, Hasher},
path::Path,
sync::{Arc, RwLock},
@@ -22,6 +22,7 @@ use rand::rngs::OsRng;
use super::{
block::{proposal_eq_block, Block, BlockProposal},
blockchain::Blockchain,
participant::Participant,
tx::Tx,
util::{get_current_time, load, save, Timestamp},
vote::Vote,
@@ -46,6 +47,8 @@ pub struct State {
pub node_blockchains: Vec<Blockchain>,
pub unconfirmed_txs: Vec<Tx>,
pub orphan_votes: Vec<Vote>,
pub participants: BTreeMap<u64, Participant>,
pub pending_participants: Vec<Participant>,
}
impl State {
@@ -61,6 +64,8 @@ impl State {
node_blockchains: Vec::new(),
unconfirmed_txs: Vec::new(),
orphan_votes: Vec::new(),
participants: BTreeMap::new(),
pending_participants: Vec::new(),
}
}
@@ -96,16 +101,18 @@ impl State {
/// Node finds epochs leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating in the network.
pub fn get_epoch_leader(&self, nodes_count: u64) -> u64 {
pub fn get_epoch_leader(&mut self) -> u64 {
let epoch = self.get_current_epoch();
let mut hasher = DefaultHasher::new();
epoch.hash(&mut hasher);
hasher.finish() % nodes_count
self.zero_participants_check();
let pos = hasher.finish() % (self.participants.len() as u64);
self.participants.iter().nth(pos as usize).unwrap().1.id
}
/// Node checks if they are the current epoch leader.
pub fn check_if_epoch_leader(&self, nodes_count: u64) -> bool {
let leader = self.get_epoch_leader(nodes_count);
pub fn check_if_epoch_leader(&mut self) -> bool {
let leader = self.get_epoch_leader();
self.id == leader
}
@@ -167,10 +174,9 @@ impl State {
pub fn receive_proposed_block(
&mut self,
proposed_block: &BlockProposal,
nodes_count: u64,
leader: bool,
) -> Result<Option<Vote>> {
assert!(self.get_epoch_leader(nodes_count) == proposed_block.id);
assert!(self.get_epoch_leader() == proposed_block.id);
let mut encoded_block = vec![];
proposed_block.st.encode(&mut encoded_block)?;
proposed_block.sl.encode(&mut encoded_block)?;
@@ -183,6 +189,7 @@ impl State {
/// If block extends the canonical blockchain, a new fork blockchain is created.
/// Node votes on the block, only if it extends the longest notarized chain it has seen.
pub fn vote_block(&mut self, proposal: &BlockProposal, leader: bool) -> Result<Option<Vote>> {
self.zero_participants_check();
let mut block = Block::new(
proposal.st.clone(),
proposal.sl,
@@ -190,6 +197,7 @@ impl State {
String::from("proof"),
String::from("r"),
String::from("s"),
self.participants.values().cloned().collect(),
);
// Add orphan votes
@@ -273,7 +281,7 @@ impl State {
/// nodes unconfirmed transactions list.
/// Finally, we check if the notarization of the block can finalize parent blocks
/// in its blockchain.
pub fn receive_vote(&mut self, vote: &Vote, nodes_count: usize) -> bool {
pub fn receive_vote(&mut self, vote: &Vote) -> bool {
let mut encoded_block = vec![];
let result = vote.block.encode(&mut encoded_block);
match result {
@@ -284,6 +292,10 @@ impl State {
}
};
assert!(&vote.node_public_key.verify(&encoded_block[..], &vote.vote));
let nodes_count = self.participants.len();
self.zero_participants_check();
let vote_block = self.find_block(&vote.block);
if vote_block == None {
debug!("Received vote for unknown block.");
@@ -304,6 +316,11 @@ impl State {
self.check_blockchain_finalization(blockchain_index);
}
// updating participant vote
let mut participant = self.participants.get(&vote.id).unwrap().clone();
participant.voted = Some(vote.block.sl);
self.participants.insert(participant.id, participant);
return true
}
false
@@ -396,6 +413,56 @@ impl State {
}
}
/// Node retreives a new participant and appends it to the pending participants list.
pub fn append_participant(&mut self, participant: Participant) -> bool {
if self.pending_participants.contains(&participant) {
return false
}
self.pending_participants.push(participant);
true
}
/// This prevent the extreme case scenario where network is initialized, but some nodes
/// have not pushed the initial participants in the map.
pub fn zero_participants_check(&mut self) {
if self.participants.len() == 0 {
for participant in &self.pending_participants {
self.participants.insert(participant.id, participant.clone());
}
self.pending_participants = Vec::new();
}
}
/// Node refreshes participants map, to retain only the active ones.
/// Active nodes are considered those who joined or voted on previous epoch.
pub fn refresh_participants(&mut self) {
// adding pending participants
for participant in &self.pending_participants {
self.participants.insert(participant.id, participant.clone());
}
self.pending_participants = Vec::new();
let mut inactive = Vec::new();
let previous_epoch = self.get_current_epoch() - 1;
for (index, participant) in self.participants.clone().iter() {
match participant.voted {
Some(epoch) => {
if epoch < previous_epoch {
inactive.push(index.clone());
}
}
None => {
if participant.joined < previous_epoch {
inactive.push(index.clone());
}
}
}
}
for index in inactive {
self.participants.remove(&index);
}
}
/// Util function to save the current node state to provided file path.
pub fn save(&self, path: &Path) -> Result<()> {
save::<Self>(path, self)
@@ -426,6 +493,7 @@ impl State {
String::from("proof"),
String::from("r"),
String::from("s"),
vec![],
);
genesis_block.metadata.sm.notarized = true;
genesis_block.metadata.sm.finalized = true;