diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index fa53caad8..100de3902 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -25,7 +25,7 @@ use darkfi::{ async_daemonize, blockchain::BlockInfo, cli_desc, - net::{settings::SettingsOpt, P2p, P2pPtr}, + net::{settings::SettingsOpt, P2pPtr}, rpc::server::listen_and_serve, util::time::TimeKeeper, validator::{Validator, ValidatorConfig, ValidatorPtr}, @@ -50,7 +50,7 @@ use task::sync::sync_task; /// Utility functions mod utils; -use utils::{genesis_txs_total, spawn_sync_p2p}; +use utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p}; const CONFIG_FILE: &str = "darkfid_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml"); @@ -147,12 +147,10 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let sync_p2p = spawn_sync_p2p(&args.sync_net.into(), &validator).await; // Initialize consensus P2P network - let consensus_p2p = { - if !args.consensus { - None - } else { - Some(P2p::new(args.consensus_net.into()).await) - } + let consensus_p2p = if args.consensus { + Some(spawn_consensus_p2p(&args.consensus_net.into(), &validator).await) + } else { + None }; // Initialize node diff --git a/bin/darkfid2/src/tests/harness.rs b/bin/darkfid2/src/tests/harness.rs index 57e1edee9..08def3814 100644 --- a/bin/darkfid2/src/tests/harness.rs +++ b/bin/darkfid2/src/tests/harness.rs @@ -37,7 +37,7 @@ use url::Url; use crate::{ task::sync::sync_task, - utils::{genesis_txs_total, spawn_sync_p2p}, + utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p}, Darkfid, }; @@ -85,19 +85,42 @@ impl Harness { // Generate validators using pregenerated vks let (_, vks) = vks::read_or_gen_vks_and_pks()?; - let mut settings = Settings::default(); - settings.localnet = true; + let mut sync_settings = Settings::default(); + sync_settings.localnet = true; + let mut consensus_settings = Settings::default(); + consensus_settings.localnet = true; // Alice let alice_url = Url::parse("tcp+tls://127.0.0.1:18340")?; - settings.inbound_addrs = vec![alice_url.clone()]; - let alice = generate_node(&vks, &validator_config, &settings, ex, true).await?; + sync_settings.inbound_addrs = vec![alice_url.clone()]; + let alice_consensus_url = Url::parse("tcp+tls://127.0.0.1:18350")?; + consensus_settings.inbound_addrs = vec![alice_consensus_url.clone()]; + let alice = generate_node( + &vks, + &validator_config, + &sync_settings, + Some(&consensus_settings), + ex, + true, + ) + .await?; // Bob let bob_url = Url::parse("tcp+tls://127.0.0.1:18341")?; - settings.inbound_addrs = vec![bob_url]; - settings.peers = vec![alice_url]; - let bob = generate_node(&vks, &validator_config, &settings, ex, false).await?; + sync_settings.inbound_addrs = vec![bob_url]; + sync_settings.peers = vec![alice_url]; + let bob_consensus_url = Url::parse("tcp+tls://127.0.0.1:18351")?; + consensus_settings.inbound_addrs = vec![bob_consensus_url]; + consensus_settings.peers = vec![alice_consensus_url]; + let bob = generate_node( + &vks, + &validator_config, + &sync_settings, + Some(&consensus_settings), + ex, + false, + ) + .await?; Ok(Self { config, vks, validator_config, alice, bob }) } @@ -187,15 +210,24 @@ impl Harness { pub async fn generate_node( vks: &Vec<(Vec, String, Vec)>, config: &ValidatorConfig, - settings: &Settings, + sync_settings: &Settings, + consensus_settings: Option<&Settings>, ex: &Arc>, skip_sync: bool, ) -> Result { let sled_db = sled::Config::new().temporary(true).open()?; vks::inject(&sled_db, &vks)?; + let validator = Validator::new(&sled_db, config.clone()).await?; - let sync_p2p = spawn_sync_p2p(&settings, &validator).await; - let node = Darkfid::new(sync_p2p.clone(), None, validator).await; + + let sync_p2p = spawn_sync_p2p(&sync_settings, &validator).await; + let consensus_p2p = if let Some(settings) = consensus_settings { + Some(spawn_consensus_p2p(settings, &validator).await) + } else { + None + }; + let node = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator).await; + sync_p2p.clone().start(ex.clone()).await?; let _ex = ex.clone(); ex.spawn(async move { @@ -204,6 +236,18 @@ pub async fn generate_node( } }) .detach(); + + if consensus_settings.is_some() { + consensus_p2p.clone().unwrap().start(ex.clone()).await?; + let _ex = ex.clone(); + ex.spawn(async move { + if let Err(e) = consensus_p2p.unwrap().run(_ex).await { + error!("Failed starting consensus P2P network: {}", e); + } + }) + .detach(); + } + if !skip_sync { sync_task(&node).await?; } else { diff --git a/bin/darkfid2/src/tests/mod.rs b/bin/darkfid2/src/tests/mod.rs index 6a6e60da1..fde18e6f0 100644 --- a/bin/darkfid2/src/tests/mod.rs +++ b/bin/darkfid2/src/tests/mod.rs @@ -48,14 +48,15 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { th.validate_chains(3, 7).await?; // We are going to create a third node and try to sync from the previous two - let mut settings = Settings::default(); - settings.localnet = true; + let mut sync_settings = Settings::default(); + sync_settings.localnet = true; let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?; - settings.inbound_addrs = vec![charlie_url]; + sync_settings.inbound_addrs = vec![charlie_url]; let alice_url = th.alice.sync_p2p.settings().inbound_addrs[0].clone(); let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone(); - settings.peers = vec![alice_url, bob_url]; - let charlie = generate_node(&th.vks, &th.validator_config, &settings, &ex, false).await?; + sync_settings.peers = vec![alice_url, bob_url]; + let charlie = + generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?; // Verify node synced let genesis_txs_total = th.config.alice_initial + th.config.bob_initial; let alice = &th.alice.validator.read().await; diff --git a/bin/darkfid2/src/utils.rs b/bin/darkfid2/src/utils.rs index 891908087..11cf0a036 100644 --- a/bin/darkfid2/src/utils.rs +++ b/bin/darkfid2/src/utils.rs @@ -23,7 +23,7 @@ use darkfi::{ net::{P2p, P2pPtr, Settings, SESSION_ALL}, tx::Transaction, validator::{ - proto::{ProtocolBlock, ProtocolSync, ProtocolTx}, + proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx}, ValidatorPtr, }, Result, @@ -101,3 +101,20 @@ pub async fn spawn_sync_p2p(settings: &Settings, validator: &ValidatorPtr) -> P2 p2p } + +/// Auxiliary function to generate the consensus P2P network and register all its protocols. +pub async fn spawn_consensus_p2p(settings: &Settings, validator: &ValidatorPtr) -> P2pPtr { + info!(target: "darkfid", "Registering consensus network P2P protocols..."); + let p2p = P2p::new(settings.clone()).await; + let registry = p2p.protocol_registry(); + + let _validator = validator.clone(); + registry + .register(SESSION_ALL, move |channel, p2p| { + let validator = _validator.clone(); + async move { ProtocolProposal::init(channel, validator, p2p).await.unwrap() } + }) + .await; + + p2p +} diff --git a/src/validator/consensus/mod.rs b/src/validator/consensus/mod.rs index 7f4f05bce..3e469ab89 100644 --- a/src/validator/consensus/mod.rs +++ b/src/validator/consensus/mod.rs @@ -16,7 +16,13 @@ * along with this program. If not, see . */ -use crate::{blockchain::Blockchain, util::time::TimeKeeper}; +use darkfi_serial::{SerialDecodable, SerialEncodable}; + +use crate::{ + blockchain::{BlockInfo, Blockchain, BlockchainOverlay, BlockchainOverlayPtr}, + util::time::TimeKeeper, + Result, +}; /// DarkFi consensus PID controller pub mod pid; @@ -30,12 +36,64 @@ pub struct Consensus { pub blockchain: Blockchain, /// Helper structure to calculate time related operations pub time_keeper: TimeKeeper, + /// Node is participating to consensus + pub participating: bool, + /// Fork chains containing block proposals + pub forks: Vec, } impl Consensus { /// Generate a new Consensus state. pub fn new(blockchain: Blockchain, time_keeper: TimeKeeper) -> Self { - Self { blockchain, time_keeper } + Self { blockchain, time_keeper, participating: false, forks: vec![] } + } + + /// Given a proposal, the node verifys it and finds which fork it extends. + /// If the proposal extends the canonical blockchain, a new fork chain is created. + pub async fn append_proposal(&mut self, _proposal: &Proposal) -> Result<()> { + // TODO + + Ok(()) + } +} + +/// This struct represents a block proposal, used for consensus. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct Proposal { + /// Block hash + pub hash: blake3::Hash, + /// Block header hash + pub header: blake3::Hash, + /// Block data + pub block: BlockInfo, +} + +impl Proposal { + pub fn new(block: BlockInfo) -> Self { + let hash = block.blockhash(); + let header = block.header.headerhash(); + Self { hash, header, block } + } +} + +impl From for BlockInfo { + fn from(proposal: Proposal) -> BlockInfo { + proposal.block + } +} + +/// This struct represents a sequence of block proposals, along with a blockchain +/// overlay, containing all pending to-write records. +#[derive(Clone)] +pub struct Fork { + pub overlay: BlockchainOverlayPtr, + pub proposals: Vec, +} + +impl Fork { + pub fn new(blockchain: &Blockchain) -> Result { + let overlay = BlockchainOverlay::new(blockchain)?; + Ok(Self { overlay, proposals: vec![] }) } } diff --git a/src/validator/proto/mod.rs b/src/validator/proto/mod.rs index 8afb48bc8..2d89cdbdd 100644 --- a/src/validator/proto/mod.rs +++ b/src/validator/proto/mod.rs @@ -20,6 +20,10 @@ mod protocol_block; pub use protocol_block::ProtocolBlock; +/// Block proposal broadcast protocol +mod protocol_proposal; +pub use protocol_proposal::ProtocolProposal; + /// Validator blockchain sync protocol mod protocol_sync; pub use protocol_sync::{ProtocolSync, SyncRequest, SyncResponse}; diff --git a/src/validator/proto/protocol_block.rs b/src/validator/proto/protocol_block.rs index ad9eda485..ae5b6f7cc 100644 --- a/src/validator/proto/protocol_block.rs +++ b/src/validator/proto/protocol_block.rs @@ -92,8 +92,17 @@ impl ProtocolBlock { continue } - // TODO: consensus participants should skip this - // as they already have the blocks added + // Check if node started participating in consensus. + // Consensus-mode enabled nodes have already performed these steps, + // during proposal finalization. They still listen to this sub, + // in case they go out of sync and become a none-consensus node. + if self.validator.read().await.consensus.participating { + debug!( + target: "validator::protocol_block::handle_receive_block", + "Node is participating in consensus, skipping..." + ); + continue + } let block_copy = (*block).clone(); diff --git a/src/validator/proto/protocol_proposal.rs b/src/validator/proto/protocol_proposal.rs new file mode 100644 index 000000000..1d9096720 --- /dev/null +++ b/src/validator/proto/protocol_proposal.rs @@ -0,0 +1,123 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2023 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; +use smol::Executor; +use url::Url; + +use crate::{ + impl_p2p_message, + net::{ + ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + validator::{consensus::Proposal, ValidatorPtr}, + Result, +}; + +impl_p2p_message!(Proposal, "proposal"); + +pub struct ProtocolProposal { + proposal_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + validator: ValidatorPtr, + p2p: P2pPtr, + channel_address: Url, +} + +impl ProtocolProposal { + pub async fn init( + channel: ChannelPtr, + validator: ValidatorPtr, + p2p: P2pPtr, + ) -> Result { + debug!( + target: "validator::protocol_proposal::init", + "Adding ProtocolProposal to the protocol registry" + ); + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let proposal_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + proposal_sub, + jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()), + validator, + p2p, + channel_address: channel.address().clone(), + })) + } + + async fn handle_receive_proposal(self: Arc) -> Result<()> { + debug!(target: "consensus::protocol_proposal::handle_receive_proposal", "START"); + let exclude_list = vec![self.channel_address.clone()]; + loop { + let proposal = match self.proposal_sub.receive().await { + Ok(v) => v, + Err(e) => { + debug!( + target: "validator::protocol_proposal::handle_receive_proposal", + "recv fail: {}", + e + ); + continue + } + }; + + // Check if node has finished syncing its blockchain + if !self.validator.read().await.synced { + debug!( + target: "validator::protocol_proposal::handle_receive_proposal", + "Node still syncing blockchain, skipping..." + ); + continue + } + + let proposal_copy = (*proposal).clone(); + + match self.validator.write().await.consensus.append_proposal(&proposal_copy).await { + Ok(()) => self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await, + Err(e) => { + debug!( + target: "validator::protocol_proposal::handle_receive_proposal", + "append_proposal fail: {}", + e + ); + } + }; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolProposal { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "validator::protocol_proposal::start", "START"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await; + debug!(target: "validator::protocol_proposal::start", "END"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolProposal" + } +}