darkfid: Implement consensus P2P protocols.

This commit is contained in:
parazyd
2022-04-15 23:54:41 +02:00
parent cfd3e38baf
commit c55c08b662
11 changed files with 320 additions and 9 deletions

3
Cargo.lock generated
View File

@@ -1725,16 +1725,19 @@ dependencies = [
"async-executor",
"async-std",
"async-trait",
"chrono",
"ctrlc-async",
"darkfi",
"easy-parallel",
"futures-lite",
"lazy-init",
"log",
"rand 0.8.5",
"serde",
"serde_derive",
"serde_json",
"simplelog",
"sled",
"structopt",
"structopt-toml",
"url",

View File

@@ -20,8 +20,10 @@ easy-parallel = "3.2.0"
futures-lite = "1.12.0"
lazy-init = "0.5.0"
log = "0.4.16"
rand = "0.8.5"
serde_json = "1.0.79"
simplelog = "0.12.0-alpha1"
sled = "0.34.7"
url = "2.2.2"
# Argument parsing

View File

@@ -3,9 +3,11 @@ use std::net::SocketAddr;
use async_executor::Executor;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use chrono::Utc;
use easy_parallel::Parallel;
use futures_lite::future;
use log::{error, info};
use log::{debug, error, info};
use rand::Rng;
use serde_derive::Deserialize;
use serde_json::{json, Value};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
@@ -15,6 +17,9 @@ use url::Url;
use darkfi::{
cli_desc,
consensus2::{
util::Timestamp, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES,
},
crypto::{
address::Address,
keypair::{Keypair, PublicKey, SecretKey},
@@ -35,7 +40,7 @@ use darkfi::{
path::get_config_path,
},
wallet::walletdb::{WalletDb, WalletPtr},
Result,
Error, Result,
};
mod client;
@@ -45,11 +50,12 @@ mod error;
use error::{server_error, RpcError};
mod protocol;
use protocol::ProtocolProposal;
use protocol::{ProtocolParticipant, ProtocolProposal, ProtocolTx, ProtocolVote};
const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
// TODO: Flag to participate in consensus
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "darkfid", about = cli_desc!())]
@@ -58,6 +64,10 @@ struct Args {
/// Configuration file to use
config: Option<String>,
#[structopt(long, default_value = "testnet")]
/// Chain to use (testnet, mainnet)
chain: String,
#[structopt(long, default_value = "~/.config/darkfi/darkfid_wallet.db")]
/// Path to wallet database
wallet_path: String,
@@ -66,6 +76,10 @@ struct Args {
/// Password for the wallet database
wallet_pass: String,
#[structopt(long, default_value = "~/.config/darkfi/darkfid_blockchain")]
/// Path to blockchain database
database: String,
#[structopt(long, default_value = "tcp://127.0.0.1:5397")]
/// JSON-RPC listen URL
rpc_listen: Url,
@@ -327,6 +341,26 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// Initialize or load wallet
let wallet = init_wallet(&args.wallet_path, &args.wallet_pass).await?;
// Initialize or open sled database
let db_path = format!("{}/{}", expand_path(&args.database)?.to_str().unwrap(), args.chain);
let sled_db = sled::open(&db_path)?;
// Initialize validator state
// TODO: genesis_ts should be some hardcoded constant
let genesis_ts = Timestamp(Utc::now().timestamp());
let genesis_data = match args.chain.as_str() {
"mainnet" => *MAINNET_GENESIS_HASH_BYTES,
"testnet" => *TESTNET_GENESIS_HASH_BYTES,
x => {
error!("Unsupported chain `{}`", x);
return Err(Error::UnsupportedChain)
}
};
// TODO: Is this ok?
let mut rng = rand::thread_rng();
let id: u64 = rng.gen();
let state = ValidatorState::new(&sled_db, id, genesis_ts, genesis_data)?;
// P2P network
let network_settings = net::Settings {
inbound: Some(args.p2p_accept),
@@ -338,14 +372,49 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
};
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
// TODO: Register protocols
debug!("Adding ProtocolTx to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolTx::init(channel, state, p2p).await.unwrap() }
})
.await;
debug!("Adding ProtocolVote to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolVote::init(channel, state, p2p).await.unwrap() }
})
.await;
debug!("Adding ProtocolProposal to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolProposal::init(channel, state, p2p).await.unwrap() }
})
.await;
debug!("Adding ProtocolParticipant to the protocol registry");
let _state = state.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolParticipant::init(channel, state, p2p).await.unwrap() }
})
.await;
info!("Starting P2P networking");
p2p.clone().start(ex.clone()).await?;
// Initialize state
// Initialize program state
let darkfid = Darkfid::new(wallet, p2p).await?;
let darkfid = Arc::new(darkfid);

View File

@@ -1,2 +1,13 @@
// TODO: Handle ? in these modules' loops
mod protocol_participant;
pub use protocol_participant::ProtocolParticipant;
mod protocol_proposal;
pub use protocol_proposal::ProtocolProposal;
mod protocol_tx;
pub use protocol_tx::ProtocolTx;
mod protocol_vote;
pub use protocol_vote::ProtocolVote;

View File

@@ -4,7 +4,7 @@ use async_trait::async_trait;
use log::debug;
use darkfi::{
consensus::{participant::Participant, state::ValidatorStatePtr},
consensus2::{state::ValidatorStatePtr, Participant},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
@@ -44,6 +44,7 @@ impl ProtocolParticipant {
let participant = self.participant_sub.receive().await?;
debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant);
if self.state.write().unwrap().append_participant((*participant).clone()) {
let pending_participants =
self.state.read().unwrap().consensus.pending_participants.clone();

View File

@@ -0,0 +1,84 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use darkfi::{
consensus2::{block::BlockProposal, state::ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
pub struct ProtocolProposal {
proposal_sub: MessageSubscription<BlockProposal>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolProposal {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<BlockProposal>().await;
let proposal_sub = channel.subscribe_msg::<BlockProposal>().await?;
Ok(Arc::new(Self {
proposal_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel),
state,
p2p,
}))
}
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!("ProtocolProposal::handle_receive_proposal() [START]");
loop {
let proposal = self.proposal_sub.receive().await?;
debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal);
let proposal_copy = (*proposal).clone();
let vote = self.state.write().unwrap().receive_proposal(&proposal_copy);
match vote {
Ok(v) => {
if v.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = v.unwrap();
self.state.write().unwrap().receive_vote(&vote)?;
// Broadcast block to rest of nodes
self.p2p.broadcast(proposal_copy).await?;
// Broadcast vote
self.p2p.broadcast(vote).await?;
}
}
Err(e) => {
debug!("ProtocolProposal::handle_receive_proposal() error processing proposal: {:?}", e);
}
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolProposal {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolProposal::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await;
debug!("ProtocolProposal::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolProposal"
}
}

View File

@@ -0,0 +1,69 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use darkfi::{
consensus2::{state::ValidatorStatePtr, Tx},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
pub struct ProtocolTx {
tx_sub: MessageSubscription<Tx>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolTx {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<Tx>().await;
let tx_sub = channel.subscribe_msg::<Tx>().await?;
Ok(Arc::new(Self {
tx_sub,
jobsman: ProtocolJobsManager::new("TxProtocol", channel),
state,
p2p,
}))
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
debug!("ProtocolTx::handle_receive_tx() [START]");
loop {
let tx = self.tx_sub.receive().await?;
debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx);
let tx_copy = (*tx).clone();
if self.state.write().unwrap().append_tx(tx_copy.clone()) {
self.p2p.broadcast(tx_copy).await?;
}
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolTx {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolTx::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await;
debug!("ProtocolTx::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolTx"
}
}

View File

@@ -0,0 +1,69 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use darkfi::{
consensus2::{state::ValidatorStatePtr, Vote},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
pub struct ProtocolVote {
vote_sub: MessageSubscription<Vote>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
}
impl ProtocolVote {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<Vote>().await;
let vote_sub = channel.subscribe_msg::<Vote>().await?;
Ok(Arc::new(Self {
vote_sub,
jobsman: ProtocolJobsManager::new("VoteProtocol", channel),
state,
p2p,
}))
}
async fn handle_receive_vote(self: Arc<Self>) -> Result<()> {
debug!("ProtocolVote::handle_receive_vote() [START]");
loop {
let vote = self.vote_sub.receive().await?;
debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote);
let vote_copy = (*vote).clone();
if self.state.write().unwrap().receive_vote(&vote_copy)? {
self.p2p.broadcast(vote_copy).await?;
};
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolVote {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!("ProtocolVote::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_vote(), executor.clone()).await;
debug!("ProtocolVote::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolVote"
}
}

View File

@@ -28,8 +28,8 @@ pub mod util;
use lazy_static::lazy_static;
lazy_static! {
/// Genesis hash for the mainnet chain
pub static ref MAINNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_mainnet").as_bytes();
pub static ref MAINNET_GENESIS_HASH_BYTES: blake3::Hash = blake3::hash(b"darkfi_mainnet");
/// Genesis hash for the testnet chain
pub static ref TESTNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_testnet").as_bytes();
pub static ref TESTNET_GENESIS_HASH_BYTES: blake3::Hash = blake3::hash(b"darkfi_testnet");
}

View File

@@ -1,4 +1,4 @@
// TODO: Pending participants can also be a map?
// TODO: Use sets instead of vectors where possible.
use std::{
collections::BTreeMap,

View File

@@ -239,6 +239,9 @@ pub enum Error {
#[cfg(feature = "net2")]
#[error("TransportError: {0}")]
TransportError(String),
#[error("Unsupported chain")]
UnsupportedChain,
}
#[cfg(feature = "node")]