WIP: consensus/consensus_sync(): redesign

aggstam
2022-12-16 18:40:30 +02:00
parent 3cedf88cdc
commit abfc86c0fb
4 changed files with 170 additions and 26 deletions

View File

@@ -23,7 +23,10 @@ use smol::Executor;
use crate::{
consensus::{
state::{ConsensusRequest, ConsensusResponse},
state::{
ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest,
ConsensusSlotCheckpointsResponse,
},
ValidatorStatePtr,
},
net::{
@@ -36,6 +39,7 @@ use crate::{
pub struct ProtocolSyncConsensus {
channel: ChannelPtr,
request_sub: MessageSubscription<ConsensusRequest>,
slot_checkpoints_request_sub: MessageSubscription<ConsensusSlotCheckpointsRequest>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
}
@@ -48,12 +52,16 @@ impl ProtocolSyncConsensus {
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ConsensusRequest>().await;
msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsRequest>().await;
let request_sub = channel.subscribe_msg::<ConsensusRequest>().await?;
let slot_checkpoints_request_sub =
channel.subscribe_msg::<ConsensusSlotCheckpointsRequest>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
request_sub,
slot_checkpoints_request_sub,
jobsman: ProtocolJobsManager::new("SyncConsensusProtocol", channel),
state,
}))
@@ -62,7 +70,7 @@ impl ProtocolSyncConsensus {
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSyncConsensus::handle_receive_request() [START]");
loop {
let order = match self.request_sub.receive().await {
let req = match self.request_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!("ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e);
@@ -70,7 +78,7 @@ impl ProtocolSyncConsensus {
}
};
debug!("ProtocolSyncConsensuss::handle_receive_request() received {:?}", order);
debug!("ProtocolSyncConsensuss::handle_receive_request() received {:?}", req);
// Extra validations can be added here.
let lock = self.state.read().await;
@@ -96,6 +104,31 @@ impl ProtocolSyncConsensus {
};
}
}
async fn handle_receive_slot_checkpoints_request(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() [START]");
loop {
let req = match self.slot_checkpoints_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() recv fail: {}", e);
continue
}
};
debug!(
"ProtocolSyncConsensuss::handle_receive_slot_checkpoints_request() received {:?}",
req
);
// Extra validations can be added here.
let slot_checkpoints = !self.state.read().await.consensus.slot_checkpoints.is_empty();
let response = ConsensusSlotCheckpointsResponse { slot_checkpoints };
if let Err(e) = self.channel.send(response).await {
error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e);
};
}
}
}
#[async_trait]
@@ -104,6 +137,10 @@ impl ProtocolBase for ProtocolSyncConsensus {
debug!("ProtocolSyncConsensus::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_slot_checkpoints_request(), executor.clone())
.await;
debug!("ProtocolSyncConsensus::start() [END]");
Ok(())
}
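For orientation, the requesting side of this new handshake mirrors the handler: register a dispatcher for the response type, send the empty request, and await the reply. A condensed sketch using only the channel calls that appear in this commit (the full version is consensus_sync_task2 below):
// Sketch: requester side of the slot-checkpoints handshake, assuming
// `channel` is a connected ChannelPtr (see consensus_sync_task2 below).
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsResponse>().await;
let response_sub = channel.subscribe_msg::<ConsensusSlotCheckpointsResponse>().await?;
channel.send(ConsensusSlotCheckpointsRequest {}).await?;
let response = response_sub.receive().await?;
if response.slot_checkpoints {
    // Peer holds hot/live slot checkpoints and can serve a state sync.
}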

View File

@@ -650,6 +650,15 @@ impl ConsensusState {
}
}
}
/// Auxiliary function to reset consensus state for a resync
pub fn reset(&mut self) {
self.participating = None;
self.forks = vec![];
self.slot_checkpoints = vec![];
self.leaders_history = vec![0];
self.nullifiers = vec![];
}
}
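The sync loop in proposal_task2 (last file in this diff) calls this at the top of every retry; a minimal usage sketch, assuming a ValidatorStatePtr named `state`:
// Drop all hot consensus state before retrying a sync; the node keeps
// following finalized blocks through the sync p2p protocols.
state.write().await.consensus.reset();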
/// Auxiliary structure used for consensus syncing.
@@ -685,6 +694,29 @@ impl net::Message for ConsensusResponse {
}
}
/// Auxiliary structure used for consensus syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ConsensusSlotCheckpointsRequest {}
impl net::Message for ConsensusSlotCheckpointsRequest {
fn name() -> &'static str {
"consensusslotcheckpointsrequest"
}
}
/// Auxiliary structure used for consensus syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ConsensusSlotCheckpointsResponse {
/// Node has hot/live slot checkpoints
pub slot_checkpoints: bool,
}
impl net::Message for ConsensusSlotCheckpointsResponse {
fn name() -> &'static str {
"consensusslotcheckpointsresponse"
}
}
/// Auxiliary structure used to keep track of slot validation parameters.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct SlotCheckpoint {

View File

@@ -20,7 +20,10 @@ use log::{info, warn};
use crate::{
consensus::{
state::{ConsensusRequest, ConsensusResponse},
state::{
ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest,
ConsensusSlotCheckpointsResponse,
},
ValidatorStatePtr,
},
net::P2pPtr,
@@ -76,3 +79,92 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
info!("Consensus state synced!");
Ok(())
}
/// Async task used for consensus state syncing.
pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Starting consensus state sync...");
// Loop through connected channels
let channels_map = p2p.channels().lock().await;
let values = channels_map.values();
// Using len() here because is_empty() uses the unstable library
// feature 'exact_size_is_empty'.
if values.len() == 0 {
warn!("Node is not connected to other nodes");
info!("Consensus state synced!");
return Ok(())
}
// Node iterates over the channel peers to check if at least one peer has seen slot checkpoints
let mut peer = None;
for channel in values {
// Communication setup
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsResponse>().await;
let response_sub = channel.subscribe_msg::<ConsensusSlotCheckpointsResponse>().await?;
// Node creates a `ConsensusSlotCheckpointsRequest` and sends it
let request = ConsensusSlotCheckpointsRequest {};
channel.send(request).await?;
// Node checks response
let response = response_sub.receive().await?;
if !response.slot_checkpoints {
warn!("Node has not seen any slot checkpoints, retrying...");
continue
}
// Keep peer to ask for consensus state
peer = Some(channel.clone());
break
}
// Release channels lock
drop(channels_map);
// If no peer knows about any slot checkpoints, the network was just bootstrapped or restarted
// and no node has started consensus yet.
if peer.is_none() {
warn!("No node that has seen any slot checkpoints was found.");
info!("Consensus state synced!");
return Ok(())
}
let peer = peer.unwrap();
// Listen for next finalization
info!("Waiting for next finalization...");
let subscriber = state.read().await.subscribers.get("blocks").unwrap().clone();
let subscription = subscriber.subscribe().await;
subscription.receive().await;
// After finalization occurs, sync our consensus state.
// This ensures that the received state always consists of a single fork with one proposal.
info!("Finalization signal received, requesting consensus state...");
// Communication setup
let msg_subsystem = peer.get_message_subsystem();
msg_subsystem.add_dispatch::<ConsensusResponse>().await;
let response_sub = peer.subscribe_msg::<ConsensusResponse>().await?;
// Node creates a `ConsensusRequest` and sends it
let request = ConsensusRequest {};
peer.send(request).await?;
// Node verifies response came from a participating node.
// Extra validations can be added here.
let response = response_sub.receive().await?;
// Node stores response data.
let mut lock = state.write().await;
lock.consensus.offset = response.offset;
let mut forks = vec![];
for fork in &response.forks {
forks.push(fork.clone().into());
}
lock.consensus.forks = forks;
lock.unconfirmed_txs = response.unconfirmed_txs.clone();
lock.consensus.slot_checkpoints = response.slot_checkpoints.clone();
lock.consensus.leaders_history = response.leaders_history.clone();
lock.consensus.nullifiers = response.nullifiers.clone();
info!("Consensus state synced!");
Ok(())
}
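This WIP commit leaves proposal_task2 (below) calling the original consensus_sync_task; a sketch of how the redesigned task could slot into that retry loop, reusing the names from the last file of this diff (an illustration, not code from the commit):
// Sketch: wiring consensus_sync_task2 into the resync loop of
// proposal_task2; retries/SYNC_MAX_RETRIES are the counters already
// used there.
let mut retries = 0;
loop {
    // Reset hot consensus state, then bail out after too many failed attempts
    state.write().await.consensus.reset();
    if retries > constants::SYNC_MAX_RETRIES {
        break
    }
    if let Err(e) = consensus_sync_task2(consensus_p2p.clone(), state.clone()).await {
        error!("consensus: Failed syncing consensus state: {}, retrying...", e);
        retries += 1;
        continue
    }
    break
}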

View File

@@ -180,9 +180,10 @@ pub async fn proposal_task2(
let mut retries = 0;
// Sync loop
loop {
// Setting up participating to None, so node can still follow the finalized blocks by
// Resetting consensus state, so the node can still follow the finalized blocks by
// the sync p2p network/protocols
state.write().await.consensus.participating = None;
// TODO: verify that a consensus node that stops participating still receives finalized blocks
state.write().await.consensus.reset();
// Checking sync retries
if retries > constants::SYNC_MAX_RETRIES {
@@ -191,25 +192,6 @@ pub async fn proposal_task2(
break
}
// Node waits just before the current or next epoch last finalization syncing period, so it can
// start syncing latest state.
let mut seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
let sync_offset = Duration::new(constants::FINAL_SYNC_DUR + 1, 0);
loop {
if seconds_until_next_epoch > sync_offset {
seconds_until_next_epoch -= sync_offset;
break
}
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
sleep(seconds_until_next_epoch.as_secs()).await;
seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
}
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
sleep(seconds_until_next_epoch.as_secs()).await;
// Node syncs its consensus state
if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await {
error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
@@ -249,6 +231,7 @@ async fn consensus_loop(
ex: Arc<smol::Executor<'_>>,
) {
loop {
// TODO: Change order, since node exits consensus sync right before next slot
// Node sleeps until finalization sync period starts
let next_slot_start = state.read().await.consensus.next_n_slot_start(1);
let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR {
@@ -301,7 +284,7 @@ async fn consensus_loop(
// Verify node didn't skip next slot
let current_slot = state.read().await.consensus.current_slot();
if completed_slot == current_slot {
if completed_slot != current_slot {
warn!(
"consensus: Node missed slot {} due to finalizated blocks processing, resyncing...",
current_slot