diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs index 430c65d6a..3a7989f1e 100644 --- a/src/consensus/task/proposal.rs +++ b/src/consensus/task/proposal.rs @@ -4,63 +4,66 @@ use log::{debug, error, info}; use super::consensus_sync_task; use crate::{ - consensus::{state::ValidatorStatePtr, Participant}, - net, - util::async_util::sleep, + consensus::{Participant, ValidatorStatePtr}, + net::P2pPtr, + util::sleep, }; /// async task used for participating in the consensus protocol -pub async fn proposal_task(p2p: net::P2pPtr, sync_p2p: net::P2pPtr, state: ValidatorStatePtr) { - // Node waits just before the current or next epoch end, - // so it can start syncing latest state. +pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: ValidatorStatePtr) { + // Node waits just before the current or next epoch end, so it can + // start syncing latest state. let mut seconds_until_next_epoch = state.read().await.next_epoch_start(); let one_sec = Duration::new(1, 0); + loop { if seconds_until_next_epoch > one_sec { seconds_until_next_epoch -= one_sec; break } - info!("Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); + + info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch); sleep(seconds_until_next_epoch.as_secs()).await; seconds_until_next_epoch = state.read().await.next_epoch_start(); } - info!("Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); + + info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch); sleep(seconds_until_next_epoch.as_secs()).await; // Node syncs its consensus state - match consensus_sync_task(p2p.clone(), state.clone()).await { - Ok(()) => {} - Err(e) => { - error!("Failed syncing consensus state: {}. Quitting consensus.", e); - return - } - } + if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await { + error!("Failed syncing consensus state: {}. Quitting consensus.", e); + // TODO: Perhaps notify over a channel in order to + // stop consensus p2p protocols. + return + }; - // Node signals the network that it will start participating - let participant = - Participant::new(state.read().await.address, state.read().await.current_epoch()); + // Node signals the network that iw till start participating + let address = state.read().await.address; + let cur_epoch = state.read().await.current_epoch(); + let participant = Participant::new(address, cur_epoch); state.write().await.append_participant(participant.clone()); - match p2p.broadcast(participant).await { - Ok(()) => info!("Consensus participation message broadcasted successfully."), + match consensus_p2p.broadcast(participant).await { + Ok(()) => info!("consensus: Participation message broadcasted successfully."), Err(e) => error!("Failed broadcasting consensus participation: {}", e), } // Node modifies its participating epoch to next. match state.write().await.set_participating() { - Ok(()) => info!("Node will start participating at next epoch!"), + Ok(()) => info!("consensus: Node will start participating in the next epoch"), Err(e) => error!("Failed to set participation epoch: {}", e), } loop { - let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); - info!(target: "consensus", "Waiting for next epoch ({:?} sec)...", seconds_until_next_epoch); - sleep(seconds_until_next_epoch).await; + let seconds_next_epoch = state.read().await.next_epoch_start().as_secs(); + info!("consensus: Waiting for next epoch ({} sec)", seconds_next_epoch); + sleep(seconds_next_epoch).await; // Node refreshes participants records match state.write().await.refresh_participants() { Ok(()) => debug!("Participants refreshed successfully."), - Err(e) => error!("Failed refreshing participants: {}", e), + Err(e) => error!("Failed refreshing consensus participants: {}", e), } // Node checks if it's the epoch leader to generate a new proposal @@ -71,79 +74,69 @@ pub async fn proposal_task(p2p: net::P2pPtr, sync_p2p: net::P2pPtr, state: Valid Ok(None) }; - match result { - Ok(proposal) => { - if proposal.is_none() { - info!(target: "consensus", "Node is not the epoch leader. Sleeping till next epoch..."); + let proposal = match result { + Ok(prop) => { + if prop.is_none() { + info!("consensus: Node is not the epoch lead"); continue } - // Leader creates a vote for the proposal and broadcasts them both - let proposal = proposal.unwrap(); - info!(target: "consensus", "Node is the epoch leader: Proposed block: {:?}", proposal); - let vote = state.write().await.receive_proposal(&proposal); - match vote { - Ok(v) => { - if v.is_none() { - debug!("proposal_task(): Node did not vote for the proposed block"); - } else { - let vote = v.unwrap(); - let result = state.write().await.receive_vote(&vote).await; - match result { - Ok((_, to_broadcast)) => { - info!(target: "consensus", "Vote saved successfully."); - // Broadcast finalized blocks info, if any - match to_broadcast { - Some(blocks) => { - debug!("handle_receive_vote(): Broadcasting finalized blocks"); - for info in blocks { - let result = sync_p2p.broadcast(info).await; - match result { - Ok(()) => { - info!(target: "consensus", "Finalized block broadcasted successfully.") - } - Err(e) => { - error!(target: "consensus", "Failed broadcasting finalized block: {}", e) - } - } - } - } - None => { - debug!("handle_receive_vote(): No finalized blocks to broadcast"); - } - } - } - Err(e) => { - error!(target: "consensus", "Vote save failed: {}", e) - } - } + prop.unwrap() + } + Err(e) => { + error!("consensus: Block proposal failed: {}", e); + continue + } + }; - // Broadcast block - let result = p2p.broadcast(proposal).await; - match result { - Ok(()) => { - info!(target: "consensus", "Proposal broadcasted successfully.") - } - Err(e) => { - error!(target: "consensus", "Failed broadcasting proposal: {}", e) - } - } + info!("consensus: Node is the epoch leader: Proposed block: {:?}", proposal); + let vote = state.write().await.receive_proposal(&proposal); + let vote = match vote { + Ok(v) => { + if v.is_none() { + debug!("proposal_task(): Node did not vote for the proposed block"); + continue + } + v.unwrap() + } + Err(e) => { + error!("consensus: Failed processing proposal: {}", e); + continue + } + }; - // Broadcast leader vote - let result = p2p.broadcast(vote).await; - match result { - Ok(()) => { - info!(target: "consensus", "Leader vote broadcasted successfully.") - } - Err(e) => { - error!(target: "consensus", "Failed broadcasting leader vote: {}", e) - } - } + let result = state.write().await.receive_vote(&vote).await; + match result { + Ok((_, to_broadcast)) => { + info!("consensus: Vote saved successfully"); + // Broadcast finalized blocks info, if any: + if let Some(blocks) = to_broadcast { + info!("consensus: Broadcasting finalized blocks"); + for info in blocks { + match sync_p2p.broadcast(info).await { + Ok(()) => info!("consensus: Broadcasted block"), + Err(e) => error!("consensus: Failed broadcasting block: {}", e), } } - Err(e) => error!(target: "consensus", "Failed processing proposal: {}", e), + } else { + info!("consensus: No finalized blocks to broadcast"); } } - Err(e) => error!("Block proposal failed: {}", e), + Err(e) => { + error!("consensus: Vote save failed: {}", e); + // TODO: Is this fallthrough ok? + } + } + + // Broadcast block to other consensus nodes + match consensus_p2p.broadcast(proposal).await { + Ok(()) => info!("consensus: Proposal broadcasted successfully"), + Err(e) => error!("consensus: Failed broadcasting proposal: {}", e), + } + + // Broadcast leader vote + match consensus_p2p.broadcast(vote).await { + Ok(()) => info!("consensus: Leader vote broadcasted successfully"), + Err(e) => error!("consensus: Failed broadcasting leader vote: {}", e), } } }