darkfid: Enable consensus task.

This commit is contained in:
parazyd
2022-04-16 02:29:21 +02:00
parent dfc9bd24c4
commit a4dd04daf7
10 changed files with 142 additions and 20 deletions

View File

@@ -0,0 +1,84 @@
use log::{debug, error, info};
use darkfi::{
consensus2::{state::ValidatorStatePtr, Participant},
net::P2pPtr,
util::async_util::sleep,
};
pub async fn proposal_task(p2p: P2pPtr, state: ValidatorStatePtr) {
// Node signals the network that it starts participating
let participant = Participant::new(state.read().await.id, state.read().await.current_epoch());
state.write().await.append_participant(participant.clone());
match p2p.broadcast(participant).await {
Ok(()) => info!("Consensus participation message broadcasted successfully."),
Err(e) => error!("Failed broadcasting consensus participation: {}", e),
}
// After initialization, node should wait for next epoch
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!("Waiting for next epoch({} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
loop {
// Node refreshes participants records
state.write().await.refresh_participants();
// Node checks if it's the epoch leader to generate a new proposal
// for that epoch.
let result = if state.write().await.is_epoch_leader() {
state.read().await.propose()
} else {
Ok(None)
};
match result {
Ok(proposal) => {
if proposal.is_none() {
info!("Node is not the epoch leader. Sleeping till next epoch...");
} else {
// Leader creates a vote for the proposal and broadcasts them both
let proposal = proposal.unwrap();
info!("Node is the epoch leader: Proposed block: {:?}", proposal);
let vote = state.write().await.receive_proposal(&proposal);
match vote {
Ok(v) => {
if v.is_none() {
debug!("Node did not vote for the proposed block");
} else {
let vote = v.unwrap();
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!("Vote saved successfully."),
Err(e) => error!("Vote save failed: {}", e),
}
// Broadcast block
let result = p2p.broadcast(proposal).await;
match result {
Ok(()) => info!("Proposal broadcasted successfully."),
Err(e) => error!("Failed broadcasting proposal: {}", e),
}
// Broadcast leader vote
let result = p2p.broadcast(vote).await;
match result {
Ok(()) => info!("Leader vote broadcasted successfully."),
Err(e) => error!("Failed broadcasting leader vote: {}", e),
}
}
}
Err(e) => error!("Failed processing proposal: {}", e),
}
}
}
Err(e) => error!("Block proposal failed: {}", e),
}
// Node waits until next epoch
let seconds_until_next_epoch = state.read().await.next_epoch_start().as_secs();
info!("Waiting for next epoch ({} sec)...", seconds_until_next_epoch);
sleep(seconds_until_next_epoch).await;
}
}

View File

@@ -18,7 +18,8 @@ use url::Url;
use darkfi::{
cli_desc,
consensus2::{
util::Timestamp, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES,
state::ValidatorStatePtr, util::Timestamp, Tx, ValidatorState, MAINNET_GENESIS_HASH_BYTES,
TESTNET_GENESIS_HASH_BYTES,
},
crypto::{
address::Address,
@@ -52,6 +53,9 @@ use error::{server_error, RpcError};
mod protocol;
use protocol::{ProtocolParticipant, ProtocolProposal, ProtocolTx, ProtocolVote};
mod consensus;
use consensus::proposal_task;
const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
@@ -111,6 +115,7 @@ struct Args {
pub struct Darkfid {
client: Client,
state: ValidatorStatePtr,
p2p: P2pPtr,
synced: Mutex<bool>,
}
@@ -131,15 +136,16 @@ impl RequestHandler for Darkfid {
Some("export_keypair") => return self.export_keypair(req.id, params).await,
Some("import_keypair") => return self.import_keypair(req.id, params).await,
Some("set_default_address") => return self.set_default_address(req.id, params).await,
Some("tx") => return self.receive_tx(req.id, params).await,
Some(_) | None => return jsonrpc::error(MethodNotFound, None, req.id).into(),
}
}
}
impl Darkfid {
pub async fn new(wallet: WalletPtr, p2p: P2pPtr) -> Result<Self> {
pub async fn new(wallet: WalletPtr, state: ValidatorStatePtr, p2p: P2pPtr) -> Result<Self> {
let client = Client::new(wallet).await?;
Ok(Self { client, p2p, synced: Mutex::new(false) })
Ok(Self { client, state, p2p, synced: Mutex::new(false) })
}
// RPCAPI:
@@ -319,6 +325,23 @@ impl Darkfid {
jsonrpc::response(json!(true), id).into()
}
async fn receive_tx(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return jsonrpc::error(InvalidParams, None, id).into()
}
let payload = String::from(params[0].as_str().unwrap());
let tx = Tx { payload };
self.state.write().await.append_tx(tx.clone());
let result = self.p2p.broadcast(tx).await;
match result {
Ok(()) => return jsonrpc::response(json!(true), id).into(),
Err(_) => return jsonrpc::error(InternalError, None, id).into(),
}
}
}
async fn init_wallet(wallet_path: &str, wallet_pass: &str) -> Result<WalletPtr> {
@@ -378,7 +401,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
debug!("Adding ProtocolTx to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolTx::init(channel, state, p2p).await.unwrap() }
})
@@ -387,7 +410,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
debug!("Adding ProtocolVote to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolVote::init(channel, state, p2p).await.unwrap() }
})
@@ -396,7 +419,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
debug!("Adding ProtocolProposal to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolProposal::init(channel, state, p2p).await.unwrap() }
})
@@ -405,7 +428,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
debug!("Adding ProtocolParticipant to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolParticipant::init(channel, state, p2p).await.unwrap() }
})
@@ -413,15 +436,27 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
info!("Starting P2P networking");
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
error!("P2P run failed: {}", e);
}
})
.detach();
// Initialize program state
let darkfid = Darkfid::new(wallet, p2p).await?;
let darkfid = Darkfid::new(wallet, state.clone(), p2p.clone()).await?;
let darkfid = Arc::new(darkfid);
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen, darkfid)).detach();
// Consensus protocol
info!("Starting consensus protocol task");
ex.spawn(proposal_task(p2p, state)).detach();
// Wait for SIGINT
shutdown.recv().await?;
print!("\r");

View File

@@ -45,9 +45,9 @@ impl ProtocolParticipant {
debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant);
if self.state.write().unwrap().append_participant((*participant).clone()) {
if self.state.write().await.append_participant((*participant).clone()) {
let pending_participants =
self.state.read().unwrap().consensus.pending_participants.clone();
self.state.read().await.consensus.pending_participants.clone();
for participant in pending_participants {
self.p2p.broadcast(participant).await?;
}

View File

@@ -46,14 +46,14 @@ impl ProtocolProposal {
debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal);
let proposal_copy = (*proposal).clone();
let vote = self.state.write().unwrap().receive_proposal(&proposal_copy);
let vote = self.state.write().await.receive_proposal(&proposal_copy);
match vote {
Ok(v) => {
if v.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = v.unwrap();
self.state.write().unwrap().receive_vote(&vote)?;
self.state.write().await.receive_vote(&vote)?;
// Broadcast block to rest of nodes
self.p2p.broadcast(proposal_copy).await?;
// Broadcast vote

View File

@@ -46,7 +46,7 @@ impl ProtocolTx {
debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx);
let tx_copy = (*tx).clone();
if self.state.write().unwrap().append_tx(tx_copy.clone()) {
if self.state.write().await.append_tx(tx_copy.clone()) {
self.p2p.broadcast(tx_copy).await?;
}
}

View File

@@ -46,7 +46,7 @@ impl ProtocolVote {
debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote);
let vote_copy = (*vote).clone();
if self.state.write().unwrap().receive_vote(&vote_copy)? {
if self.state.write().await.receive_vote(&vote_copy)? {
self.p2p.broadcast(vote_copy).await?;
};
}

View File

@@ -3,13 +3,13 @@
use std::{
collections::BTreeMap,
hash::{Hash, Hasher},
sync::{Arc, RwLock},
time::Duration,
};
use async_std::sync::{Arc, RwLock};
use chrono::{NaiveDateTime, Utc};
use fxhash::FxHasher;
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use rand::rngs::OsRng;
use super::{
@@ -27,7 +27,7 @@ use crate::{
Result,
};
const DELTA: u64 = 60;
const DELTA: u64 = 5;
/// This struct represents the information required by the consensus algorithm
#[derive(Debug)]
@@ -125,6 +125,7 @@ impl ValidatorState {
return false
}
info!("consensus::state::append_tx(): Appended tx to mempool");
self.unconfirmed_txs.push(tx);
true
}
@@ -156,6 +157,7 @@ impl ValidatorState {
let mut hasher = FxHasher::default();
epoch.hash(&mut hasher);
self.zero_participants_check();
// TODO: Division by zero possible
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
}
@@ -496,6 +498,7 @@ impl ValidatorState {
chain.proposals.drain(0..(consecutive - 1));
// Append to canonical chain
debug!("Adding finalized block to chain");
let blockhashes = self.blockchain.add(&finalized)?;
self.consensus.last_block = *blockhashes.last().unwrap();
self.consensus.last_sl = finalized.last().unwrap().sl;

View File

@@ -57,7 +57,7 @@ impl ProtocolPing {
debug!(target: "net", "ProtocolPing::run_ping_pong() [START]");
loop {
// Wait channel_heartbeat amount of time.
sleep(self.settings.channel_heartbeat_seconds).await;
sleep(self.settings.channel_heartbeat_seconds.into()).await;
// Create a random nonce.
let nonce = Self::random_nonce();

View File

@@ -100,7 +100,7 @@ impl ManualSession {
Err(err) => {
info!(target: "net", "Unable to connect to manual outbound [{}]: {}", addr, err);
sleep(settings.connect_timeout_seconds).await;
sleep(settings.connect_timeout_seconds.into()).await;
}
}
}

View File

@@ -2,6 +2,6 @@ use smol::Timer;
use std::time::Duration;
/// Sleep for any number of seconds.
pub async fn sleep(seconds: u32) {
pub async fn sleep(seconds: u64) {
Timer::after(Duration::from_secs(seconds.into())).await;
}