diff --git a/bin/darkfid2/src/consensus.rs b/bin/darkfid2/src/consensus.rs new file mode 100644 index 000000000..8921a4d50 --- /dev/null +++ b/bin/darkfid2/src/consensus.rs @@ -0,0 +1,84 @@ +use log::{debug, error, info}; + +use darkfi::{ + consensus2::{state::ValidatorStatePtr, Participant}, + net::P2pPtr, + util::async_util::sleep, +}; + +pub async fn proposal_task(p2p: P2pPtr, state: ValidatorStatePtr) { + // Node signals the network that it starts participating + let participant = Participant::new(state.read().await.id, state.read().await.current_epoch()); + state.write().await.append_participant(participant.clone()); + + match p2p.broadcast(participant).await { + Ok(()) => info!("Consensus participation message broadcasted successfully."), + Err(e) => error!("Failed broadcasting consensus participation: {}", e), + } + + // After initialization, node should wait for next epoch + let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); + info!("Waiting for next epoch({} sec)...", seconds_until_next_epoch); + sleep(seconds_until_next_epoch).await; + + loop { + // Node refreshes participants records + state.write().await.refresh_participants(); + + // Node checks if it's the epoch leader to generate a new proposal + // for that epoch. + let result = if state.write().await.is_epoch_leader() { + state.read().await.propose() + } else { + Ok(None) + }; + + match result { + Ok(proposal) => { + if proposal.is_none() { + info!("Node is not the epoch leader. Sleeping till next epoch..."); + } else { + // Leader creates a vote for the proposal and broadcasts them both + let proposal = proposal.unwrap(); + info!("Node is the epoch leader: Proposed block: {:?}", proposal); + let vote = state.write().await.receive_proposal(&proposal); + match vote { + Ok(v) => { + if v.is_none() { + debug!("Node did not vote for the proposed block"); + } else { + let vote = v.unwrap(); + let result = state.write().await.receive_vote(&vote); + match result { + Ok(_) => info!("Vote saved successfully."), + Err(e) => error!("Vote save failed: {}", e), + } + + // Broadcast block + let result = p2p.broadcast(proposal).await; + match result { + Ok(()) => info!("Proposal broadcasted successfully."), + Err(e) => error!("Failed broadcasting proposal: {}", e), + } + + // Broadcast leader vote + let result = p2p.broadcast(vote).await; + match result { + Ok(()) => info!("Leader vote broadcasted successfully."), + Err(e) => error!("Failed broadcasting leader vote: {}", e), + } + } + } + Err(e) => error!("Failed processing proposal: {}", e), + } + } + } + Err(e) => error!("Block proposal failed: {}", e), + } + + // Node waits until next epoch + let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs(); + info!("Waiting for next epoch ({} sec)...", seconds_until_next_epoch); + sleep(seconds_until_next_epoch).await; + } +} diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index 604204e08..77e3f98a2 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -18,7 +18,8 @@ use url::Url; use darkfi::{ cli_desc, consensus2::{ - util::Timestamp, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES, + state::ValidatorStatePtr, util::Timestamp, Tx, ValidatorState, MAINNET_GENESIS_HASH_BYTES, + TESTNET_GENESIS_HASH_BYTES, }, crypto::{ address::Address, @@ -52,6 +53,9 @@ use error::{server_error, RpcError}; mod protocol; use protocol::{ProtocolParticipant, ProtocolProposal, ProtocolTx, ProtocolVote}; +mod consensus; +use consensus::proposal_task; + const CONFIG_FILE: &str = "darkfid_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml"); @@ -111,6 +115,7 @@ struct Args { pub struct Darkfid { client: Client, + state: ValidatorStatePtr, p2p: P2pPtr, synced: Mutex, } @@ -131,15 +136,16 @@ impl RequestHandler for Darkfid { Some("export_keypair") => return self.export_keypair(req.id, params).await, Some("import_keypair") => return self.import_keypair(req.id, params).await, Some("set_default_address") => return self.set_default_address(req.id, params).await, + Some("tx") => return self.receive_tx(req.id, params).await, Some(_) | None => return jsonrpc::error(MethodNotFound, None, req.id).into(), } } } impl Darkfid { - pub async fn new(wallet: WalletPtr, p2p: P2pPtr) -> Result { + pub async fn new(wallet: WalletPtr, state: ValidatorStatePtr, p2p: P2pPtr) -> Result { let client = Client::new(wallet).await?; - Ok(Self { client, p2p, synced: Mutex::new(false) }) + Ok(Self { client, state, p2p, synced: Mutex::new(false) }) } // RPCAPI: @@ -319,6 +325,23 @@ impl Darkfid { jsonrpc::response(json!(true), id).into() } + + async fn receive_tx(&self, id: Value, params: &[Value]) -> JsonResult { + if params.len() != 1 || !params[0].is_string() { + return jsonrpc::error(InvalidParams, None, id).into() + } + + let payload = String::from(params[0].as_str().unwrap()); + let tx = Tx { payload }; + + self.state.write().await.append_tx(tx.clone()); + + let result = self.p2p.broadcast(tx).await; + match result { + Ok(()) => return jsonrpc::response(json!(true), id).into(), + Err(_) => return jsonrpc::error(InternalError, None, id).into(), + } + } } async fn init_wallet(wallet_path: &str, wallet_pass: &str) -> Result { @@ -378,7 +401,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { debug!("Adding ProtocolTx to the protocol registry"); let _state = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(!net::SESSION_SEED, move |channel, p2p| { let state = _state.clone(); async move { ProtocolTx::init(channel, state, p2p).await.unwrap() } }) @@ -387,7 +410,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { debug!("Adding ProtocolVote to the protocol registry"); let _state = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(!net::SESSION_SEED, move |channel, p2p| { let state = _state.clone(); async move { ProtocolVote::init(channel, state, p2p).await.unwrap() } }) @@ -396,7 +419,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { debug!("Adding ProtocolProposal to the protocol registry"); let _state = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(!net::SESSION_SEED, move |channel, p2p| { let state = _state.clone(); async move { ProtocolProposal::init(channel, state, p2p).await.unwrap() } }) @@ -405,7 +428,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { debug!("Adding ProtocolParticipant to the protocol registry"); let _state = state.clone(); registry - .register(net::SESSION_ALL, move |channel, p2p| { + .register(!net::SESSION_SEED, move |channel, p2p| { let state = _state.clone(); async move { ProtocolParticipant::init(channel, state, p2p).await.unwrap() } }) @@ -413,15 +436,27 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!("Starting P2P networking"); p2p.clone().start(ex.clone()).await?; + let _ex = ex.clone(); + let _p2p = p2p.clone(); + ex.spawn(async move { + if let Err(e) = _p2p.run(_ex).await { + error!("P2P run failed: {}", e); + } + }) + .detach(); // Initialize program state - let darkfid = Darkfid::new(wallet, p2p).await?; + let darkfid = Darkfid::new(wallet, state.clone(), p2p.clone()).await?; let darkfid = Arc::new(darkfid); // JSON-RPC server info!("Starting JSON-RPC server"); ex.spawn(listen_and_serve(args.rpc_listen, darkfid)).detach(); + // Consensus protocol + info!("Starting consensus protocol task"); + ex.spawn(proposal_task(p2p, state)).detach(); + // Wait for SIGINT shutdown.recv().await?; print!("\r"); diff --git a/bin/darkfid2/src/protocol/protocol_participant.rs b/bin/darkfid2/src/protocol/protocol_participant.rs index c8a6eef3e..3dc67ba6e 100644 --- a/bin/darkfid2/src/protocol/protocol_participant.rs +++ b/bin/darkfid2/src/protocol/protocol_participant.rs @@ -45,9 +45,9 @@ impl ProtocolParticipant { debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant); - if self.state.write().unwrap().append_participant((*participant).clone()) { + if self.state.write().await.append_participant((*participant).clone()) { let pending_participants = - self.state.read().unwrap().consensus.pending_participants.clone(); + self.state.read().await.consensus.pending_participants.clone(); for participant in pending_participants { self.p2p.broadcast(participant).await?; } diff --git a/bin/darkfid2/src/protocol/protocol_proposal.rs b/bin/darkfid2/src/protocol/protocol_proposal.rs index a8e47978c..6011deae6 100644 --- a/bin/darkfid2/src/protocol/protocol_proposal.rs +++ b/bin/darkfid2/src/protocol/protocol_proposal.rs @@ -46,14 +46,14 @@ impl ProtocolProposal { debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal); let proposal_copy = (*proposal).clone(); - let vote = self.state.write().unwrap().receive_proposal(&proposal_copy); + let vote = self.state.write().await.receive_proposal(&proposal_copy); match vote { Ok(v) => { if v.is_none() { debug!("Node did not vote for the proposed block."); } else { let vote = v.unwrap(); - self.state.write().unwrap().receive_vote(&vote)?; + self.state.write().await.receive_vote(&vote)?; // Broadcast block to rest of nodes self.p2p.broadcast(proposal_copy).await?; // Broadcast vote diff --git a/bin/darkfid2/src/protocol/protocol_tx.rs b/bin/darkfid2/src/protocol/protocol_tx.rs index a463b5742..3d7e8e602 100644 --- a/bin/darkfid2/src/protocol/protocol_tx.rs +++ b/bin/darkfid2/src/protocol/protocol_tx.rs @@ -46,7 +46,7 @@ impl ProtocolTx { debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx); let tx_copy = (*tx).clone(); - if self.state.write().unwrap().append_tx(tx_copy.clone()) { + if self.state.write().await.append_tx(tx_copy.clone()) { self.p2p.broadcast(tx_copy).await?; } } diff --git a/bin/darkfid2/src/protocol/protocol_vote.rs b/bin/darkfid2/src/protocol/protocol_vote.rs index 5ccedc2f9..3537c2ea8 100644 --- a/bin/darkfid2/src/protocol/protocol_vote.rs +++ b/bin/darkfid2/src/protocol/protocol_vote.rs @@ -46,7 +46,7 @@ impl ProtocolVote { debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote); let vote_copy = (*vote).clone(); - if self.state.write().unwrap().receive_vote(&vote_copy)? { + if self.state.write().await.receive_vote(&vote_copy)? { self.p2p.broadcast(vote_copy).await?; }; } diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index bebacd925..cbd8ed90f 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -3,13 +3,13 @@ use std::{ collections::BTreeMap, hash::{Hash, Hasher}, - sync::{Arc, RwLock}, time::Duration, }; +use async_std::sync::{Arc, RwLock}; use chrono::{NaiveDateTime, Utc}; use fxhash::FxHasher; -use log::{debug, error, warn}; +use log::{debug, error, info, warn}; use rand::rngs::OsRng; use super::{ @@ -27,7 +27,7 @@ use crate::{ Result, }; -const DELTA: u64 = 60; +const DELTA: u64 = 5; /// This struct represents the information required by the consensus algorithm #[derive(Debug)] @@ -125,6 +125,7 @@ impl ValidatorState { return false } + info!("consensus::state::append_tx(): Appended tx to mempool"); self.unconfirmed_txs.push(tx); true } @@ -156,6 +157,7 @@ impl ValidatorState { let mut hasher = FxHasher::default(); epoch.hash(&mut hasher); self.zero_participants_check(); + // TODO: Division by zero possible let pos = hasher.finish() % (self.consensus.participants.len() as u64); self.consensus.participants.iter().nth(pos as usize).unwrap().1.id } @@ -496,6 +498,7 @@ impl ValidatorState { chain.proposals.drain(0..(consecutive - 1)); // Append to canonical chain + debug!("Adding finalized block to chain"); let blockhashes = self.blockchain.add(&finalized)?; self.consensus.last_block = *blockhashes.last().unwrap(); self.consensus.last_sl = finalized.last().unwrap().sl; diff --git a/src/net/protocol/protocol_ping.rs b/src/net/protocol/protocol_ping.rs index ddc437b9d..f37530c16 100644 --- a/src/net/protocol/protocol_ping.rs +++ b/src/net/protocol/protocol_ping.rs @@ -57,7 +57,7 @@ impl ProtocolPing { debug!(target: "net", "ProtocolPing::run_ping_pong() [START]"); loop { // Wait channel_heartbeat amount of time. - sleep(self.settings.channel_heartbeat_seconds).await; + sleep(self.settings.channel_heartbeat_seconds.into()).await; // Create a random nonce. let nonce = Self::random_nonce(); diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 25393a928..04114fa23 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -100,7 +100,7 @@ impl ManualSession { Err(err) => { info!(target: "net", "Unable to connect to manual outbound [{}]: {}", addr, err); - sleep(settings.connect_timeout_seconds).await; + sleep(settings.connect_timeout_seconds.into()).await; } } } diff --git a/src/util/async_util.rs b/src/util/async_util.rs index 4906ad44f..f967551bc 100644 --- a/src/util/async_util.rs +++ b/src/util/async_util.rs @@ -2,6 +2,6 @@ use smol::Timer; use std::time::Duration; /// Sleep for any number of seconds. -pub async fn sleep(seconds: u32) { +pub async fn sleep(seconds: u64) { Timer::after(Duration::from_secs(seconds.into())).await; }