consensus: Add sync protocols.

ty skoupidi
This commit is contained in:
parazyd
2022-04-21 14:01:54 +02:00
parent 1ae263b4de
commit 95d74a034a
5 changed files with 317 additions and 41 deletions

View File

@@ -11,6 +11,8 @@ use crate::{
};
/// This struct represents a tuple of the form (`st`, `sl`, txs`, `metadata`).
/// The transactions here are stored as hashes, which serve as pointers to
/// the actual transaction data in the blockchain database.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Block {
/// Previous block hash
@@ -37,6 +39,69 @@ impl Block {
}
}
/// Auxiliary structure used for blockchain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct BlockOrder {
/// Slot UID
pub sl: u64,
/// Blockhash of that slot
pub block: blake3::Hash,
}
impl net::Message for BlockOrder {
fn name() -> &'static str {
"blockorder"
}
}
/// Structure representing full block data.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockInfo {
/// Previous block hash
pub st: blake3::Hash,
/// Slot UID, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
pub sm: StreamletMetadata,
}
impl BlockInfo {
pub fn new(
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
metadata: Metadata,
sm: StreamletMetadata,
) -> Self {
Self { st, sl, txs, metadata, sm }
}
}
impl net::Message for BlockInfo {
fn name() -> &'static str {
"blockinfo"
}
}
impl_vec!(BlockInfo);
/// Auxiliary structure used for blockchain syncing
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockResponse {
/// Response blocks.
pub blocks: Vec<BlockInfo>,
}
impl net::Message for BlockResponse {
fn name() -> &'static str {
"blockresponse"
}
}
/// This struct represents a block proposal, used for consensus.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockProposal {
@@ -46,16 +111,8 @@ pub struct BlockProposal {
pub signature: Signature,
/// Leader ID
pub id: u64,
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
pub sm: StreamletMetadata,
/// Block data
pub block: BlockInfo,
}
impl BlockProposal {
@@ -70,12 +127,13 @@ impl BlockProposal {
metadata: Metadata,
sm: StreamletMetadata,
) -> Self {
Self { public_key, signature, id, st, sl, txs, metadata, sm }
let block = BlockInfo::new(st, sl, txs, metadata, sm);
Self { public_key, signature, id, block }
}
/// Produce proposal hash using `st`, `sl`, `txs`, and `metadata`.
pub fn hash(&self) -> blake3::Hash {
Self::to_proposal_hash(self.st, self.sl, &self.txs, &self.metadata)
Self::to_proposal_hash(self.block.st, self.block.sl, &self.block.txs, &self.block.metadata)
}
/// Generate a proposal hash using provided `st`, `sl`, `txs`, and `metadata`.
@@ -99,10 +157,10 @@ impl PartialEq for BlockProposal {
self.public_key == other.public_key &&
self.signature == other.signature &&
self.id == other.id &&
self.st == other.st &&
self.sl == other.sl &&
self.txs == other.txs &&
self.metadata == other.metadata
self.block.st == other.block.st &&
self.block.sl == other.block.sl &&
self.block.txs == other.block.txs &&
self.block.metadata == other.block.metadata
}
}
@@ -114,6 +172,12 @@ impl net::Message for BlockProposal {
impl_vec!(BlockProposal);
impl From<BlockProposal> for BlockInfo {
fn from(block: BlockProposal) -> BlockInfo {
block.block
}
}
/// This struct represents a sequence of block proposals.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct ProposalChain {
@@ -131,13 +195,13 @@ impl ProposalChain {
/// excluding the genesis block proposal.
/// Additional validity rules can be applied.
pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool {
if proposal.st == self.genesis_block {
if proposal.block.st == self.genesis_block {
debug!("check_proposal(): Genesis block proposal provided.");
return false
}
let prev_hash = previous.hash();
if proposal.st != prev_hash || proposal.sl <= previous.sl {
if proposal.block.st != prev_hash || proposal.block.sl <= previous.block.sl {
debug!("check_proposal(): Provided proposal is invalid.");
return false
}
@@ -167,7 +231,7 @@ impl ProposalChain {
/// Proposals chain notarization check.
pub fn notarized(&self) -> bool {
for proposal in &self.proposals {
if !proposal.sm.notarized {
if !proposal.block.sm.notarized {
return false
}
}
@@ -177,3 +241,29 @@ impl ProposalChain {
}
impl_vec!(ProposalChain);
/// Auxiliary structure used for forks syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ForkOrder {
/// Validator ID
pub id: u64,
}
impl net::Message for ForkOrder {
fn name() -> &'static str {
"forkorder"
}
}
/// Auxiliary structure used for forks syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ForkResponse {
/// Fork chains containing block proposals
pub proposals: Vec<ProposalChain>,
}
impl net::Message for ForkResponse {
fn name() -> &'static str {
"forkresponse"
}
}

View File

@@ -1,13 +1,27 @@
// TODO: Handle ? in these modules' loops
// TODO: FIXME: Handle ? in these modules' loops. There should be no
// uncaught and unhandled errors that could potentially break out of
// the loops.
/// Participant announce protocol
mod protocol_participant;
pub use protocol_participant::ProtocolParticipant;
/// Block proposal protocol
mod protocol_proposal;
pub use protocol_proposal::ProtocolProposal;
/// Transaction broadcast protocol
mod protocol_tx;
pub use protocol_tx::ProtocolTx;
/// Consensus vote protocol
mod protocol_vote;
pub use protocol_vote::ProtocolVote;
/// Validator + Replicator blockchain sync protocol
mod protocol_sync;
pub use protocol_sync::ProtocolSync;
/// Validator forks sync protocol
mod protocol_sync_forks;
pub use protocol_sync_forks::ProtocolSyncForks;

View File

@@ -0,0 +1,104 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use crate::{
consensus2::{
block::{BlockInfo, BlockOrder, BlockResponse},
ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
// Constant defining how many blocks we send during syncing.
const BATCH: u64 = 10;
pub struct ProtocolSync {
channel: ChannelPtr,
order_sub: MessageSubscription<BlockOrder>,
block_sub: MessageSubscription<BlockInfo>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolSync {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<BlockOrder>().await;
msg_subsystem.add_dispatch::<BlockInfo>().await;
let order_sub = channel.subscribe_msg::<BlockOrder>().await?;
let block_sub = channel.subscribe_msg::<BlockInfo>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
order_sub,
block_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel),
state,
p2p,
}))
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSync::handle_receive_order() [START]");
loop {
let order = self.order_sub.receive().await?;
debug!("ProtocolSync::handle_receive_order() received {:?}", order);
// Extra validations can be added here
let key = order.sl;
let slot_range: Vec<u64> = (key..=(key + BATCH)).collect();
let blocks = self.state.read().await.blockchain.get_blocks_by_slot(&slot_range)?;
let response = BlockResponse { blocks };
self.channel.send(response).await?;
}
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSync::handle_receive_block() [START]");
loop {
let info = self.block_sub.receive().await?;
debug!("ProtocolSync::handle_receive_block() received block");
// TODO: The following code should be executed only by replicators, not
// consensus nodes.
// Node stores finalized flock, if it doesn't exist (checking by slot).
// Extra validations can be added here.
let info_copy = (*info).clone();
if !self.state.read().await.blockchain.has_block(&info_copy)? {
self.state.write().await.blockchain.add(&[info_copy.clone()])?;
self.p2p.broadcast(info_copy).await?;
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolSync::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await;
debug!("ProtocolSync::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSync"
}
}

View File

@@ -0,0 +1,68 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use crate::{
consensus2::{
block::{ForkOrder, ForkResponse},
state::ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
},
Result,
};
pub struct ProtocolSyncForks {
channel: ChannelPtr,
order_sub: MessageSubscription<ForkOrder>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
}
impl ProtocolSyncForks {
pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ForkOrder>().await;
let order_sub = channel.subscribe_msg::<ForkOrder>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
order_sub,
jobsman: ProtocolJobsManager::new("SyncForkProtocol", channel),
state,
}))
}
async fn handle_receive_order(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSyncForks::handle_receive_order() [START]");
loop {
let order = self.order_sub.receive().await?;
debug!("ProtocolSyncForks::handle_receive_order() received {:?}", order);
// Extra validations can be added here.
let proposals = self.state.read().await.consensus.proposals.clone();
let response = ForkResponse { proposals };
self.channel.send(response).await?;
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSyncForks {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolSyncForks::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_order(), executor.clone()).await;
debug!("ProtocolSyncForks::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolSyncForks"
}
}

View File

@@ -191,7 +191,7 @@ impl ValidatorState {
let mut unproposed_txs = self.unconfirmed_txs.clone();
for chain in &self.consensus.proposals {
for proposal in &chain.proposals {
for tx in &proposal.txs {
for tx in &proposal.block.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
@@ -239,10 +239,10 @@ impl ValidatorState {
if !proposal.public_key.verify(
BlockProposal::to_proposal_hash(
proposal.st,
proposal.sl,
&proposal.txs,
&proposal.metadata,
proposal.block.st,
proposal.block.sl,
&proposal.block.txs,
&proposal.block.metadata,
)
.as_bytes(),
&proposal.signature,
@@ -256,7 +256,7 @@ impl ValidatorState {
/// Given a proposal, the node finds which blockchain it extends.
/// If the proposal extends the canonical blockchain, a new fork chain
// is created. The node votes on the proposal only if it extends the
/// is created. The node votes on the proposal only if it extends the
/// longest notarized fork chain it has seen.
pub fn vote(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
self.zero_participants_check();
@@ -269,7 +269,7 @@ impl ValidatorState {
let mut orphans = Vec::new();
for vote in self.consensus.orphan_votes.iter() {
if vote.proposal == proposal_hash {
proposal.sm.votes.push(vote.clone());
proposal.block.sm.votes.push(vote.clone());
orphans.push(vote.clone());
}
}
@@ -302,13 +302,13 @@ impl ValidatorState {
}
let signed_hash = self.secret.sign(&serialize(&proposal_hash));
Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.sl, self.id)))
Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.block.sl, self.id)))
}
/// Verify if the provided chain is notarized excluding the last block.
pub fn extends_notarized_chain(&self, chain: &ProposalChain) -> bool {
for proposal in &chain.proposals[..(chain.proposals.len() - 1)] {
if !proposal.sm.notarized {
if !proposal.block.sm.notarized {
return false
}
}
@@ -321,18 +321,18 @@ impl ValidatorState {
for (index, chain) in self.consensus.proposals.iter().enumerate() {
let last = chain.proposals.last().unwrap();
let hash = last.hash();
if proposal.st == hash && proposal.sl > last.sl {
if proposal.block.st == hash && proposal.block.sl > last.block.sl {
return Ok(index as i64)
}
if proposal.st == last.st && proposal.sl == last.sl {
if proposal.block.st == last.block.st && proposal.block.sl == last.block.sl {
debug!("find_extended_chain_index(): Proposal already received");
return Ok(-2)
}
}
let (last_sl, last_block) = self.blockchain.last()?.unwrap();
if proposal.st != last_block || proposal.sl <= last_sl {
if proposal.block.st != last_block || proposal.block.sl <= last_sl {
debug!("find_extended_chain_index(): Proposal doesn't extend any known chain");
return Ok(-2)
}
@@ -401,16 +401,16 @@ impl ValidatorState {
}
let (proposal, chain_idx) = proposal.unwrap();
if proposal.sm.votes.contains(vote) {
if proposal.block.sm.votes.contains(vote) {
debug!("receive_vote(): Already seen this proposal");
return Ok(false)
}
proposal.sm.votes.push(vote.clone());
proposal.block.sm.votes.push(vote.clone());
if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) {
if !proposal.block.sm.notarized && proposal.block.sm.votes.len() > (2 * node_count / 3) {
debug!("receive_vote(): Notarized a block");
proposal.sm.notarized = true;
proposal.block.sm.notarized = true;
match self.chain_finalization(chain_idx) {
Ok(()) => {}
Err(e) => {
@@ -476,7 +476,7 @@ impl ValidatorState {
let mut consecutive = 0;
for proposal in &chain.proposals {
if proposal.sm.notarized {
if proposal.block.sm.notarized {
consecutive += 1;
continue
}
@@ -494,9 +494,9 @@ impl ValidatorState {
let mut finalized = vec![];
for proposal in &mut chain.proposals[..(consecutive - 1)] {
proposal.sm.finalized = true;
finalized.push(proposal.clone());
for tx in proposal.txs.clone() {
proposal.block.sm.finalized = true;
finalized.push(proposal.clone().into());
for tx in proposal.block.txs.clone() {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
@@ -520,7 +520,7 @@ impl ValidatorState {
let mut dropped = vec![];
for chain in self.consensus.proposals.iter() {
let first = chain.proposals.first().unwrap();
if first.st != last_block || first.sl <= last_sl {
if first.block.st != last_block || first.block.sl <= last_sl {
dropped.push(chain.clone());
}
}