consensus: minor fixes

This commit is contained in:
aggstam
2022-04-27 21:08:06 +03:00
parent 193e9d16c3
commit 341635f12b
4 changed files with 97 additions and 79 deletions

View File

@@ -14,7 +14,8 @@ make BINS=darkfid2
pids=()
# Starting node 0 (seed) in background
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-p2p-accept 127.0.0.1:6000 \
--consensus-p2p-external 127.0.0.1:6000 \
@@ -34,7 +35,8 @@ sleep 20
bound=$(($nodes-2))
for i in $(eval echo "{1..$bound}")
do
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \
@@ -64,7 +66,8 @@ function ctrl_c() {
bound=$(($nodes-1))
# Starting last node
./darkfid2 \
LOG_TARGETS="!sled,!net" ./darkfid2 \
-v \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \

View File

@@ -1,3 +1,4 @@
use async_std::sync::Arc;
use std::{fs::File, io::Write};
use darkfi::{
@@ -357,13 +358,13 @@ async fn main() -> Result<()> {
let nodes = 4;
let genesis_ts = Timestamp(1648383795);
let genesis_data = *TESTNET_GENESIS_HASH_BYTES;
let pass = "changeme";
for i in 0..nodes {
// Initialize or load wallet
let path = format!("../../../tmp/node{:?}/wallet.db", i);
let pass = "changeme";
let wallet = init_wallet(&path, &pass).await?;
Client::new(wallet.clone()).await?;
let address = wallet.get_default_address().await?;
let client = Arc::new(Client::new(wallet).await?);
// Initialize or load sled database
let path = format!("../../../tmp/node{:?}/blockchain/testnet", i);
@@ -372,7 +373,8 @@ async fn main() -> Result<()> {
// Data export
println!("Exporting data for node{:?} - {:?}", i, address.to_string());
let state = ValidatorState::new(&sled_db, address, genesis_ts, genesis_data)?;
let state =
ValidatorState::new(&sled_db, genesis_ts, genesis_data, client, vec![], vec![]).await?;
let info = StateInfo::new(&*state.read().await);
let info_string = format!("{:#?}", info);
let path = format!("node{:?}_testnet_db", i);

View File

@@ -118,8 +118,8 @@ pub struct ValidatorState {
pub client: Arc<Client>,
/// Pending transactions
pub unconfirmed_txs: Vec<Tx>,
/// Participation flag
pub participating: bool,
/// Participating start epoch
pub participating: Option<u64>,
}
impl ValidatorState {
@@ -137,7 +137,7 @@ impl ValidatorState {
let consensus = ConsensusState::new(genesis_ts, genesis_data)?;
let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?;
let unconfirmed_txs = vec![];
let participating = false;
let participating = None;
let address = client.wallet.get_default_address().await?;
let state_machine = Arc::new(Mutex::new(State {
@@ -219,6 +219,12 @@ impl ValidatorState {
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Set participating epoch to next.
pub fn set_participating(&mut self) -> Result<()> {
self.participating = Some(self.current_epoch() + 1);
Ok(())
}
/// Find epoch leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating
/// in the network.
@@ -326,8 +332,13 @@ impl ValidatorState {
/// and proceed with voting on it.
pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
// Node hasn't started participating
if !self.participating {
return Ok(None)
match self.participating {
Some(start) => {
if self.current_epoch() < start {
return Ok(None)
}
}
None => return Ok(None),
}
// Node refreshes participants records
@@ -480,9 +491,15 @@ impl ValidatorState {
/// Finally, we check if the notarization of the proposal can finalize
/// parent proposals in its chain.
pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option<Vec<BlockInfo>>)> {
let current_epoch = self.current_epoch();
// Node hasn't started participating
if !self.participating {
return Ok((false, None))
match self.participating {
Some(start) => {
if current_epoch < start {
return Ok((false, None))
}
}
None => return Ok((false, None)),
}
let mut encoded_proposal = vec![];
@@ -508,10 +525,23 @@ impl ValidatorState {
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.address) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
let mut participant = participant.clone();
if current_epoch <= participant.joined {
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string());
return Ok((false, None))
}
// Updating participant vote
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.address, participant);
}
None => {
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string());
@@ -559,22 +589,6 @@ impl ValidatorState {
}
}
// Updating participant vote
let mut participant = match self.consensus.participants.get(&vote.address) {
Some(p) => p.clone(),
None => Participant::new(vote.address, vote.sl),
};
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.address, participant);
Ok((true, Some(to_broadcast)))
}
@@ -762,8 +776,9 @@ impl ValidatorState {
}
}
None => {
if participant.joined < previous_epoch &&
participant.joined < previous_from_last_epoch
if (previous_epoch == last_epoch && participant.joined < previous_epoch) ||
(previous_epoch != last_epoch &&
participant.joined < previous_from_last_epoch)
{
warn!(
"refresh_participants(): Inactive participant: {:?} (joined {:?}, voted {:?})",

View File

@@ -46,15 +46,17 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
Err(e) => error!("Failed broadcasting consensus participation: {}", e),
}
// After initialization node waits for next epoch to start participating
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
// Note modifies its participating flag to true.
state.write().await.participating = true;
// Node modifies its participating epoch to next.
match state.write().await.set_participating() {
Ok(()) => info!("Node will start participating at next epoch!"),
Err(e) => error!("Failed to set participation epoch: {}", e),
}
loop {
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
// Node refreshes participants records
match state.write().await.refresh_participants() {
Ok(()) => debug!("Participants refreshed successfully."),
@@ -73,57 +75,53 @@ pub async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
Ok(proposal) => {
if proposal.is_none() {
info!(target: "consensus", "Node is not the epoch leader. Sleeping till next epoch...");
} else {
// Leader creates a vote for the proposal and broadcasts them both
let proposal = proposal.unwrap();
info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal);
let vote = state.write().await.receive_proposal(&proposal);
match vote {
Ok(v) => {
if v.is_none() {
debug!("proposal_task(): Node did not vote for the proposed block");
} else {
let vote = v.unwrap();
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!(target: "consensus", "Vote saved successfully."),
Err(e) => {
error!(target: "consensus", "Vote save failed: {}", e)
}
continue
}
// Leader creates a vote for the proposal and broadcasts them both
let proposal = proposal.unwrap();
info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal);
let vote = state.write().await.receive_proposal(&proposal);
match vote {
Ok(v) => {
if v.is_none() {
debug!("proposal_task(): Node did not vote for the proposed block");
} else {
let vote = v.unwrap();
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!(target: "consensus", "Vote saved successfully."),
Err(e) => {
error!(target: "consensus", "Vote save failed: {}", e)
}
}
// Broadcast block
let result = p2p.broadcast(proposal).await;
match result {
Ok(()) => {
info!(target: "consensus", "Proposal broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting proposal: {}", e)
}
// Broadcast block
let result = p2p.broadcast(proposal).await;
match result {
Ok(()) => {
info!(target: "consensus", "Proposal broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting proposal: {}", e)
}
}
// Broadcast leader vote
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => {
info!(target: "consensus", "Leader vote broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting leader vote: {}", e)
}
// Broadcast leader vote
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => {
info!(target: "consensus", "Leader vote broadcasted successfully.")
}
Err(e) => {
error!(target: "consensus", "Failed broadcasting leader vote: {}", e)
}
}
}
Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e),
}
Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e),
}
}
Err(e) => error!("Block proposal failed: {}", e),
}
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
}
}