darkfid: removed obselete protocol_block

This commit is contained in:
skoupidi
2024-02-13 12:46:06 +02:00
parent ea50f9ac5e
commit 6de4869bec
9 changed files with 30 additions and 218 deletions

View File

@@ -234,9 +234,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
let mut subscribers = HashMap::new();
subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
if blockchain_config.consensus {
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
}
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
// Initialize syncing P2P network
let sync_p2p =

View File

@@ -16,13 +16,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/// Block broadcast protocol
mod protocol_block;
pub use protocol_block::{BlockInfoMessage, ProtocolBlock};
/// Block proposal broadcast protocol
mod protocol_proposal;
pub use protocol_proposal::ProtocolProposal;
pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
/// Validator blockchain sync protocol
mod protocol_sync;

View File

@@ -1,159 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use tinyjson::JsonValue;
use url::Url;
use darkfi::{
blockchain::BlockInfo,
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
rpc::jsonrpc::JsonSubscriber,
util::encoding::base64,
validator::ValidatorPtr,
Result,
};
use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
/// Auxiliary [`BlockInfo`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct BlockInfoMessage(BlockInfo);
impl From<&BlockInfo> for BlockInfoMessage {
fn from(block: &BlockInfo) -> Self {
BlockInfoMessage(block.clone())
}
}
impl_p2p_message!(BlockInfoMessage, "block");
pub struct ProtocolBlock {
block_sub: MessageSubscription<BlockInfoMessage>,
jobsman: ProtocolJobsManagerPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
channel_address: Url,
subscriber: JsonSubscriber,
}
impl ProtocolBlock {
pub async fn init(
channel: ChannelPtr,
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
) -> Result<ProtocolBasePtr> {
debug!(
target: "validator::protocol_block::init",
"Adding ProtocolBlock to the protocol registry"
);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<BlockInfoMessage>().await;
let block_sub = channel.subscribe_msg::<BlockInfoMessage>().await?;
Ok(Arc::new(Self {
block_sub,
jobsman: ProtocolJobsManager::new("BlockProtocol", channel.clone()),
validator,
p2p,
channel_address: channel.address().clone(),
subscriber,
}))
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_block::handle_receive_block", "START");
let exclude_list = vec![self.channel_address.clone()];
loop {
let block = match self.block_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "validator::protocol_block::handle_receive_block",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "validator::protocol_block::handle_receive_block",
"Node still syncing blockchain, skipping..."
);
continue
}
// Check if node started participating in consensus.
// Consensus-mode enabled nodes have already performed these steps,
// during proposal finalization. They still listen to this sub,
// in case they go out of sync and become a none-consensus node.
if self.validator.consensus.participating {
debug!(
target: "validator::protocol_block::handle_receive_block",
"Node is participating in consensus, skipping..."
);
continue
}
let block_copy = (*block).clone();
match self.validator.append_block(&block_copy.0).await {
Ok(()) => {
self.p2p.broadcast_with_exclude(&block_copy, &exclude_list).await;
let encoded_block =
JsonValue::String(base64::encode(&serialize_async(&block_copy).await));
self.subscriber.notify(vec![encoded_block].into()).await;
}
Err(e) => {
debug!(
target: "validator::protocol_block::handle_receive_block",
"append_block fail: {}",
e
);
}
};
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolBlock {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "validator::protocol_block::start", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await;
debug!(target: "validator::protocol_block::start", "END");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolBlock"
}
}

View File

@@ -39,7 +39,7 @@ use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct ProposalMessage(Proposal);
pub struct ProposalMessage(pub Proposal);
impl_p2p_message!(ProposalMessage, "proposal");
@@ -103,15 +103,6 @@ impl ProtocolProposal {
continue
}
// Check if node started participating in consensus.
if !self.validator.consensus.participating {
debug!(
target: "validator::protocol_proposal::handle_receive_proposal",
"Node is not participating in consensus, skipping..."
);
continue
}
let proposal_copy = (*proposal).clone();
match self.validator.consensus.append_proposal(&proposal_copy.0).await {

View File

@@ -42,7 +42,7 @@ use log::info;
use num_bigint::BigUint;
use rand::rngs::OsRng;
use crate::{proto::BlockInfoMessage, Darkfid};
use crate::{proto::ProposalMessage, Darkfid};
// TODO: handle all ? so the task don't stop on errors
@@ -123,13 +123,16 @@ async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
let proposal = Proposal::new(next_block)?;
node.validator.consensus.append_proposal(&proposal).await?;
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
node.consensus_p2p.as_ref().unwrap().broadcast(&message).await;
node.sync_p2p.broadcast(&message).await;
// Check if we can finalize anything and broadcast them
let finalized = node.validator.finalization().await?;
if !finalized.is_empty() {
let mut notif_blocks = Vec::with_capacity(finalized.len());
for block in finalized {
let message = BlockInfoMessage::from(&block);
node.sync_p2p.broadcast(&message).await;
notif_blocks
.push(JsonValue::String(bs58::encode(&serialize(&block)).into_string()));
}

View File

@@ -22,8 +22,9 @@ use darkfi::{
blockchain::{BlockInfo, Header},
net::Settings,
rpc::jsonrpc::JsonSubscriber,
system::sleep,
tx::{ContractCallLeaf, TransactionBuilder},
validator::{Validator, ValidatorConfig},
validator::{consensus::Proposal, Validator, ValidatorConfig},
zk::{empty_witnesses, ProvingKey, ZkCircuit},
Result,
};
@@ -42,7 +43,7 @@ use rand::rngs::OsRng;
use url::Url;
use crate::{
proto::BlockInfoMessage,
proto::ProposalMessage,
task::sync::sync_task,
utils::{spawn_consensus_p2p, spawn_sync_p2p},
Darkfid,
@@ -149,19 +150,27 @@ impl Harness {
let alice_blockchain_len = alice.blockchain.len();
assert_eq!(alice_blockchain_len, bob.blockchain.len());
assert_eq!(alice_blockchain_len, total_blocks);
// Last block is not finalized yet
assert_eq!(alice_blockchain_len, total_blocks - 1);
Ok(())
}
pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> {
// We simply broadcast the block using Alice's sync P2P
// We append the block as a proposal to Alice,
// and then we broadcast it to rest nodes
for block in blocks {
self.alice.sync_p2p.broadcast(&BlockInfoMessage::from(block)).await;
let proposal = Proposal::new(block.clone())?;
self.alice.validator.consensus.append_proposal(&proposal).await?;
let message = ProposalMessage(proposal);
self.alice.consensus_p2p.as_ref().unwrap().broadcast(&message).await;
}
// and then add it to her chain
self.alice.validator.add_blocks(blocks).await?;
// Sleep a bit so blocks can be propagated and then
// trigger finalization check to Alice and Bob
sleep(1).await;
self.alice.validator.finalization().await?;
self.bob.validator.finalization().await?;
Ok(())
}
@@ -247,9 +256,7 @@ pub async fn generate_node(
let mut subscribers = HashMap::new();
subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
if consensus_settings.is_some() {
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
}
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
let sync_p2p = spawn_sync_p2p(sync_settings, &validator, &subscribers, ex.clone()).await;
let consensus_p2p = if let Some(settings) = consensus_settings {

View File

@@ -31,7 +31,7 @@ use darkfi::{
};
use crate::{
proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx},
proto::{ProtocolProposal, ProtocolSync, ProtocolTx},
BlockchainNetwork, CONFIG_FILE,
};
@@ -47,12 +47,12 @@ pub async fn spawn_sync_p2p(
let registry = p2p.protocol_registry();
let _validator = validator.clone();
let _subscriber = subscribers.get("blocks").unwrap().clone();
let _subscriber = subscribers.get("proposals").unwrap().clone();
registry
.register(SESSION_ALL, move |channel, p2p| {
let validator = _validator.clone();
let subscriber = _subscriber.clone();
async move { ProtocolBlock::init(channel, validator, p2p, subscriber).await.unwrap() }
async move { ProtocolProposal::init(channel, validator, p2p, subscriber).await.unwrap() }
})
.await;

View File

@@ -44,8 +44,6 @@ pub struct Consensus {
pub blockchain: Blockchain,
/// Fork size(length) after which it can be finalized
pub finalization_threshold: usize,
/// Node is participating to consensus
pub participating: bool,
/// Fork chains containing block proposals
pub forks: RwLock<Vec<Fork>>,
/// Canonical blockchain PoW module state
@@ -62,13 +60,7 @@ impl Consensus {
) -> Result<Self> {
let module =
RwLock::new(PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?);
Ok(Self {
blockchain,
finalization_threshold,
participating: false,
forks: RwLock::new(vec![]),
module,
})
Ok(Self { blockchain, finalization_threshold, forks: RwLock::new(vec![]), module })
}
/// Generate an unsigned block for provided fork, containing all

View File

@@ -306,22 +306,6 @@ impl Validator {
Ok(())
}
/// The node retrieves a block and tries to add it if it doesn't
/// already exists.
pub async fn append_block(&self, block: &BlockInfo) -> Result<()> {
let block_hash = block.hash()?.to_string();
// Check if block already exists
if self.blockchain.has_block(block)? {
debug!(target: "validator::append_block", "We have already seen this block");
return Err(Error::BlockAlreadyExists(block_hash))
}
self.add_blocks(&[block.clone()]).await?;
info!(target: "validator::append_block", "Block added: {}", block_hash);
Ok(())
}
/// The node checks if proposals can be finalized.
/// If proposals are found, node appends them to canonical, excluding the
/// last one, and rebuild the finalized fork to contain the last one.