mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
consensus:proposal_task(): redesign completed, consensus minor fixes
This commit is contained in:
@@ -125,12 +125,13 @@ impl ProtocolSync {
|
||||
|
||||
// Check if node started participating in consensus.
|
||||
// Consensus-mode enabled nodes have already performed these steps,
|
||||
// during proposal finalization.
|
||||
// during proposal finalization. They still listen to this sub,
|
||||
// in case they go out of sync and become a none-consensus node.
|
||||
if self.consensus_mode && self.state.read().await.consensus.participating.is_some() {
|
||||
debug!(
|
||||
"ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..."
|
||||
);
|
||||
return Ok(())
|
||||
continue
|
||||
}
|
||||
|
||||
info!("ProtocolSync::handle_receive_block(): Received block: {}", info.blockhash());
|
||||
@@ -218,12 +219,13 @@ impl ProtocolSync {
|
||||
|
||||
// Check if node started participating in consensus.
|
||||
// Consensus-mode enabled nodes have already performed these steps,
|
||||
// during proposal finalization.
|
||||
// during proposal finalization. They still listen to this sub,
|
||||
// in case they go out of sync and become a none-consensus node.
|
||||
if self.consensus_mode && self.state.read().await.consensus.participating.is_some() {
|
||||
debug!(
|
||||
"ProtocolSync::handle_receive_slot_checkpoint(): node runs in consensus mode, skipping..."
|
||||
);
|
||||
return Ok(())
|
||||
continue
|
||||
}
|
||||
|
||||
info!(
|
||||
|
||||
@@ -122,8 +122,8 @@ impl ProtocolSyncConsensus {
|
||||
);
|
||||
|
||||
// Extra validations can be added here.
|
||||
let slot_checkpoints = !self.state.read().await.consensus.slot_checkpoints.is_empty();
|
||||
let response = ConsensusSlotCheckpointsResponse { slot_checkpoints };
|
||||
let is_empty = self.state.read().await.consensus.slot_checkpoints.is_empty();
|
||||
let response = ConsensusSlotCheckpointsResponse { is_empty };
|
||||
if let Err(e) = self.channel.send(response).await {
|
||||
error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e);
|
||||
};
|
||||
|
||||
@@ -48,6 +48,8 @@ pub struct ConsensusState {
|
||||
pub genesis_block: blake3::Hash,
|
||||
/// Participating start slot
|
||||
pub participating: Option<u64>,
|
||||
/// Node is able to propose proposals
|
||||
pub proposing: bool,
|
||||
/// Last slot node check for finalization
|
||||
pub checked_finalization: u64,
|
||||
/// Slots offset since genesis,
|
||||
@@ -83,6 +85,7 @@ impl ConsensusState {
|
||||
genesis_ts,
|
||||
genesis_block,
|
||||
participating: None,
|
||||
proposing: false,
|
||||
checked_finalization: 0,
|
||||
offset: None,
|
||||
forks: vec![],
|
||||
@@ -448,7 +451,8 @@ impl ConsensusState {
|
||||
return constants::MAX_F.clone()
|
||||
}
|
||||
let hist_len = self.leaders_history.len();
|
||||
if self.leaders_history[hist_len - 1] == 0 &&
|
||||
if hist_len > 3 &&
|
||||
self.leaders_history[hist_len - 1] == 0 &&
|
||||
self.leaders_history[hist_len - 2] == 0 &&
|
||||
self.leaders_history[hist_len - 3] == 0 &&
|
||||
i == constants::FLOAT10_ZERO.clone()
|
||||
@@ -464,6 +468,10 @@ impl ConsensusState {
|
||||
/// * 'sigma1', 'sigma2': slot sigmas
|
||||
/// Returns: (check: bool, idx: usize) where idx is the winning coin's index
|
||||
pub fn is_slot_leader(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) -> (bool, usize) {
|
||||
// Check if node can produce proposals
|
||||
if !self.proposing {
|
||||
return (false, 0)
|
||||
}
|
||||
let competing_coins = &self.coins.clone();
|
||||
|
||||
let mut won = false;
|
||||
@@ -589,7 +597,7 @@ impl ConsensusState {
|
||||
|
||||
/// Auxillary function to set nodes leaders count history to the largest fork sequence
|
||||
/// of leaders, by using provided index.
|
||||
pub fn set_leader_history(&mut self, index: i64) {
|
||||
pub fn set_leader_history(&mut self, index: i64, current_slot: u64) {
|
||||
// Check if we found longest fork to extract sequence from
|
||||
match index {
|
||||
-1 => {
|
||||
@@ -598,7 +606,7 @@ impl ConsensusState {
|
||||
_ => {
|
||||
info!("set_leader_history(): Checking last proposal of fork: {}", index);
|
||||
let last_proposal = &self.forks[index as usize].sequence.last().unwrap().proposal;
|
||||
if last_proposal.block.header.slot == self.current_slot() {
|
||||
if last_proposal.block.header.slot == current_slot {
|
||||
// Replacing our last history element with the leaders one
|
||||
self.leaders_history.pop();
|
||||
self.leaders_history.push(last_proposal.block.lead_info.leaders);
|
||||
@@ -654,6 +662,7 @@ impl ConsensusState {
|
||||
/// Auxiliary structure to reset consensus state for a resync
|
||||
pub fn reset(&mut self) {
|
||||
self.participating = None;
|
||||
self.proposing = false;
|
||||
self.forks = vec![];
|
||||
self.slot_checkpoints = vec![];
|
||||
self.leaders_history = vec![0];
|
||||
@@ -708,7 +717,7 @@ impl net::Message for ConsensusSlotCheckpointsRequest {
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
||||
pub struct ConsensusSlotCheckpointsResponse {
|
||||
/// Node has hot/live slot checkpoints
|
||||
pub slot_checkpoints: bool,
|
||||
pub is_empty: bool,
|
||||
}
|
||||
|
||||
impl net::Message for ConsensusSlotCheckpointsResponse {
|
||||
|
||||
@@ -31,57 +31,9 @@ use crate::{
|
||||
};
|
||||
|
||||
/// async task used for consensus state syncing.
|
||||
pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> {
|
||||
info!("Starting consensus state sync...");
|
||||
let channels_map = p2p.channels().lock().await;
|
||||
let values = channels_map.values();
|
||||
// Using len here because is_empty() uses unstable library feature
|
||||
// called 'exact_size_is_empty'.
|
||||
if values.len() != 0 {
|
||||
// Node iterates the channel peers to ask for their consensus state
|
||||
for channel in values {
|
||||
// Communication setup
|
||||
let msg_subsystem = channel.get_message_subsystem();
|
||||
msg_subsystem.add_dispatch::<ConsensusResponse>().await;
|
||||
let response_sub = channel.subscribe_msg::<ConsensusResponse>().await?;
|
||||
|
||||
// Node creates a `ConsensusRequest` and sends it
|
||||
let request = ConsensusRequest {};
|
||||
channel.send(request).await?;
|
||||
|
||||
// Node verifies response came from a participating node.
|
||||
// Extra validations can be added here.
|
||||
let response = response_sub.receive().await?;
|
||||
if response.nullifiers.is_empty() {
|
||||
warn!("Retrieved consensus state from a new node, retrying...");
|
||||
continue
|
||||
}
|
||||
|
||||
// Node stores response data.
|
||||
let mut lock = state.write().await;
|
||||
lock.consensus.offset = response.offset;
|
||||
let mut forks = vec![];
|
||||
for fork in &response.forks {
|
||||
forks.push(fork.clone().into());
|
||||
}
|
||||
lock.consensus.forks = forks;
|
||||
lock.unconfirmed_txs = response.unconfirmed_txs.clone();
|
||||
lock.consensus.slot_checkpoints = response.slot_checkpoints.clone();
|
||||
lock.consensus.leaders_history = response.leaders_history.clone();
|
||||
lock.consensus.nullifiers = response.nullifiers.clone();
|
||||
|
||||
break
|
||||
}
|
||||
} else {
|
||||
warn!("Node is not connected to other nodes");
|
||||
}
|
||||
|
||||
info!("Consensus state synced!");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// async task used for consensus state syncing.
|
||||
pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> {
|
||||
/// Returns flag if node is not connected to other peers or consensus hasn't started,
|
||||
/// so it can immediately start proposing proposals.
|
||||
pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<bool> {
|
||||
info!("Starting consensus state sync...");
|
||||
// Loop through connected channels
|
||||
let channels_map = p2p.channels().lock().await;
|
||||
@@ -91,7 +43,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu
|
||||
if values.len() == 0 {
|
||||
warn!("Node is not connected to other nodes");
|
||||
info!("Consensus state synced!");
|
||||
return Ok(())
|
||||
return Ok(true)
|
||||
}
|
||||
|
||||
// Node iterates the channel peers to check if at least on peer has seen slot checkpoints
|
||||
@@ -108,7 +60,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu
|
||||
|
||||
// Node checks response
|
||||
let response = response_sub.receive().await?;
|
||||
if !response.slot_checkpoints {
|
||||
if response.is_empty {
|
||||
warn!("Node has not seen any slot checkpoints, retrying...");
|
||||
continue
|
||||
}
|
||||
@@ -126,7 +78,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu
|
||||
if peer.is_none() {
|
||||
warn!("No node that has seen any slot checkpoints was found.");
|
||||
info!("Consensus state synced!");
|
||||
return Ok(())
|
||||
return Ok(true)
|
||||
}
|
||||
let peer = peer.unwrap();
|
||||
|
||||
@@ -166,5 +118,5 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu
|
||||
lock.consensus.nullifiers = response.nullifiers.clone();
|
||||
|
||||
info!("Consensus state synced!");
|
||||
Ok(())
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
@@ -34,155 +34,12 @@ pub async fn proposal_task(
|
||||
sync_p2p: P2pPtr,
|
||||
state: ValidatorStatePtr,
|
||||
ex: Arc<smol::Executor<'_>>,
|
||||
) {
|
||||
// Node waits just before the current or next epoch last finalization syncing period, so it can
|
||||
// start syncing latest state.
|
||||
let mut seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
|
||||
let sync_offset = Duration::new(constants::FINAL_SYNC_DUR + 1, 0);
|
||||
|
||||
loop {
|
||||
if seconds_until_next_epoch > sync_offset {
|
||||
seconds_until_next_epoch -= sync_offset;
|
||||
break
|
||||
}
|
||||
|
||||
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
|
||||
sleep(seconds_until_next_epoch.as_secs()).await;
|
||||
seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
|
||||
}
|
||||
|
||||
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
|
||||
sleep(seconds_until_next_epoch.as_secs()).await;
|
||||
|
||||
// Node syncs its consensus state
|
||||
if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await {
|
||||
error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
|
||||
// TODO: Perhaps notify over a channel in order to
|
||||
// stop consensus p2p protocols.
|
||||
return
|
||||
};
|
||||
|
||||
// Node modifies its participating slot to next.
|
||||
match state.write().await.consensus.set_participating() {
|
||||
Ok(()) => info!("consensus: Node will start participating in the next slot"),
|
||||
Err(e) => error!("consensus: Failed to set participation slot: {}", e),
|
||||
}
|
||||
|
||||
loop {
|
||||
// Node sleeps until finalization sync period start (2 seconds before next slot)
|
||||
let seconds_sync_period = (state.read().await.consensus.next_n_slot_start(1) -
|
||||
Duration::new(constants::FINAL_SYNC_DUR, 0))
|
||||
.as_secs();
|
||||
info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period);
|
||||
sleep(seconds_sync_period).await;
|
||||
|
||||
// Check if any forks can be finalized
|
||||
match state.write().await.chain_finalization().await {
|
||||
Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => {
|
||||
// Broadcasting in background
|
||||
if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() {
|
||||
let _sync_p2p = sync_p2p.clone();
|
||||
ex.spawn(async move {
|
||||
// Broadcast finalized blocks info, if any:
|
||||
info!("consensus: Broadcasting finalized blocks");
|
||||
for info in to_broadcast_block {
|
||||
match _sync_p2p.broadcast(info).await {
|
||||
Ok(()) => info!("consensus: Broadcasted block"),
|
||||
Err(e) => error!("consensus: Failed broadcasting block: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast finalized slot checkpoints, if any:
|
||||
info!("consensus: Broadcasting finalized slot checkpoints");
|
||||
for slot_checkpoint in to_broadcast_slot_checkpoints {
|
||||
match _sync_p2p.broadcast(slot_checkpoint).await {
|
||||
Ok(()) => info!("consensus: Broadcasted slot_checkpoint"),
|
||||
Err(e) => {
|
||||
error!("consensus: Failed broadcasting slot_checkpoint: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
} else {
|
||||
info!("consensus: No finalized blocks or slot checkpoints to broadcast");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Finalization check failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Node sleeps until next slot
|
||||
let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs();
|
||||
info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot);
|
||||
sleep(seconds_next_slot).await;
|
||||
|
||||
// Retrieve slot sigmas
|
||||
let (sigma1, sigma2) = state.write().await.consensus.sigmas();
|
||||
// Node checks if epoch has changed, to generate new epoch coins
|
||||
let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await;
|
||||
match epoch_changed {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
info!("consensus: New epoch started: {}", state.read().await.consensus.epoch);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Epoch check failed: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
// Node checks if it's the slot leader to generate a new proposal
|
||||
// for that slot.
|
||||
let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2);
|
||||
let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) };
|
||||
let (proposal, coin) = match result {
|
||||
Ok(pair) => {
|
||||
if pair.is_none() {
|
||||
info!("consensus: Node is not the slot lead");
|
||||
continue
|
||||
}
|
||||
pair.unwrap()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal failed: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
// Node stores the proposal and broadcast to rest nodes
|
||||
info!("consensus: Node is the slot leader: Proposed block: {}", proposal);
|
||||
debug!("consensus: Full proposal: {:?}", proposal);
|
||||
match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await {
|
||||
Ok(()) => {
|
||||
info!("consensus: Block proposal saved successfully");
|
||||
// Broadcast proposal to other consensus nodes
|
||||
match consensus_p2p.broadcast(proposal).await {
|
||||
Ok(()) => info!("consensus: Proposal broadcasted successfully"),
|
||||
Err(e) => error!("consensus: Failed broadcasting proposal: {}", e),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal save failed: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// async task used for participating in the consensus protocol
|
||||
pub async fn proposal_task2(
|
||||
consensus_p2p: P2pPtr,
|
||||
sync_p2p: P2pPtr,
|
||||
state: ValidatorStatePtr,
|
||||
ex: Arc<smol::Executor<'_>>,
|
||||
) {
|
||||
let mut retries = 0;
|
||||
// Sync loop
|
||||
loop {
|
||||
// Resetting consensus state, so node can still follow the finalized blocks by
|
||||
// the sync p2p network/protocols
|
||||
// TODO: verify that if a consensus node stops participating will receive finalized blocks
|
||||
state.write().await.consensus.reset();
|
||||
|
||||
// Checking sync retries
|
||||
@@ -193,11 +50,21 @@ pub async fn proposal_task2(
|
||||
}
|
||||
|
||||
// Node syncs its consensus state
|
||||
if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await {
|
||||
error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
|
||||
// TODO: Perhaps notify over a channel in order to
|
||||
// stop consensus p2p protocols.
|
||||
return
|
||||
match consensus_sync_task(consensus_p2p.clone(), state.clone()).await {
|
||||
Ok(p) => {
|
||||
// Check if node is not connected to other nodes and can
|
||||
// start proposing immediately.
|
||||
if p {
|
||||
info!("consensus: Node can start proposing!");
|
||||
state.write().await.consensus.proposing = p;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
|
||||
// TODO: Perhaps notify over a channel in order to
|
||||
// stop consensus p2p protocols.
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
// Node modifies its participating slot to next.
|
||||
@@ -230,135 +97,174 @@ async fn consensus_loop(
|
||||
state: ValidatorStatePtr,
|
||||
ex: Arc<smol::Executor<'_>>,
|
||||
) {
|
||||
// Note: when a node can start produce proposals is only enforced in code,
|
||||
// where we verify if the hardware can keep up with the consensus, by
|
||||
// counting how many consecutive slots node successfully listend and process
|
||||
// everything. Later, this will be enforced via contract, where it will be explicit
|
||||
// when a node can produce proposals, and after which slot they can be considered as valid.
|
||||
let mut listened_slots = 0;
|
||||
let mut changed_status = false;
|
||||
loop {
|
||||
// TODO: Change order, since node extis consensus sync right before next slot
|
||||
// Node sleeps until finalization sync period starts
|
||||
let next_slot_start = state.read().await.consensus.next_n_slot_start(1);
|
||||
let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR {
|
||||
(next_slot_start - Duration::new(constants::FINAL_SYNC_DUR, 0)).as_secs()
|
||||
// Check if node can start proposing.
|
||||
// This code ensures that we only change the status once
|
||||
// and listened_slots doesn't increment further.
|
||||
if listened_slots > constants::EPOCH_LENGTH {
|
||||
if !changed_status {
|
||||
info!("consensus: Node can start proposing!");
|
||||
state.write().await.consensus.proposing = true;
|
||||
changed_status = true;
|
||||
}
|
||||
} else {
|
||||
next_slot_start.as_secs()
|
||||
};
|
||||
info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period);
|
||||
sleep(seconds_sync_period).await;
|
||||
|
||||
// Keep a record of slot to verify if next slot got skipped during processing
|
||||
let completed_slot = state.read().await.consensus.current_slot();
|
||||
|
||||
// Check if any forks can be finalized
|
||||
match state.write().await.chain_finalization().await {
|
||||
Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => {
|
||||
// Broadcasting in background
|
||||
if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() {
|
||||
let _sync_p2p = sync_p2p.clone();
|
||||
ex.spawn(async move {
|
||||
// Broadcast finalized blocks info, if any:
|
||||
info!("consensus: Broadcasting finalized blocks");
|
||||
for info in to_broadcast_block {
|
||||
match _sync_p2p.broadcast(info).await {
|
||||
Ok(()) => info!("consensus: Broadcasted block"),
|
||||
Err(e) => error!("consensus: Failed broadcasting block: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast finalized slot checkpoints, if any:
|
||||
info!("consensus: Broadcasting finalized slot checkpoints");
|
||||
for slot_checkpoint in to_broadcast_slot_checkpoints {
|
||||
match _sync_p2p.broadcast(slot_checkpoint).await {
|
||||
Ok(()) => info!("consensus: Broadcasted slot_checkpoint"),
|
||||
Err(e) => {
|
||||
error!("consensus: Failed broadcasting slot_checkpoint: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
} else {
|
||||
info!("consensus: No finalized blocks or slot checkpoints to broadcast");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Finalization check failed: {}", e);
|
||||
}
|
||||
listened_slots += 1;
|
||||
}
|
||||
|
||||
// Verify node didn't skip next slot
|
||||
let current_slot = state.read().await.consensus.current_slot();
|
||||
if completed_slot != current_slot {
|
||||
// Node waits and execute consensus protocol propose period.
|
||||
if propose_period(consensus_p2p.clone(), state.clone()).await {
|
||||
// Node needs to resync
|
||||
warn!(
|
||||
"consensus: Node missed slot {} due to finalizated blocks processing, resyncing...",
|
||||
current_slot
|
||||
"consensus: Node missed slot {} due to proposal processing, resyncing...",
|
||||
state.read().await.consensus.current_slot()
|
||||
);
|
||||
break
|
||||
}
|
||||
|
||||
// Node sleeps until next slot
|
||||
let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs();
|
||||
info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot);
|
||||
sleep(seconds_next_slot).await;
|
||||
|
||||
// Keep a record of slot to verify if next slot got skipped during processing
|
||||
let processing_slot = state.read().await.consensus.current_slot();
|
||||
|
||||
// Retrieve slot sigmas
|
||||
let (sigma1, sigma2) = state.write().await.consensus.sigmas();
|
||||
// Node checks if epoch has changed and generate slot checkpoint
|
||||
let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await;
|
||||
match epoch_changed {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
info!("consensus: New epoch started: {}", state.read().await.consensus.epoch);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Epoch check failed: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
// Node checks if it's the slot leader to generate a new proposal
|
||||
// for that slot.
|
||||
let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2);
|
||||
let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) };
|
||||
let (proposal, coin) = match result {
|
||||
Ok(pair) => {
|
||||
if pair.is_none() {
|
||||
info!("consensus: Node is not the slot lead");
|
||||
continue
|
||||
}
|
||||
pair.unwrap()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal failed: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
// Node stores the proposal and broadcast to rest nodes
|
||||
info!("consensus: Node is the slot leader: Proposed block: {}", proposal);
|
||||
debug!("consensus: Full proposal: {:?}", proposal);
|
||||
match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await {
|
||||
Ok(()) => {
|
||||
info!("consensus: Block proposal saved successfully");
|
||||
// Broadcast proposal to other consensus nodes
|
||||
match consensus_p2p.broadcast(proposal).await {
|
||||
Ok(()) => info!("consensus: Proposal broadcasted successfully"),
|
||||
Err(e) => error!("consensus: Failed broadcasting proposal: {}", e),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal save failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify node didn't skip next slot
|
||||
let current_slot = state.read().await.consensus.current_slot();
|
||||
if processing_slot != current_slot {
|
||||
// Node waits and execute consensus protocol finalization period.
|
||||
if finalization_period(sync_p2p.clone(), state.clone(), ex.clone()).await {
|
||||
// Node needs to resync
|
||||
warn!(
|
||||
"consensus: Node missed slot {} due to proposal processing, resyncing...",
|
||||
current_slot
|
||||
"consensus: Node missed slot {} due to finalizated blocks processing, resyncing...",
|
||||
state.read().await.consensus.current_slot()
|
||||
);
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// async function to wait and execute consensus protocol propose period.
|
||||
/// Propose period consists of 2 parts:
|
||||
/// - Generate slot sigmas and checkpoint
|
||||
/// - Check if slot leader to generate and broadcast proposal
|
||||
/// Returns flag in case node needs to resync.
|
||||
async fn propose_period(consensus_p2p: P2pPtr, state: ValidatorStatePtr) -> bool {
|
||||
// Node sleeps until next slot
|
||||
let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs();
|
||||
info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot);
|
||||
sleep(seconds_next_slot).await;
|
||||
|
||||
// Keep a record of slot to verify if next slot got skipped during processing
|
||||
let processing_slot = state.read().await.consensus.current_slot();
|
||||
|
||||
// Retrieve slot sigmas
|
||||
let (sigma1, sigma2) = state.write().await.consensus.sigmas();
|
||||
// Node checks if epoch has changed and generate slot checkpoint
|
||||
let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await;
|
||||
match epoch_changed {
|
||||
Ok(changed) => {
|
||||
if changed {
|
||||
info!("consensus: New epoch started: {}", state.read().await.consensus.epoch);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Epoch check failed: {}", e);
|
||||
return false
|
||||
}
|
||||
};
|
||||
|
||||
// Node checks if it's the slot leader to generate a new proposal
|
||||
// for that slot.
|
||||
let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2);
|
||||
let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) };
|
||||
let (proposal, coin) = match result {
|
||||
Ok(pair) => {
|
||||
if pair.is_none() {
|
||||
info!("consensus: Node is not the slot lead");
|
||||
return false
|
||||
}
|
||||
pair.unwrap()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal failed: {}", e);
|
||||
return false
|
||||
}
|
||||
};
|
||||
|
||||
// Node stores the proposal and broadcast to rest nodes
|
||||
info!("consensus: Node is the slot leader: Proposed block: {}", proposal);
|
||||
debug!("consensus: Full proposal: {:?}", proposal);
|
||||
match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await {
|
||||
Ok(()) => {
|
||||
info!("consensus: Block proposal saved successfully");
|
||||
// Broadcast proposal to other consensus nodes
|
||||
match consensus_p2p.broadcast(proposal).await {
|
||||
Ok(()) => info!("consensus: Proposal broadcasted successfully"),
|
||||
Err(e) => error!("consensus: Failed broadcasting proposal: {}", e),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Block proposal save failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify node didn't skip next slot
|
||||
processing_slot != state.read().await.consensus.current_slot()
|
||||
}
|
||||
|
||||
/// async function to wait and execute consensus protocol finalization period.
|
||||
/// Returns flag in case node needs to resync.
|
||||
async fn finalization_period(
|
||||
sync_p2p: P2pPtr,
|
||||
state: ValidatorStatePtr,
|
||||
ex: Arc<smol::Executor<'_>>,
|
||||
) -> bool {
|
||||
// Node sleeps until finalization sync period starts
|
||||
let next_slot_start = state.read().await.consensus.next_n_slot_start(1);
|
||||
let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR {
|
||||
(next_slot_start - Duration::new(constants::FINAL_SYNC_DUR, 0)).as_secs()
|
||||
} else {
|
||||
next_slot_start.as_secs()
|
||||
};
|
||||
info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period);
|
||||
sleep(seconds_sync_period).await;
|
||||
|
||||
// Keep a record of slot to verify if next slot got skipped during processing
|
||||
let completed_slot = state.read().await.consensus.current_slot();
|
||||
|
||||
// Check if any forks can be finalized
|
||||
match state.write().await.chain_finalization().await {
|
||||
Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => {
|
||||
// Broadcasting in background
|
||||
if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() {
|
||||
ex.spawn(async move {
|
||||
// Broadcast finalized blocks info, if any:
|
||||
info!("consensus: Broadcasting finalized blocks");
|
||||
for info in to_broadcast_block {
|
||||
match sync_p2p.broadcast(info).await {
|
||||
Ok(()) => info!("consensus: Broadcasted block"),
|
||||
Err(e) => error!("consensus: Failed broadcasting block: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast finalized slot checkpoints, if any:
|
||||
info!("consensus: Broadcasting finalized slot checkpoints");
|
||||
for slot_checkpoint in to_broadcast_slot_checkpoints {
|
||||
match sync_p2p.broadcast(slot_checkpoint).await {
|
||||
Ok(()) => info!("consensus: Broadcasted slot_checkpoint"),
|
||||
Err(e) => {
|
||||
error!("consensus: Failed broadcasting slot_checkpoint: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
} else {
|
||||
info!("consensus: No finalized blocks or slot checkpoints to broadcast");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("consensus: Finalization check failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify node didn't skip next slot
|
||||
completed_slot != state.read().await.consensus.current_slot()
|
||||
}
|
||||
|
||||
@@ -259,12 +259,16 @@ impl ValidatorState {
|
||||
sigma1: pallas::Base,
|
||||
sigma2: pallas::Base,
|
||||
) -> Result<Option<(BlockProposal, LeadCoin)>> {
|
||||
// Check if node can produce proposals
|
||||
if !self.consensus.proposing {
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
// Generate proposal
|
||||
let slot = self.consensus.current_slot();
|
||||
let (prev_hash, index) = self.consensus.longest_chain_last_hash().unwrap();
|
||||
let unproposed_txs = self.unproposed_txs(index);
|
||||
|
||||
// TODO: [PLACEHOLDER] Create and add rewards transaction
|
||||
|
||||
let mut tree = BridgeTree::<MerkleNode, MERKLE_DEPTH>::new(100);
|
||||
// The following is pretty weird, so something better should be done.
|
||||
for tx in &unproposed_txs {
|
||||
@@ -581,12 +585,14 @@ impl ValidatorState {
|
||||
let mut fork_index = -1;
|
||||
// Use this index to extract leaders count sequence from longest fork
|
||||
let mut index_for_history = -1;
|
||||
let mut max_length_for_history = 0;
|
||||
let mut max_length = 0;
|
||||
for (index, fork) in self.consensus.forks.iter().enumerate() {
|
||||
let length = fork.sequence.len();
|
||||
// Check if greater than max to retain index for history
|
||||
if length > max_length {
|
||||
if length > max_length_for_history {
|
||||
index_for_history = index as i64;
|
||||
max_length_for_history = length;
|
||||
}
|
||||
// Ignore forks with less that 3 blocks
|
||||
if length < 3 {
|
||||
@@ -612,12 +618,12 @@ impl ValidatorState {
|
||||
match fork_index {
|
||||
-2 => {
|
||||
info!("chain_finalization(): Eligible forks with same height exist, nothing to finalize.");
|
||||
self.consensus.set_leader_history(index_for_history);
|
||||
self.consensus.set_leader_history(index_for_history, slot);
|
||||
return Ok((vec![], vec![]))
|
||||
}
|
||||
-1 => {
|
||||
info!("chain_finalization(): All chains have less than 3 proposals, nothing to finalize.");
|
||||
self.consensus.set_leader_history(index_for_history);
|
||||
self.consensus.set_leader_history(index_for_history, slot);
|
||||
return Ok((vec![], vec![]))
|
||||
}
|
||||
_ => info!("chain_finalization(): Chain {} can be finalized!", fork_index),
|
||||
|
||||
Reference in New Issue
Block a user