mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
darkfid2/validator: consensus p2p foundation laid out
This commit is contained in:
@@ -25,7 +25,7 @@ use darkfi::{
|
||||
async_daemonize,
|
||||
blockchain::BlockInfo,
|
||||
cli_desc,
|
||||
net::{settings::SettingsOpt, P2p, P2pPtr},
|
||||
net::{settings::SettingsOpt, P2pPtr},
|
||||
rpc::server::listen_and_serve,
|
||||
util::time::TimeKeeper,
|
||||
validator::{Validator, ValidatorConfig, ValidatorPtr},
|
||||
@@ -50,7 +50,7 @@ use task::sync::sync_task;
|
||||
|
||||
/// Utility functions
|
||||
mod utils;
|
||||
use utils::{genesis_txs_total, spawn_sync_p2p};
|
||||
use utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p};
|
||||
|
||||
const CONFIG_FILE: &str = "darkfid_config.toml";
|
||||
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
|
||||
@@ -147,12 +147,10 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
|
||||
let sync_p2p = spawn_sync_p2p(&args.sync_net.into(), &validator).await;
|
||||
|
||||
// Initialize consensus P2P network
|
||||
let consensus_p2p = {
|
||||
if !args.consensus {
|
||||
None
|
||||
} else {
|
||||
Some(P2p::new(args.consensus_net.into()).await)
|
||||
}
|
||||
let consensus_p2p = if args.consensus {
|
||||
Some(spawn_consensus_p2p(&args.consensus_net.into(), &validator).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Initialize node
|
||||
|
||||
@@ -37,7 +37,7 @@ use url::Url;
|
||||
|
||||
use crate::{
|
||||
task::sync::sync_task,
|
||||
utils::{genesis_txs_total, spawn_sync_p2p},
|
||||
utils::{genesis_txs_total, spawn_consensus_p2p, spawn_sync_p2p},
|
||||
Darkfid,
|
||||
};
|
||||
|
||||
@@ -85,19 +85,42 @@ impl Harness {
|
||||
|
||||
// Generate validators using pregenerated vks
|
||||
let (_, vks) = vks::read_or_gen_vks_and_pks()?;
|
||||
let mut settings = Settings::default();
|
||||
settings.localnet = true;
|
||||
let mut sync_settings = Settings::default();
|
||||
sync_settings.localnet = true;
|
||||
let mut consensus_settings = Settings::default();
|
||||
consensus_settings.localnet = true;
|
||||
|
||||
// Alice
|
||||
let alice_url = Url::parse("tcp+tls://127.0.0.1:18340")?;
|
||||
settings.inbound_addrs = vec![alice_url.clone()];
|
||||
let alice = generate_node(&vks, &validator_config, &settings, ex, true).await?;
|
||||
sync_settings.inbound_addrs = vec![alice_url.clone()];
|
||||
let alice_consensus_url = Url::parse("tcp+tls://127.0.0.1:18350")?;
|
||||
consensus_settings.inbound_addrs = vec![alice_consensus_url.clone()];
|
||||
let alice = generate_node(
|
||||
&vks,
|
||||
&validator_config,
|
||||
&sync_settings,
|
||||
Some(&consensus_settings),
|
||||
ex,
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Bob
|
||||
let bob_url = Url::parse("tcp+tls://127.0.0.1:18341")?;
|
||||
settings.inbound_addrs = vec![bob_url];
|
||||
settings.peers = vec![alice_url];
|
||||
let bob = generate_node(&vks, &validator_config, &settings, ex, false).await?;
|
||||
sync_settings.inbound_addrs = vec![bob_url];
|
||||
sync_settings.peers = vec![alice_url];
|
||||
let bob_consensus_url = Url::parse("tcp+tls://127.0.0.1:18351")?;
|
||||
consensus_settings.inbound_addrs = vec![bob_consensus_url];
|
||||
consensus_settings.peers = vec![alice_consensus_url];
|
||||
let bob = generate_node(
|
||||
&vks,
|
||||
&validator_config,
|
||||
&sync_settings,
|
||||
Some(&consensus_settings),
|
||||
ex,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Self { config, vks, validator_config, alice, bob })
|
||||
}
|
||||
@@ -187,15 +210,24 @@ impl Harness {
|
||||
pub async fn generate_node(
|
||||
vks: &Vec<(Vec<u8>, String, Vec<u8>)>,
|
||||
config: &ValidatorConfig,
|
||||
settings: &Settings,
|
||||
sync_settings: &Settings,
|
||||
consensus_settings: Option<&Settings>,
|
||||
ex: &Arc<smol::Executor<'_>>,
|
||||
skip_sync: bool,
|
||||
) -> Result<Darkfid> {
|
||||
let sled_db = sled::Config::new().temporary(true).open()?;
|
||||
vks::inject(&sled_db, &vks)?;
|
||||
|
||||
let validator = Validator::new(&sled_db, config.clone()).await?;
|
||||
let sync_p2p = spawn_sync_p2p(&settings, &validator).await;
|
||||
let node = Darkfid::new(sync_p2p.clone(), None, validator).await;
|
||||
|
||||
let sync_p2p = spawn_sync_p2p(&sync_settings, &validator).await;
|
||||
let consensus_p2p = if let Some(settings) = consensus_settings {
|
||||
Some(spawn_consensus_p2p(settings, &validator).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let node = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator).await;
|
||||
|
||||
sync_p2p.clone().start(ex.clone()).await?;
|
||||
let _ex = ex.clone();
|
||||
ex.spawn(async move {
|
||||
@@ -204,6 +236,18 @@ pub async fn generate_node(
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
if consensus_settings.is_some() {
|
||||
consensus_p2p.clone().unwrap().start(ex.clone()).await?;
|
||||
let _ex = ex.clone();
|
||||
ex.spawn(async move {
|
||||
if let Err(e) = consensus_p2p.unwrap().run(_ex).await {
|
||||
error!("Failed starting consensus P2P network: {}", e);
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
|
||||
if !skip_sync {
|
||||
sync_task(&node).await?;
|
||||
} else {
|
||||
|
||||
@@ -48,14 +48,15 @@ async fn sync_blocks_real(ex: Arc<Executor<'_>>) -> Result<()> {
|
||||
th.validate_chains(3, 7).await?;
|
||||
|
||||
// We are going to create a third node and try to sync from the previous two
|
||||
let mut settings = Settings::default();
|
||||
settings.localnet = true;
|
||||
let mut sync_settings = Settings::default();
|
||||
sync_settings.localnet = true;
|
||||
let charlie_url = Url::parse("tcp+tls://127.0.0.1:18342")?;
|
||||
settings.inbound_addrs = vec![charlie_url];
|
||||
sync_settings.inbound_addrs = vec![charlie_url];
|
||||
let alice_url = th.alice.sync_p2p.settings().inbound_addrs[0].clone();
|
||||
let bob_url = th.bob.sync_p2p.settings().inbound_addrs[0].clone();
|
||||
settings.peers = vec![alice_url, bob_url];
|
||||
let charlie = generate_node(&th.vks, &th.validator_config, &settings, &ex, false).await?;
|
||||
sync_settings.peers = vec![alice_url, bob_url];
|
||||
let charlie =
|
||||
generate_node(&th.vks, &th.validator_config, &sync_settings, None, &ex, false).await?;
|
||||
// Verify node synced
|
||||
let genesis_txs_total = th.config.alice_initial + th.config.bob_initial;
|
||||
let alice = &th.alice.validator.read().await;
|
||||
|
||||
@@ -23,7 +23,7 @@ use darkfi::{
|
||||
net::{P2p, P2pPtr, Settings, SESSION_ALL},
|
||||
tx::Transaction,
|
||||
validator::{
|
||||
proto::{ProtocolBlock, ProtocolSync, ProtocolTx},
|
||||
proto::{ProtocolBlock, ProtocolProposal, ProtocolSync, ProtocolTx},
|
||||
ValidatorPtr,
|
||||
},
|
||||
Result,
|
||||
@@ -101,3 +101,20 @@ pub async fn spawn_sync_p2p(settings: &Settings, validator: &ValidatorPtr) -> P2
|
||||
|
||||
p2p
|
||||
}
|
||||
|
||||
/// Auxiliary function to generate the consensus P2P network and register all its protocols.
|
||||
pub async fn spawn_consensus_p2p(settings: &Settings, validator: &ValidatorPtr) -> P2pPtr {
|
||||
info!(target: "darkfid", "Registering consensus network P2P protocols...");
|
||||
let p2p = P2p::new(settings.clone()).await;
|
||||
let registry = p2p.protocol_registry();
|
||||
|
||||
let _validator = validator.clone();
|
||||
registry
|
||||
.register(SESSION_ALL, move |channel, p2p| {
|
||||
let validator = _validator.clone();
|
||||
async move { ProtocolProposal::init(channel, validator, p2p).await.unwrap() }
|
||||
})
|
||||
.await;
|
||||
|
||||
p2p
|
||||
}
|
||||
|
||||
@@ -16,7 +16,13 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use crate::{blockchain::Blockchain, util::time::TimeKeeper};
|
||||
use darkfi_serial::{SerialDecodable, SerialEncodable};
|
||||
|
||||
use crate::{
|
||||
blockchain::{BlockInfo, Blockchain, BlockchainOverlay, BlockchainOverlayPtr},
|
||||
util::time::TimeKeeper,
|
||||
Result,
|
||||
};
|
||||
|
||||
/// DarkFi consensus PID controller
|
||||
pub mod pid;
|
||||
@@ -30,12 +36,64 @@ pub struct Consensus {
|
||||
pub blockchain: Blockchain,
|
||||
/// Helper structure to calculate time related operations
|
||||
pub time_keeper: TimeKeeper,
|
||||
/// Node is participating to consensus
|
||||
pub participating: bool,
|
||||
/// Fork chains containing block proposals
|
||||
pub forks: Vec<Fork>,
|
||||
}
|
||||
|
||||
impl Consensus {
|
||||
/// Generate a new Consensus state.
|
||||
pub fn new(blockchain: Blockchain, time_keeper: TimeKeeper) -> Self {
|
||||
Self { blockchain, time_keeper }
|
||||
Self { blockchain, time_keeper, participating: false, forks: vec![] }
|
||||
}
|
||||
|
||||
/// Given a proposal, the node verifys it and finds which fork it extends.
|
||||
/// If the proposal extends the canonical blockchain, a new fork chain is created.
|
||||
pub async fn append_proposal(&mut self, _proposal: &Proposal) -> Result<()> {
|
||||
// TODO
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct represents a block proposal, used for consensus.
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
||||
pub struct Proposal {
|
||||
/// Block hash
|
||||
pub hash: blake3::Hash,
|
||||
/// Block header hash
|
||||
pub header: blake3::Hash,
|
||||
/// Block data
|
||||
pub block: BlockInfo,
|
||||
}
|
||||
|
||||
impl Proposal {
|
||||
pub fn new(block: BlockInfo) -> Self {
|
||||
let hash = block.blockhash();
|
||||
let header = block.header.headerhash();
|
||||
Self { hash, header, block }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Proposal> for BlockInfo {
|
||||
fn from(proposal: Proposal) -> BlockInfo {
|
||||
proposal.block
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct represents a sequence of block proposals, along with a blockchain
|
||||
/// overlay, containing all pending to-write records.
|
||||
#[derive(Clone)]
|
||||
pub struct Fork {
|
||||
pub overlay: BlockchainOverlayPtr,
|
||||
pub proposals: Vec<Proposal>,
|
||||
}
|
||||
|
||||
impl Fork {
|
||||
pub fn new(blockchain: &Blockchain) -> Result<Self> {
|
||||
let overlay = BlockchainOverlay::new(blockchain)?;
|
||||
Ok(Self { overlay, proposals: vec![] })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,10 @@
|
||||
mod protocol_block;
|
||||
pub use protocol_block::ProtocolBlock;
|
||||
|
||||
/// Block proposal broadcast protocol
|
||||
mod protocol_proposal;
|
||||
pub use protocol_proposal::ProtocolProposal;
|
||||
|
||||
/// Validator blockchain sync protocol
|
||||
mod protocol_sync;
|
||||
pub use protocol_sync::{ProtocolSync, SyncRequest, SyncResponse};
|
||||
|
||||
@@ -92,8 +92,17 @@ impl ProtocolBlock {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: consensus participants should skip this
|
||||
// as they already have the blocks added
|
||||
// Check if node started participating in consensus.
|
||||
// Consensus-mode enabled nodes have already performed these steps,
|
||||
// during proposal finalization. They still listen to this sub,
|
||||
// in case they go out of sync and become a none-consensus node.
|
||||
if self.validator.read().await.consensus.participating {
|
||||
debug!(
|
||||
target: "validator::protocol_block::handle_receive_block",
|
||||
"Node is participating in consensus, skipping..."
|
||||
);
|
||||
continue
|
||||
}
|
||||
|
||||
let block_copy = (*block).clone();
|
||||
|
||||
|
||||
123
src/validator/proto/protocol_proposal.rs
Normal file
123
src/validator/proto/protocol_proposal.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
/* This file is part of DarkFi (https://dark.fi)
|
||||
*
|
||||
* Copyright (C) 2020-2023 Dyne.org foundation
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use smol::Executor;
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
impl_p2p_message,
|
||||
net::{
|
||||
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
|
||||
ProtocolJobsManager, ProtocolJobsManagerPtr,
|
||||
},
|
||||
validator::{consensus::Proposal, ValidatorPtr},
|
||||
Result,
|
||||
};
|
||||
|
||||
impl_p2p_message!(Proposal, "proposal");
|
||||
|
||||
pub struct ProtocolProposal {
|
||||
proposal_sub: MessageSubscription<Proposal>,
|
||||
jobsman: ProtocolJobsManagerPtr,
|
||||
validator: ValidatorPtr,
|
||||
p2p: P2pPtr,
|
||||
channel_address: Url,
|
||||
}
|
||||
|
||||
impl ProtocolProposal {
|
||||
pub async fn init(
|
||||
channel: ChannelPtr,
|
||||
validator: ValidatorPtr,
|
||||
p2p: P2pPtr,
|
||||
) -> Result<ProtocolBasePtr> {
|
||||
debug!(
|
||||
target: "validator::protocol_proposal::init",
|
||||
"Adding ProtocolProposal to the protocol registry"
|
||||
);
|
||||
let msg_subsystem = channel.message_subsystem();
|
||||
msg_subsystem.add_dispatch::<Proposal>().await;
|
||||
|
||||
let proposal_sub = channel.subscribe_msg::<Proposal>().await?;
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
proposal_sub,
|
||||
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel.clone()),
|
||||
validator,
|
||||
p2p,
|
||||
channel_address: channel.address().clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
|
||||
debug!(target: "consensus::protocol_proposal::handle_receive_proposal", "START");
|
||||
let exclude_list = vec![self.channel_address.clone()];
|
||||
loop {
|
||||
let proposal = match self.proposal_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
debug!(
|
||||
target: "validator::protocol_proposal::handle_receive_proposal",
|
||||
"recv fail: {}",
|
||||
e
|
||||
);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
// Check if node has finished syncing its blockchain
|
||||
if !self.validator.read().await.synced {
|
||||
debug!(
|
||||
target: "validator::protocol_proposal::handle_receive_proposal",
|
||||
"Node still syncing blockchain, skipping..."
|
||||
);
|
||||
continue
|
||||
}
|
||||
|
||||
let proposal_copy = (*proposal).clone();
|
||||
|
||||
match self.validator.write().await.consensus.append_proposal(&proposal_copy).await {
|
||||
Ok(()) => self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await,
|
||||
Err(e) => {
|
||||
debug!(
|
||||
target: "validator::protocol_proposal::handle_receive_proposal",
|
||||
"append_proposal fail: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProtocolBase for ProtocolProposal {
|
||||
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
debug!(target: "validator::protocol_proposal::start", "START");
|
||||
self.jobsman.clone().start(executor.clone());
|
||||
self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await;
|
||||
debug!(target: "validator::protocol_proposal::start", "END");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
"ProtocolProposal"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user