From 6de4869bec0407b9735f9658a0b52aebd7568377 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Tue, 13 Feb 2024 12:46:06 +0200 Subject: [PATCH] darkfid: removed obselete protocol_block --- bin/darkfid/src/main.rs | 4 +- bin/darkfid/src/proto/mod.rs | 6 +- bin/darkfid/src/proto/protocol_block.rs | 159 --------------------- bin/darkfid/src/proto/protocol_proposal.rs | 11 +- bin/darkfid/src/task/miner.rs | 9 +- bin/darkfid/src/tests/harness.rs | 27 ++-- bin/darkfid/src/utils.rs | 6 +- src/validator/consensus.rs | 10 +- src/validator/mod.rs | 16 --- 9 files changed, 30 insertions(+), 218 deletions(-) delete mode 100644 bin/darkfid/src/proto/protocol_block.rs diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 322875e2a..f54e06289 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -234,9 +234,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let mut subscribers = HashMap::new(); subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks")); subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs")); - if blockchain_config.consensus { - subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals")); - } + subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals")); // Initialize syncing P2P network let sync_p2p = diff --git a/bin/darkfid/src/proto/mod.rs b/bin/darkfid/src/proto/mod.rs index 4362628fb..2a9e43cab 100644 --- a/bin/darkfid/src/proto/mod.rs +++ b/bin/darkfid/src/proto/mod.rs @@ -16,13 +16,9 @@ * along with this program. If not, see . */ -/// Block broadcast protocol -mod protocol_block; -pub use protocol_block::{BlockInfoMessage, ProtocolBlock}; - /// Block proposal broadcast protocol mod protocol_proposal; -pub use protocol_proposal::ProtocolProposal; +pub use protocol_proposal::{ProposalMessage, ProtocolProposal}; /// Validator blockchain sync protocol mod protocol_sync; diff --git a/bin/darkfid/src/proto/protocol_block.rs b/bin/darkfid/src/proto/protocol_block.rs deleted file mode 100644 index 5dac38e09..000000000 --- a/bin/darkfid/src/proto/protocol_block.rs +++ /dev/null @@ -1,159 +0,0 @@ -/* This file is part of DarkFi (https://dark.fi) - * - * Copyright (C) 2020-2024 Dyne.org foundation - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use std::sync::Arc; - -use async_trait::async_trait; -use log::debug; -use smol::Executor; -use tinyjson::JsonValue; -use url::Url; - -use darkfi::{ - blockchain::BlockInfo, - impl_p2p_message, - net::{ - ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, - ProtocolJobsManager, ProtocolJobsManagerPtr, - }, - rpc::jsonrpc::JsonSubscriber, - util::encoding::base64, - validator::ValidatorPtr, - Result, -}; -use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable}; - -/// Auxiliary [`BlockInfo`] wrapper structure used for messaging. -#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] -pub struct BlockInfoMessage(BlockInfo); - -impl From<&BlockInfo> for BlockInfoMessage { - fn from(block: &BlockInfo) -> Self { - BlockInfoMessage(block.clone()) - } -} - -impl_p2p_message!(BlockInfoMessage, "block"); - -pub struct ProtocolBlock { - block_sub: MessageSubscription, - jobsman: ProtocolJobsManagerPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - channel_address: Url, - subscriber: JsonSubscriber, -} - -impl ProtocolBlock { - pub async fn init( - channel: ChannelPtr, - validator: ValidatorPtr, - p2p: P2pPtr, - subscriber: JsonSubscriber, - ) -> Result { - debug!( - target: "validator::protocol_block::init", - "Adding ProtocolBlock to the protocol registry" - ); - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - - let block_sub = channel.subscribe_msg::().await?; - - Ok(Arc::new(Self { - block_sub, - jobsman: ProtocolJobsManager::new("BlockProtocol", channel.clone()), - validator, - p2p, - channel_address: channel.address().clone(), - subscriber, - })) - } - - async fn handle_receive_block(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_block::handle_receive_block", "START"); - let exclude_list = vec![self.channel_address.clone()]; - loop { - let block = match self.block_sub.receive().await { - Ok(v) => v, - Err(e) => { - debug!( - target: "validator::protocol_block::handle_receive_block", - "recv fail: {}", - e - ); - continue - } - }; - - // Check if node has finished syncing its blockchain - if !*self.validator.synced.read().await { - debug!( - target: "validator::protocol_block::handle_receive_block", - "Node still syncing blockchain, skipping..." - ); - continue - } - - // Check if node started participating in consensus. - // Consensus-mode enabled nodes have already performed these steps, - // during proposal finalization. They still listen to this sub, - // in case they go out of sync and become a none-consensus node. - if self.validator.consensus.participating { - debug!( - target: "validator::protocol_block::handle_receive_block", - "Node is participating in consensus, skipping..." - ); - continue - } - - let block_copy = (*block).clone(); - - match self.validator.append_block(&block_copy.0).await { - Ok(()) => { - self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await; - let encoded_block = - JsonValue::String(base64::encode(&serialize_async(&block_copy).await)); - self.subscriber.notify(vec![encoded_block].into()).await; - } - Err(e) => { - debug!( - target: "validator::protocol_block::handle_receive_block", - "append_block fail: {}", - e - ); - } - }; - } - } -} - -#[async_trait] -impl ProtocolBase for ProtocolBlock { - async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "validator::protocol_block::start", "START"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; - debug!(target: "validator::protocol_block::start", "END"); - Ok(()) - } - - fn name(&self) -> &'static str { - "ProtocolBlock" - } -} diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index d18001c8d..49940153d 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -39,7 +39,7 @@ use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable}; /// Auxiliary [`Proposal`] wrapper structure used for messaging. #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] -struct ProposalMessage(Proposal); +pub struct ProposalMessage(pub Proposal); impl_p2p_message!(ProposalMessage, "proposal"); @@ -103,15 +103,6 @@ impl ProtocolProposal { continue } - // Check if node started participating in consensus. - if !self.validator.consensus.participating { - debug!( - target: "validator::protocol_proposal::handle_receive_proposal", - "Node is not participating in consensus, skipping..." - ); - continue - } - let proposal_copy = (*proposal).clone(); match self.validator.consensus.append_proposal(&proposal_copy.0).await { diff --git a/bin/darkfid/src/task/miner.rs b/bin/darkfid/src/task/miner.rs index 8f9fdfdb9..ddce7c204 100644 --- a/bin/darkfid/src/task/miner.rs +++ b/bin/darkfid/src/task/miner.rs @@ -42,7 +42,7 @@ use log::info; use num_bigint::BigUint; use rand::rngs::OsRng; -use crate::{proto::BlockInfoMessage, Darkfid}; +use crate::{proto::ProposalMessage, Darkfid}; // TODO: handle all ? so the task don't stop on errors @@ -123,13 +123,16 @@ async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> { let proposal = Proposal::new(next_block)?; node.validator.consensus.append_proposal(&proposal).await?; + // Broadcast proposal to the network + let message = ProposalMessage(proposal); + node.consensus_p2p.as_ref().unwrap().broadcast(&message).await; + node.sync_p2p.broadcast(&message).await; + // Check if we can finalize anything and broadcast them let finalized = node.validator.finalization().await?; if !finalized.is_empty() { let mut notif_blocks = Vec::with_capacity(finalized.len()); for block in finalized { - let message = BlockInfoMessage::from(&block); - node.sync_p2p.broadcast(&message).await; notif_blocks .push(JsonValue::String(bs58::encode(&serialize(&block)).into_string())); } diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index 169636ed8..8f5e79710 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -22,8 +22,9 @@ use darkfi::{ blockchain::{BlockInfo, Header}, net::Settings, rpc::jsonrpc::JsonSubscriber, + system::sleep, tx::{ContractCallLeaf, TransactionBuilder}, - validator::{Validator, ValidatorConfig}, + validator::{consensus::Proposal, Validator, ValidatorConfig}, zk::{empty_witnesses, ProvingKey, ZkCircuit}, Result, }; @@ -42,7 +43,7 @@ use rand::rngs::OsRng; use url::Url; use crate::{ - proto::BlockInfoMessage, + proto::ProposalMessage, task::sync::sync_task, utils::{spawn_consensus_p2p, spawn_sync_p2p}, Darkfid, @@ -149,19 +150,27 @@ impl Harness { let alice_blockchain_len = alice.blockchain.len(); assert_eq!(alice_blockchain_len, bob.blockchain.len()); - assert_eq!(alice_blockchain_len, total_blocks); + // Last block is not finalized yet + assert_eq!(alice_blockchain_len, total_blocks - 1); Ok(()) } pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> { - // We simply broadcast the block using Alice's sync P2P + // We append the block as a proposal to Alice, + // and then we broadcast it to rest nodes for block in blocks { - self.alice.sync_p2p.broadcast(&BlockInfoMessage::from(block)).await; + let proposal = Proposal::new(block.clone())?; + self.alice.validator.consensus.append_proposal(&proposal).await?; + let message = ProposalMessage(proposal); + self.alice.consensus_p2p.as_ref().unwrap().broadcast(&message).await; } - // and then add it to her chain - self.alice.validator.add_blocks(blocks).await?; + // Sleep a bit so blocks can be propagated and then + // trigger finalization check to Alice and Bob + sleep(1).await; + self.alice.validator.finalization().await?; + self.bob.validator.finalization().await?; Ok(()) } @@ -247,9 +256,7 @@ pub async fn generate_node( let mut subscribers = HashMap::new(); subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks")); subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs")); - if consensus_settings.is_some() { - subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals")); - } + subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals")); let sync_p2p = spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone()).await; let consensus_p2p = if let Some(settings) = consensus_settings { diff --git a/bin/darkfid/src/utils.rs b/bin/darkfid/src/utils.rs index 0e1dd9ea2..0454e51b3 100644 --- a/bin/darkfid/src/utils.rs +++ b/bin/darkfid/src/utils.rs @@ -31,7 +31,7 @@ use darkfi::{ }; use crate::{ - proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx}, + proto::{ProtocolProposal, ProtocolSync, ProtocolTx}, BlockchainNetwork, CONFIG_FILE, }; @@ -47,12 +47,12 @@ pub async fn spawn_sync_p2p( let registry = p2p.protocol_registry(); let _validator = validator.clone(); - let _subscriber = subscribers.get("blocks").unwrap().clone(); + let _subscriber = subscribers.get("proposals").unwrap().clone(); registry .register(SESSION_ALL, move |channel, p2p| { let validator = _validator.clone(); let subscriber = _subscriber.clone(); - async move { ProtocolBlock::init(channel, validator, p2p, subscriber).await.unwrap() } + async move { ProtocolProposal::init(channel, validator, p2p, subscriber).await.unwrap() } }) .await; diff --git a/src/validator/consensus.rs b/src/validator/consensus.rs index 469084d05..d7c723d4d 100644 --- a/src/validator/consensus.rs +++ b/src/validator/consensus.rs @@ -44,8 +44,6 @@ pub struct Consensus { pub blockchain: Blockchain, /// Fork size(length) after which it can be finalized pub finalization_threshold: usize, - /// Node is participating to consensus - pub participating: bool, /// Fork chains containing block proposals pub forks: RwLock>, /// Canonical blockchain PoW module state @@ -62,13 +60,7 @@ impl Consensus { ) -> Result { let module = RwLock::new(PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?); - Ok(Self { - blockchain, - finalization_threshold, - participating: false, - forks: RwLock::new(vec![]), - module, - }) + Ok(Self { blockchain, finalization_threshold, forks: RwLock::new(vec![]), module }) } /// Generate an unsigned block for provided fork, containing all diff --git a/src/validator/mod.rs b/src/validator/mod.rs index d0c3b583f..c1db5cca1 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -306,22 +306,6 @@ impl Validator { Ok(()) } - /// The node retrieves a block and tries to add it if it doesn't - /// already exists. - pub async fn append_block(&self, block: &BlockInfo) -> Result<()> { - let block_hash = block.hash()?.to_string(); - - // Check if block already exists - if self.blockchain.has_block(block)? { - debug!(target: "validator::append_block", "We have already seen this block"); - return Err(Error::BlockAlreadyExists(block_hash)) - } - - self.add_blocks(&[block.clone()]).await?; - info!(target: "validator::append_block", "Block added: {}", block_hash); - Ok(()) - } - /// The node checks if proposals can be finalized. /// If proposals are found, node appends them to canonical, excluding the /// last one, and rebuild the finalized fork to contain the last one.