From c55c08b6623db077c3fa9600f6eb5d65621853ae Mon Sep 17 00:00:00 2001 From: parazyd Date: Fri, 15 Apr 2022 23:54:41 +0200 Subject: [PATCH] darkfid: Implement consensus P2P protocols. --- Cargo.lock | 3 + bin/darkfid2/Cargo.toml | 2 + bin/darkfid2/src/main.rs | 79 +++++++++++++++-- bin/darkfid2/src/protocol/mod.rs | 11 +++ .../src/protocol/protocol_participant.rs | 3 +- .../src/protocol/protocol_proposal.rs | 84 +++++++++++++++++++ bin/darkfid2/src/protocol/protocol_tx.rs | 69 +++++++++++++++ bin/darkfid2/src/protocol/protocol_vote.rs | 69 +++++++++++++++ src/consensus2/mod.rs | 4 +- src/consensus2/state.rs | 2 +- src/error.rs | 3 + 11 files changed, 320 insertions(+), 9 deletions(-) create mode 100644 bin/darkfid2/src/protocol/protocol_proposal.rs create mode 100644 bin/darkfid2/src/protocol/protocol_tx.rs create mode 100644 bin/darkfid2/src/protocol/protocol_vote.rs diff --git a/Cargo.lock b/Cargo.lock index 1f1045945..f742f8b3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1725,16 +1725,19 @@ dependencies = [ "async-executor", "async-std", "async-trait", + "chrono", "ctrlc-async", "darkfi", "easy-parallel", "futures-lite", "lazy-init", "log", + "rand 0.8.5", "serde", "serde_derive", "serde_json", "simplelog", + "sled", "structopt", "structopt-toml", "url", diff --git a/bin/darkfid2/Cargo.toml b/bin/darkfid2/Cargo.toml index ac4f8defe..71aa19459 100644 --- a/bin/darkfid2/Cargo.toml +++ b/bin/darkfid2/Cargo.toml @@ -20,8 +20,10 @@ easy-parallel = "3.2.0" futures-lite = "1.12.0" lazy-init = "0.5.0" log = "0.4.16" +rand = "0.8.5" serde_json = "1.0.79" simplelog = "0.12.0-alpha1" +sled = "0.34.7" url = "2.2.2" # Argument parsing diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index 69acdb512..604204e08 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -3,9 +3,11 @@ use std::net::SocketAddr; use async_executor::Executor; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; +use chrono::Utc; use easy_parallel::Parallel; use futures_lite::future; -use log::{error, info}; +use log::{debug, error, info}; +use rand::Rng; use serde_derive::Deserialize; use serde_json::{json, Value}; use simplelog::{ColorChoice, TermLogger, TerminalMode}; @@ -15,6 +17,9 @@ use url::Url; use darkfi::{ cli_desc, + consensus2::{ + util::Timestamp, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES, + }, crypto::{ address::Address, keypair::{Keypair, PublicKey, SecretKey}, @@ -35,7 +40,7 @@ use darkfi::{ path::get_config_path, }, wallet::walletdb::{WalletDb, WalletPtr}, - Result, + Error, Result, }; mod client; @@ -45,11 +50,12 @@ mod error; use error::{server_error, RpcError}; mod protocol; -use protocol::ProtocolProposal; +use protocol::{ProtocolParticipant, ProtocolProposal, ProtocolTx, ProtocolVote}; const CONFIG_FILE: &str = "darkfid_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml"); +// TODO: Flag to participate in consensus #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] #[serde(default)] #[structopt(name = "darkfid", about = cli_desc!())] @@ -58,6 +64,10 @@ struct Args { /// Configuration file to use config: Option, + #[structopt(long, default_value = "testnet")] + /// Chain to use (testnet, mainnet) + chain: String, + #[structopt(long, default_value = "~/.config/darkfi/darkfid_wallet.db")] /// Path to wallet database wallet_path: String, @@ -66,6 +76,10 @@ struct Args { /// Password for the wallet database wallet_pass: String, + #[structopt(long, default_value = "~/.config/darkfi/darkfid_blockchain")] + /// Path to blockchain database + database: String, + #[structopt(long, default_value = "tcp://127.0.0.1:5397")] /// JSON-RPC listen URL rpc_listen: Url, @@ -327,6 +341,26 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Initialize or load wallet let wallet = init_wallet(&args.wallet_path, &args.wallet_pass).await?; + // Initialize or open sled database + let db_path = format!("{}/{}", expand_path(&args.database)?.to_str().unwrap(), args.chain); + let sled_db = sled::open(&db_path)?; + + // Initialize validator state + // TODO: genesis_ts should be some hardcoded constant + let genesis_ts = Timestamp(Utc::now().timestamp()); + let genesis_data = match args.chain.as_str() { + "mainnet" => *MAINNET_GENESIS_HASH_BYTES, + "testnet" => *TESTNET_GENESIS_HASH_BYTES, + x => { + error!("Unsupported chain `{}`", x); + return Err(Error::UnsupportedChain) + } + }; + // TODO: Is this ok? + let mut rng = rand::thread_rng(); + let id: u64 = rng.gen(); + let state = ValidatorState::new(&sled_db, id, genesis_ts, genesis_data)?; + // P2P network let network_settings = net::Settings { inbound: Some(args.p2p_accept), @@ -338,14 +372,49 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { }; let p2p = net::P2p::new(network_settings).await; + let registry = p2p.protocol_registry(); - // TODO: Register protocols + + debug!("Adding ProtocolTx to the protocol registry"); + let _state = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let state = _state.clone(); + async move { ProtocolTx::init(channel, state, p2p).await.unwrap() } + }) + .await; + + debug!("Adding ProtocolVote to the protocol registry"); + let _state = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let state = _state.clone(); + async move { ProtocolVote::init(channel, state, p2p).await.unwrap() } + }) + .await; + debug!("Adding ProtocolProposal to the protocol registry"); + let _state = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let state = _state.clone(); + async move { ProtocolProposal::init(channel, state, p2p).await.unwrap() } + }) + .await; + + debug!("Adding ProtocolParticipant to the protocol registry"); + let _state = state.clone(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let state = _state.clone(); + async move { ProtocolParticipant::init(channel, state, p2p).await.unwrap() } + }) + .await; info!("Starting P2P networking"); p2p.clone().start(ex.clone()).await?; - // Initialize state + // Initialize program state let darkfid = Darkfid::new(wallet, p2p).await?; let darkfid = Arc::new(darkfid); diff --git a/bin/darkfid2/src/protocol/mod.rs b/bin/darkfid2/src/protocol/mod.rs index 1a27b7453..e4cb9a51b 100644 --- a/bin/darkfid2/src/protocol/mod.rs +++ b/bin/darkfid2/src/protocol/mod.rs @@ -1,2 +1,13 @@ +// TODO: Handle ? in these modules' loops + mod protocol_participant; pub use protocol_participant::ProtocolParticipant; + +mod protocol_proposal; +pub use protocol_proposal::ProtocolProposal; + +mod protocol_tx; +pub use protocol_tx::ProtocolTx; + +mod protocol_vote; +pub use protocol_vote::ProtocolVote; diff --git a/bin/darkfid2/src/protocol/protocol_participant.rs b/bin/darkfid2/src/protocol/protocol_participant.rs index d7d9e7642..c8a6eef3e 100644 --- a/bin/darkfid2/src/protocol/protocol_participant.rs +++ b/bin/darkfid2/src/protocol/protocol_participant.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use log::debug; use darkfi::{ - consensus::{participant::Participant, state::ValidatorStatePtr}, + consensus2::{state::ValidatorStatePtr, Participant}, net::{ ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr, @@ -44,6 +44,7 @@ impl ProtocolParticipant { let participant = self.participant_sub.receive().await?; debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant); + if self.state.write().unwrap().append_participant((*participant).clone()) { let pending_participants = self.state.read().unwrap().consensus.pending_participants.clone(); diff --git a/bin/darkfid2/src/protocol/protocol_proposal.rs b/bin/darkfid2/src/protocol/protocol_proposal.rs new file mode 100644 index 000000000..a8e47978c --- /dev/null +++ b/bin/darkfid2/src/protocol/protocol_proposal.rs @@ -0,0 +1,84 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use darkfi::{ + consensus2::{block::BlockProposal, state::ValidatorStatePtr}, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; + +pub struct ProtocolProposal { + proposal_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, +} + +impl ProtocolProposal { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, + ) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let proposal_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + proposal_sub, + jobsman: ProtocolJobsManager::new("ProposalProtocol", channel), + state, + p2p, + })) + } + + async fn handle_receive_proposal(self: Arc) -> Result<()> { + debug!("ProtocolProposal::handle_receive_proposal() [START]"); + loop { + let proposal = self.proposal_sub.receive().await?; + + debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal); + + let proposal_copy = (*proposal).clone(); + let vote = self.state.write().unwrap().receive_proposal(&proposal_copy); + match vote { + Ok(v) => { + if v.is_none() { + debug!("Node did not vote for the proposed block."); + } else { + let vote = v.unwrap(); + self.state.write().unwrap().receive_vote(&vote)?; + // Broadcast block to rest of nodes + self.p2p.broadcast(proposal_copy).await?; + // Broadcast vote + self.p2p.broadcast(vote).await?; + } + } + Err(e) => { + debug!("ProtocolProposal::handle_receive_proposal() error processing proposal: {:?}", e); + } + } + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolProposal { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolProposal::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await; + debug!("ProtocolProposal::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolProposal" + } +} diff --git a/bin/darkfid2/src/protocol/protocol_tx.rs b/bin/darkfid2/src/protocol/protocol_tx.rs new file mode 100644 index 000000000..a463b5742 --- /dev/null +++ b/bin/darkfid2/src/protocol/protocol_tx.rs @@ -0,0 +1,69 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use darkfi::{ + consensus2::{state::ValidatorStatePtr, Tx}, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; + +pub struct ProtocolTx { + tx_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, +} + +impl ProtocolTx { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, + ) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let tx_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + tx_sub, + jobsman: ProtocolJobsManager::new("TxProtocol", channel), + state, + p2p, + })) + } + + async fn handle_receive_tx(self: Arc) -> Result<()> { + debug!("ProtocolTx::handle_receive_tx() [START]"); + loop { + let tx = self.tx_sub.receive().await?; + + debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx); + + let tx_copy = (*tx).clone(); + if self.state.write().unwrap().append_tx(tx_copy.clone()) { + self.p2p.broadcast(tx_copy).await?; + } + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolTx { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolTx::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await; + debug!("ProtocolTx::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolTx" + } +} diff --git a/bin/darkfid2/src/protocol/protocol_vote.rs b/bin/darkfid2/src/protocol/protocol_vote.rs new file mode 100644 index 000000000..5ccedc2f9 --- /dev/null +++ b/bin/darkfid2/src/protocol/protocol_vote.rs @@ -0,0 +1,69 @@ +use async_executor::Executor; +use async_std::sync::Arc; +use async_trait::async_trait; +use log::debug; + +use darkfi::{ + consensus2::{state::ValidatorStatePtr, Vote}, + net::{ + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; + +pub struct ProtocolVote { + vote_sub: MessageSubscription, + jobsman: ProtocolJobsManagerPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, +} + +impl ProtocolVote { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + p2p: P2pPtr, + ) -> Result { + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + + let vote_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + vote_sub, + jobsman: ProtocolJobsManager::new("VoteProtocol", channel), + state, + p2p, + })) + } + + async fn handle_receive_vote(self: Arc) -> Result<()> { + debug!("ProtocolVote::handle_receive_vote() [START]"); + loop { + let vote = self.vote_sub.receive().await?; + + debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote); + + let vote_copy = (*vote).clone(); + if self.state.write().unwrap().receive_vote(&vote_copy)? { + self.p2p.broadcast(vote_copy).await?; + }; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolVote { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!("ProtocolVote::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_vote(), executor.clone()).await; + debug!("ProtocolVote::start() [END]"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolVote" + } +} diff --git a/src/consensus2/mod.rs b/src/consensus2/mod.rs index 84df310d4..43e026f07 100644 --- a/src/consensus2/mod.rs +++ b/src/consensus2/mod.rs @@ -28,8 +28,8 @@ pub mod util; use lazy_static::lazy_static; lazy_static! { /// Genesis hash for the mainnet chain - pub static ref MAINNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_mainnet").as_bytes(); + pub static ref MAINNET_GENESIS_HASH_BYTES: blake3::Hash = blake3::hash(b"darkfi_mainnet"); /// Genesis hash for the testnet chain - pub static ref TESTNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_testnet").as_bytes(); + pub static ref TESTNET_GENESIS_HASH_BYTES: blake3::Hash = blake3::hash(b"darkfi_testnet"); } diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index ce66cfbc9..bebacd925 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -1,4 +1,4 @@ -// TODO: Pending participants can also be a map? +// TODO: Use sets instead of vectors where possible. use std::{ collections::BTreeMap, diff --git a/src/error.rs b/src/error.rs index eafddb476..24d46758d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -239,6 +239,9 @@ pub enum Error { #[cfg(feature = "net2")] #[error("TransportError: {0}")] TransportError(String), + + #[error("Unsupported chain")] + UnsupportedChain, } #[cfg(feature = "node")]