diff --git a/src/consensus2/block.rs b/src/consensus2/block.rs index 3fc0f9674..c8e3d348c 100644 --- a/src/consensus2/block.rs +++ b/src/consensus2/block.rs @@ -11,6 +11,8 @@ use crate::{ }; /// This struct represents a tuple of the form (`st`, `sl`, txs`, `metadata`). +/// The transactions here are stored as hashes, which serve as pointers to +/// the actual transaction data in the blockchain database. #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct Block { /// Previous block hash @@ -37,6 +39,69 @@ impl Block { } } +/// Auxiliary structure used for blockchain syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct BlockOrder { + /// Slot UID + pub sl: u64, + /// Blockhash of that slot + pub block: blake3::Hash, +} + +impl net::Message for BlockOrder { + fn name() -> &'static str { + "blockorder" + } +} + +/// Structure representing full block data. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct BlockInfo { + /// Previous block hash + pub st: blake3::Hash, + /// Slot UID, generated by the beacon + pub sl: u64, + /// Transactions payload + pub txs: Vec, + /// Additional proposal information + pub metadata: Metadata, + /// Proposal information used by Streamlet consensus + pub sm: StreamletMetadata, +} + +impl BlockInfo { + pub fn new( + st: blake3::Hash, + sl: u64, + txs: Vec, + metadata: Metadata, + sm: StreamletMetadata, + ) -> Self { + Self { st, sl, txs, metadata, sm } + } +} + +impl net::Message for BlockInfo { + fn name() -> &'static str { + "blockinfo" + } +} + +impl_vec!(BlockInfo); + +/// Auxiliary structure used for blockchain syncing +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct BlockResponse { + /// Response blocks. + pub blocks: Vec, +} + +impl net::Message for BlockResponse { + fn name() -> &'static str { + "blockresponse" + } +} + /// This struct represents a block proposal, used for consensus. #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct BlockProposal { @@ -46,16 +111,8 @@ pub struct BlockProposal { pub signature: Signature, /// Leader ID pub id: u64, - /// Previous block hash - pub st: blake3::Hash, - /// Slot uid, generated by the beacon - pub sl: u64, - /// Transactions payload - pub txs: Vec, - /// Additional proposal information - pub metadata: Metadata, - /// Proposal information used by Streamlet consensus - pub sm: StreamletMetadata, + /// Block data + pub block: BlockInfo, } impl BlockProposal { @@ -70,12 +127,13 @@ impl BlockProposal { metadata: Metadata, sm: StreamletMetadata, ) -> Self { - Self { public_key, signature, id, st, sl, txs, metadata, sm } + let block = BlockInfo::new(st, sl, txs, metadata, sm); + Self { public_key, signature, id, block } } /// Produce proposal hash using `st`, `sl`, `txs`, and `metadata`. pub fn hash(&self) -> blake3::Hash { - Self::to_proposal_hash(self.st, self.sl, &self.txs, &self.metadata) + Self::to_proposal_hash(self.block.st, self.block.sl, &self.block.txs, &self.block.metadata) } /// Generate a proposal hash using provided `st`, `sl`, `txs`, and `metadata`. @@ -99,10 +157,10 @@ impl PartialEq for BlockProposal { self.public_key == other.public_key && self.signature == other.signature && self.id == other.id && - self.st == other.st && - self.sl == other.sl && - self.txs == other.txs && - self.metadata == other.metadata + self.block.st == other.block.st && + self.block.sl == other.block.sl && + self.block.txs == other.block.txs && + self.block.metadata == other.block.metadata } } @@ -114,6 +172,12 @@ impl net::Message for BlockProposal { impl_vec!(BlockProposal); +impl From for BlockInfo { + fn from(block: BlockProposal) -> BlockInfo { + block.block + } +} + /// This struct represents a sequence of block proposals. #[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] pub struct ProposalChain { @@ -131,13 +195,13 @@ impl ProposalChain { /// excluding the genesis block proposal. /// Additional validity rules can be applied. pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool { - if proposal.st == self.genesis_block { + if proposal.block.st == self.genesis_block { debug!("check_proposal(): Genesis block proposal provided."); return false } let prev_hash = previous.hash(); - if proposal.st != prev_hash || proposal.sl <= previous.sl { + if proposal.block.st != prev_hash || proposal.block.sl <= previous.block.sl { debug!("check_proposal(): Provided proposal is invalid."); return false } @@ -167,7 +231,7 @@ impl ProposalChain { /// Proposals chain notarization check. pub fn notarized(&self) -> bool { for proposal in &self.proposals { - if !proposal.sm.notarized { + if !proposal.block.sm.notarized { return false } } @@ -177,3 +241,29 @@ impl ProposalChain { } impl_vec!(ProposalChain); + +/// Auxiliary structure used for forks syncing. +#[derive(Debug, SerialEncodable, SerialDecodable)] +pub struct ForkOrder { + /// Validator ID + pub id: u64, +} + +impl net::Message for ForkOrder { + fn name() -> &'static str { + "forkorder" + } +} + +/// Auxiliary structure used for forks syncing. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct ForkResponse { + /// Fork chains containing block proposals + pub proposals: Vec, +} + +impl net::Message for ForkResponse { + fn name() -> &'static str { + "forkresponse" + } +} diff --git a/src/consensus2/proto/mod.rs b/src/consensus2/proto/mod.rs index e4cb9a51b..bec39bdb0 100644 --- a/src/consensus2/proto/mod.rs +++ b/src/consensus2/proto/mod.rs @@ -1,13 +1,27 @@ -// TODO: Handle ? in these modules' loops +// TODO: FIXME: Handle ? in these modules' loops. There should be no +// uncaught and unhandled errors that could potentially break out of +// the loops. +/// Participant announce protocol mod protocol_participant; pub use protocol_participant::ProtocolParticipant; +/// Block proposal protocol mod protocol_proposal; pub use protocol_proposal::ProtocolProposal; +/// Transaction broadcast protocol mod protocol_tx; pub use protocol_tx::ProtocolTx; +/// Consensus vote protocol mod protocol_vote; pub use protocol_vote::ProtocolVote; + +/// Validator + Replicator blockchain sync protocol +mod protocol_sync; +pub use protocol_sync::ProtocolSync; + +/// Validator forks sync protocol +mod protocol_sync_forks; +pub use protocol_sync_forks::ProtocolSyncForks; diff --git a/src/consensus2/proto/protocol_sync.rs b/src/consensus2/proto/protocol_sync.rs new file mode 100644 index 000000000..3f1731841 --- /dev/null +++ b/src/consensus2/proto/protocol_sync.rs @@ -0,0 +1,104 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use crate::{ + consensus2::{ + block::{BlockInfo, BlockOrder, BlockResponse}, + ValidatorStatePtr, + }, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; + +// Constant defining how many blocks we send during syncing. +const BATCH: u64 = 10; + +pub struct ProtocolSync { + channel: ChannelPtr, + order_sub: MessageSubscription, + block_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, +} + +impl ProtocolSync { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, + ) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + + let order_sub = channel.subscribe_msg::().await?; + let block_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + channel: channel.clone(), + order_sub, + block_sub, + jobsman: ProtocolJobsManager::new("SyncProtocol", channel), + state, + p2p, + })) + } + + async fn handle_receive_order(self: Arc) -> Result<()> { + debug!("ProtocolSync::handle_receive_order() [START]"); + loop { + let order = self.order_sub.receive().await?; + + debug!("ProtocolSync::handle_receive_order() received {:?}", order); + + // Extra validations can be added here + let key = order.sl; + let slot_range: Vec = (key..=(key + BATCH)).collect(); + let blocks = self.state.read().await.blockchain.get_blocks_by_slot(&slot_range)?; + let response = BlockResponse { blocks }; + self.channel.send(response).await?; + } + } + + async fn handle_receive_block(self: Arc) -> Result<()> { + debug!("ProtocolSync::handle_receive_block() [START]"); + loop { + let info = self.block_sub.receive().await?; + + debug!("ProtocolSync::handle_receive_block() received block"); + + // TODO: The following code should be executed only by replicators, not + // consensus nodes. + + // Node stores finalized flock, if it doesn't exist (checking by slot). + // Extra validations can be added here. + let info_copy = (*info).clone(); + if !self.state.read().await.blockchain.has_block(&info_copy)? { + self.state.write().await.blockchain.add(&[info_copy.clone()])?; + self.p2p.broadcast(info_copy).await?; + } + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSync { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolSync::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; + debug!("ProtocolSync::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolSync" + } +} diff --git a/src/consensus2/proto/protocol_sync_forks.rs b/src/consensus2/proto/protocol_sync_forks.rs new file mode 100644 index 000000000..b8cb8d94c --- /dev/null +++ b/src/consensus2/proto/protocol_sync_forks.rs @@ -0,0 +1,68 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use crate::{ + consensus2::{ + block::{ForkOrder, ForkResponse}, + state::ValidatorStatePtr, + }, + net::{ + ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, + ProtocolJobsManagerPtr, + }, + Result, +}; + +pub struct ProtocolSyncForks { + channel: ChannelPtr, + order_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, +} + +impl ProtocolSyncForks { + pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let order_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + channel: channel.clone(), + order_sub, + jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel), + state, + })) + } + + async fn handle_receive_order(self: Arc) -> Result<()> { + debug!("ProtocolSyncForks::handle_receive_order() [START]"); + loop { + let order = self.order_sub.receive().await?; + + debug!("ProtocolSyncForks::handle_receive_order() received {:?}", order); + + // Extra validations can be added here. + let proposals = self.state.read().await.consensus.proposals.clone(); + let response = ForkResponse { proposals }; + self.channel.send(response).await?; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSyncForks { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolSyncForks::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await; + debug!("ProtocolSyncForks::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolSyncForks" + } +} diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index 935d85249..ff682885f 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -191,7 +191,7 @@ impl ValidatorState { let mut unproposed_txs = self.unconfirmed_txs.clone(); for chain in &self.consensus.proposals { for proposal in &chain.proposals { - for tx in &proposal.txs { + for tx in &proposal.block.txs { if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) { unproposed_txs.remove(pos); } @@ -239,10 +239,10 @@ impl ValidatorState { if !proposal.public_key.verify( BlockProposal::to_proposal_hash( - proposal.st, - proposal.sl, - &proposal.txs, - &proposal.metadata, + proposal.block.st, + proposal.block.sl, + &proposal.block.txs, + &proposal.block.metadata, ) .as_bytes(), &proposal.signature, @@ -256,7 +256,7 @@ impl ValidatorState { /// Given a proposal, the node finds which blockchain it extends. /// If the proposal extends the canonical blockchain, a new fork chain - // is created. The node votes on the proposal only if it extends the + /// is created. The node votes on the proposal only if it extends the /// longest notarized fork chain it has seen. pub fn vote(&mut self, proposal: &BlockProposal) -> Result> { self.zero_participants_check(); @@ -269,7 +269,7 @@ impl ValidatorState { let mut orphans = Vec::new(); for vote in self.consensus.orphan_votes.iter() { if vote.proposal == proposal_hash { - proposal.sm.votes.push(vote.clone()); + proposal.block.sm.votes.push(vote.clone()); orphans.push(vote.clone()); } } @@ -302,13 +302,13 @@ impl ValidatorState { } let signed_hash = self.secret.sign(&serialize(&proposal_hash)); - Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.sl, self.id))) + Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.block.sl, self.id))) } /// Verify if the provided chain is notarized excluding the last block. pub fn extends_notarized_chain(&self, chain: &ProposalChain) -> bool { for proposal in &chain.proposals[..(chain.proposals.len() - 1)] { - if !proposal.sm.notarized { + if !proposal.block.sm.notarized { return false } } @@ -321,18 +321,18 @@ impl ValidatorState { for (index, chain) in self.consensus.proposals.iter().enumerate() { let last = chain.proposals.last().unwrap(); let hash = last.hash(); - if proposal.st == hash && proposal.sl > last.sl { + if proposal.block.st == hash && proposal.block.sl > last.block.sl { return Ok(index as i64) } - if proposal.st == last.st && proposal.sl == last.sl { + if proposal.block.st == last.block.st && proposal.block.sl == last.block.sl { debug!("find_extended_chain_index(): Proposal already received"); return Ok(-2) } } let (last_sl, last_block) = self.blockchain.last()?.unwrap(); - if proposal.st != last_block || proposal.sl <= last_sl { + if proposal.block.st != last_block || proposal.block.sl <= last_sl { debug!("find_extended_chain_index(): Proposal doesn't extend any known chain"); return Ok(-2) } @@ -401,16 +401,16 @@ impl ValidatorState { } let (proposal, chain_idx) = proposal.unwrap(); - if proposal.sm.votes.contains(vote) { + if proposal.block.sm.votes.contains(vote) { debug!("receive_vote(): Already seen this proposal"); return Ok(false) } - proposal.sm.votes.push(vote.clone()); + proposal.block.sm.votes.push(vote.clone()); - if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) { + if !proposal.block.sm.notarized && proposal.block.sm.votes.len() > (2 * node_count / 3) { debug!("receive_vote(): Notarized a block"); - proposal.sm.notarized = true; + proposal.block.sm.notarized = true; match self.chain_finalization(chain_idx) { Ok(()) => {} Err(e) => { @@ -476,7 +476,7 @@ impl ValidatorState { let mut consecutive = 0; for proposal in &chain.proposals { - if proposal.sm.notarized { + if proposal.block.sm.notarized { consecutive += 1; continue } @@ -494,9 +494,9 @@ impl ValidatorState { let mut finalized = vec![]; for proposal in &mut chain.proposals[..(consecutive - 1)] { - proposal.sm.finalized = true; - finalized.push(proposal.clone()); - for tx in proposal.txs.clone() { + proposal.block.sm.finalized = true; + finalized.push(proposal.clone().into()); + for tx in proposal.block.txs.clone() { if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { self.unconfirmed_txs.remove(pos); } @@ -520,7 +520,7 @@ impl ValidatorState { let mut dropped = vec![]; for chain in self.consensus.proposals.iter() { let first = chain.proposals.first().unwrap(); - if first.st != last_block || first.sl <= last_sl { + if first.block.st != last_block || first.block.sl <= last_sl { dropped.push(chain.clone()); } }