src/consensus: moved under script/research/validatord workspace

This commit is contained in:
aggstam
2022-04-25 14:04:45 +03:00
parent f661c1e714
commit 70ffc7e0c4
14 changed files with 139 additions and 54 deletions

View File

@@ -17,6 +17,10 @@ async-channel = "1.6.1"
async-executor = "1.4.1"
easy-parallel = "3.2.0"
# Crypto
rand = "0.8.5"
blake3 = "1.3.1"
# Storage
sled = "0.34.7"
@@ -29,6 +33,7 @@ structopt-toml = "0.5.0"
toml = "0.5.9"
# Misc
chrono = "0.4.19"
log = "0.4.16"
num_cpus = "1.13.1"
simplelog = "0.12.0"

View File

@@ -0,0 +1,340 @@
use std::io;
use darkfi::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{
deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt,
},
Result,
};
use super::{
metadata::{Metadata, StreamletMetadata},
tx::Tx,
util::{Timestamp, EMPTY_HASH_BYTES},
};
const SLED_BLOCK_TREE: &[u8] = b"_blocks";
const SLED_BLOCK_ORDER_TREE: &[u8] = b"_blocks_order";
/// This struct represents a tuple of the form (st, sl, txs, metadata).
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Block {
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transaction hashes
/// The actual transactions are in [`TxStore`]
pub txs: Vec<blake3::Hash>,
/// Additional block information
pub metadata: Metadata,
}
impl Block {
pub fn new(st: blake3::Hash, sl: u64, txs: Vec<blake3::Hash>, metadata: Metadata) -> Block {
Block { st, sl, txs, metadata }
}
/// Generates the genesis block.
pub fn genesis_block(genesis: i64) -> Block {
let hash = blake3::Hash::from(EMPTY_HASH_BYTES);
let metadata = Metadata::new(
Timestamp(genesis),
String::from("proof"),
String::from("r"),
String::from("s"),
);
Block::new(hash, 0, vec![], metadata)
}
}
#[derive(Debug)]
pub struct BlockStore(sled::Tree);
impl BlockStore {
/// Opens a new or existing blockstore tree given a sled database.
pub fn new(db: &sled::Db, genesis: i64) -> Result<Self> {
let tree = db.open_tree(SLED_BLOCK_TREE)?;
let store = Self(tree);
if store.0.is_empty() {
// Genesis block is generated.
store.insert(&Block::genesis_block(genesis))?;
}
Ok(store)
}
/// Insert a block into the blockstore.
/// The block is hashed with blake3 and this blockhash is used as
/// the key, where value is the serialized block itself.
pub fn insert(&self, block: &Block) -> Result<blake3::Hash> {
let serialized = serialize(block);
let blockhash = blake3::hash(&serialized);
self.0.insert(blockhash.as_bytes(), serialized)?;
Ok(blockhash)
}
/// Fetch given blocks from the blockstore.
/// The resulting vector contains `Option` which is `Some` if the block
/// was found in the blockstore, and `None`, if it has not.
pub fn get(&self, blockhashes: &[blake3::Hash]) -> Result<Vec<Option<(blake3::Hash, Block)>>> {
let mut ret: Vec<Option<(blake3::Hash, Block)>> = Vec::with_capacity(blockhashes.len());
for i in blockhashes {
if let Some(found) = self.0.get(i.as_bytes())? {
let block = deserialize(&found)?;
ret.push(Some((i.clone(), block)));
} else {
ret.push(None);
}
}
Ok(ret)
}
/// Retrieve all blocks.
/// Be carefull as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<Option<(blake3::Hash, Block)>>> {
let mut blocks = Vec::new();
let mut iterator = self.0.into_iter().enumerate();
while let Some((_, r)) = iterator.next() {
let (k, v) = r.unwrap();
let hash_bytes: [u8; 32] = k.as_ref().try_into().unwrap();
let block = deserialize(&v)?;
blocks.push(Some((hash_bytes.into(), block)));
}
Ok(blocks)
}
}
/// Auxilary structure used for blockchain syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct BlockOrder {
/// Slot uid
pub sl: u64,
/// Block hash of that slot
pub block: blake3::Hash,
}
impl net::Message for BlockOrder {
fn name() -> &'static str {
"blockorder"
}
}
/// Auxilary structure represending a full block data, used for blockchain syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockInfo {
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
pub sm: StreamletMetadata,
}
impl BlockInfo {
pub fn new(
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
metadata: Metadata,
sm: StreamletMetadata,
) -> BlockInfo {
BlockInfo { st, sl, txs, metadata, sm }
}
}
impl net::Message for BlockInfo {
fn name() -> &'static str {
"blockinfo"
}
}
impl_vec!(BlockInfo);
/// Auxilary structure used for blockchain syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockResponse {
/// Response blocks.
pub blocks: Vec<BlockInfo>,
}
impl net::Message for BlockResponse {
fn name() -> &'static str {
"blockresponse"
}
}
#[derive(Debug)]
pub struct BlockOrderStore(sled::Tree);
impl BlockOrderStore {
/// Opens a new or existing blockorderstore tree given a sled database.
pub fn new(db: &sled::Db, genesis: i64) -> Result<Self> {
let tree = db.open_tree(SLED_BLOCK_ORDER_TREE)?;
let store = Self(tree);
if store.0.is_empty() {
// Genesis block record is generated.
let block = Block::genesis_block(genesis);
let blockhash = blake3::hash(&serialize(&block));
store.insert(block.sl, blockhash)?;
}
Ok(store)
}
/// Insert a block hash into the blockorderstore.
/// The block slot is used as the key, where value is the block hash.
pub fn insert(&self, slot: u64, block: blake3::Hash) -> Result<()> {
self.0.insert(slot.to_be_bytes(), serialize(&block))?;
Ok(())
}
/// Fetch given slots block hashes from the blockstore.
/// The resulting vector contains `Option` which is `Some` if the block
/// was found in the blockstore, and `None`, if it has not.
pub fn get(&self, slots: &[u64]) -> Result<Vec<Option<BlockOrder>>> {
let mut ret: Vec<Option<BlockOrder>> = Vec::with_capacity(slots.len());
for sl in slots {
if let Some(found) = self.0.get(sl.to_be_bytes())? {
let block = deserialize(&found)?;
ret.push(Some(BlockOrder { sl: sl.clone(), block }));
} else {
ret.push(None);
}
}
Ok(ret)
}
/// Retrieve the last block hash in the tree, based on the Ord implementation for Vec<u8>.
pub fn get_last(&self) -> Result<Option<(u64, blake3::Hash)>> {
if let Some(found) = self.0.last()? {
let slot_bytes: [u8; 8] = found.0.as_ref().try_into().unwrap();
let slot = u64::from_be_bytes(slot_bytes);
let block_hash = deserialize(&found.1)?;
return Ok(Some((slot, block_hash)))
}
Ok(None)
}
/// Retrieve n hashes after key.
pub fn get_after(&self, mut key: u64, n: u64) -> Result<Vec<blake3::Hash>> {
let mut hashes = Vec::new();
let mut counter = 0;
while counter <= n {
if let Some(found) = self.0.get_gt(key.to_be_bytes())? {
let key_bytes: [u8; 8] = found.0.as_ref().try_into().unwrap();
key = u64::from_be_bytes(key_bytes);
let block_hash = deserialize(&found.1)?;
hashes.push(block_hash);
counter = counter + 1;
} else {
break
}
}
Ok(hashes)
}
/// Retrieve all blocks hashes.
/// Be carefull as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<Option<(u64, blake3::Hash)>>> {
let mut block_hashes = Vec::new();
let mut iterator = self.0.into_iter().enumerate();
while let Some((_, r)) = iterator.next() {
let (k, v) = r.unwrap();
let slot_bytes: [u8; 8] = k.as_ref().try_into().unwrap();
let slot = u64::from_be_bytes(slot_bytes);
let block_hash = deserialize(&v)?;
block_hashes.push(Some((slot, block_hash)));
}
Ok(block_hashes)
}
}
/// This struct represents a Block proposal, used for consensus.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockProposal {
/// leader public key
pub public_key: PublicKey,
/// signed block
pub signature: Signature,
/// leader id
pub id: u64,
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
pub sm: StreamletMetadata,
}
impl BlockProposal {
pub fn new(
public_key: PublicKey,
signature: Signature,
id: u64,
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
metadata: Metadata,
sm: StreamletMetadata,
) -> BlockProposal {
BlockProposal { public_key, signature, id, st, sl, txs, metadata, sm }
}
/// Produce proposal hash using st, sl, txs and metadata.
pub fn hash(&self) -> blake3::Hash {
Self::to_proposal_hash(self.st, self.sl, &self.txs, &self.metadata)
}
/// Util function generate a proposal hash using provided st, sl, txs and metadata.
pub fn to_proposal_hash(
st: blake3::Hash,
sl: u64,
transactions: &Vec<Tx>,
metadata: &Metadata,
) -> blake3::Hash {
let mut txs = Vec::new();
for tx in transactions {
let hash = blake3::hash(&serialize(tx));
txs.push(hash);
}
blake3::hash(&serialize(&Block::new(st, sl, txs, metadata.clone())))
}
}
impl PartialEq for BlockProposal {
fn eq(&self, other: &Self) -> bool {
self.public_key == other.public_key &&
self.signature == other.signature &&
self.id == other.id &&
self.st == other.st &&
self.sl == other.sl &&
self.txs == other.txs &&
self.metadata == other.metadata
}
}
impl net::Message for BlockProposal {
fn name() -> &'static str {
"proposal"
}
}
impl_vec!(BlockProposal);

View File

@@ -0,0 +1,203 @@
use std::io;
use log::debug;
use darkfi::{
impl_vec,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
use super::{
block::{Block, BlockInfo, BlockOrderStore, BlockProposal, BlockStore},
metadata::StreamletMetadataStore,
tx::TxStore,
};
/// This struct represents the canonical (finalized) blockchain stored in sled database.
#[derive(Debug)]
pub struct Blockchain {
/// Blocks sled database
pub blocks: BlockStore,
/// Blocks order sled database
pub order: BlockOrderStore,
/// Transactions sled database
pub transactions: TxStore,
/// Streamlet metadata sled database
pub streamlet_metadata: StreamletMetadataStore,
}
impl Blockchain {
pub fn new(db: &sled::Db, genesis: i64) -> Result<Blockchain> {
let blocks = BlockStore::new(db, genesis)?;
let order = BlockOrderStore::new(db, genesis)?;
let transactions = TxStore::new(db)?;
let streamlet_metadata = StreamletMetadataStore::new(db, genesis)?;
Ok(Blockchain { blocks, order, transactions, streamlet_metadata })
}
/// Insertion of a block proposal.
pub fn add_by_proposal(&mut self, proposal: BlockProposal) -> Result<blake3::Hash> {
// Storing transactions
let mut txs = Vec::new();
for tx in proposal.txs {
let hash = self.transactions.insert(&tx)?;
txs.push(hash);
}
// Storing block
let block = Block { st: proposal.st, sl: proposal.sl, txs, metadata: proposal.metadata };
let hash = self.blocks.insert(&block)?;
// Storing block order
self.order.insert(block.sl, hash)?;
// Storing streamlet metadata
self.streamlet_metadata.insert(hash, &proposal.sm)?;
Ok(hash)
}
/// Insertion of a block info.
pub fn add_by_info(&mut self, info: BlockInfo) -> Result<blake3::Hash> {
if self.has_block(&info)? {
let blockhash =
BlockProposal::to_proposal_hash(info.st, info.sl, &info.txs, &info.metadata);
return Ok(blockhash)
}
// Storing transactions
let mut txs = Vec::new();
for tx in info.txs {
let hash = self.transactions.insert(&tx)?;
txs.push(hash);
}
// Storing block
let block = Block { st: info.st, sl: info.sl, txs, metadata: info.metadata };
let hash = self.blocks.insert(&block)?;
// Storing block order
self.order.insert(block.sl, hash)?;
// Storing streamlet metadata
self.streamlet_metadata.insert(hash, &info.sm)?;
Ok(hash)
}
/// Retrieve the last block slot and hash.
pub fn last(&self) -> Result<Option<(u64, blake3::Hash)>> {
self.order.get_last()
}
/// Retrieve the last block slot and hash.
pub fn has_block(&self, info: &BlockInfo) -> Result<bool> {
let hashes = self.order.get(&vec![info.sl])?;
if hashes.is_empty() {
return Ok(false)
}
if let Some(found) = &hashes[0] {
// Checking provided info produces same hash
let blockhash =
BlockProposal::to_proposal_hash(info.st, info.sl, &info.txs, &info.metadata);
return Ok(blockhash == found.block)
}
Ok(false)
}
/// Retrieve n blocks with all their info, after start key.
pub fn get_with_info(&self, key: u64, n: u64) -> Result<Vec<BlockInfo>> {
let mut blocks_info = Vec::new();
// Retrieve requested hashes from order store
let hashes = self.order.get_after(key, n)?;
// Retrieve blocks for found hashes
let blocks = self.blocks.get(&hashes)?;
// For each found block, retrieve its txs and metadata and convert to BlockProposal
for option in blocks {
match option {
None => continue,
Some((hash, block)) => {
let mut txs = Vec::new();
let found = self.transactions.get(&block.txs)?;
for option in found {
match option {
Some(tx) => txs.push(tx),
None => continue,
}
}
let sm = self.streamlet_metadata.get(&vec![hash])?[0].as_ref().unwrap().clone();
blocks_info.push(BlockInfo::new(block.st, block.sl, txs, block.metadata, sm));
}
}
}
Ok(blocks_info)
}
}
/// This struct represents a sequence of block proposals.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct ProposalsChain {
pub proposals: Vec<BlockProposal>,
}
impl ProposalsChain {
pub fn new(initial_proposal: BlockProposal) -> ProposalsChain {
ProposalsChain { proposals: vec![initial_proposal] }
}
/// A proposal is considered valid when its parent hash is equal to the hash of the
/// previous proposal and their epochs are incremental, exluding genesis block proposal.
/// Additional validity rules can be applied.
pub fn check_proposal(
&self,
proposal: &BlockProposal,
previous: &BlockProposal,
genesis: &blake3::Hash,
) -> bool {
if &proposal.st == genesis {
debug!("Genesis block proposal provided.");
return false
}
let previous_hash = previous.hash();
if proposal.st != previous_hash || proposal.sl <= previous.sl {
debug!("Provided proposal is invalid.");
return false
}
true
}
/// A proposals chain is considered valid, when every proposal is valid, based on check_proposal function.
pub fn check_chain(&self, genesis: &blake3::Hash) -> bool {
for (index, proposal) in self.proposals[1..].iter().enumerate() {
if !self.check_proposal(proposal, &self.proposals[index], genesis) {
return false
}
}
true
}
/// Insertion of a valid proposal.
pub fn add(&mut self, proposal: &BlockProposal, genesis: &blake3::Hash) {
if self.check_proposal(proposal, self.proposals.last().unwrap(), genesis) {
self.proposals.push(proposal.clone());
}
}
/// Proposals chain notarization check.
pub fn notarized(&self) -> bool {
for proposal in &self.proposals {
if !proposal.sm.notarized {
return false
}
}
true
}
}
impl_vec!(ProposalsChain);

View File

@@ -0,0 +1,121 @@
use darkfi::{
util::serial::{deserialize, serialize, SerialDecodable, SerialEncodable},
Result,
};
use super::{block::Block, participant::Participant, util::Timestamp, vote::Vote};
const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata";
/// This struct represents additional Block information used by the consensus protocol.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Metadata {
/// Block creation timestamp
pub timestamp: Timestamp,
/// Block information used by Ouroboros consensus
pub om: OuroborosMetadata,
}
impl Metadata {
pub fn new(timestamp: Timestamp, proof: String, r: String, s: String) -> Metadata {
Metadata { timestamp, om: OuroborosMetadata::new(proof, r, s) }
}
}
/// This struct represents Block information used by Ouroboros consensus protocol.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct OuroborosMetadata {
/// Proof the stakeholder is the block owner
pub proof: String,
/// Random seed for VRF
pub r: String,
/// Block owner signature
pub s: String,
}
impl OuroborosMetadata {
pub fn new(proof: String, r: String, s: String) -> OuroborosMetadata {
OuroborosMetadata { proof, r, s }
}
}
/// This struct represents Block information used by Streamlet consensus protocol.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct StreamletMetadata {
/// Epoch votes
pub votes: Vec<Vote>,
/// Block notarization flag
pub notarized: bool,
/// Block finalization flag
pub finalized: bool,
/// Nodes participated in the voting process
pub participants: Vec<Participant>,
}
impl StreamletMetadata {
pub fn new(participants: Vec<Participant>) -> StreamletMetadata {
StreamletMetadata { votes: Vec::new(), notarized: false, finalized: false, participants }
}
}
#[derive(Debug)]
pub struct StreamletMetadataStore(sled::Tree);
impl StreamletMetadataStore {
pub fn new(db: &sled::Db, genesis: i64) -> Result<Self> {
let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?;
let store = Self(tree);
if store.0.is_empty() {
// Genesis block record is generated.
let block = blake3::hash(&serialize(&Block::genesis_block(genesis)));
let metadata = StreamletMetadata {
votes: vec![],
notarized: true,
finalized: true,
participants: vec![],
};
store.insert(block, &metadata)?;
}
Ok(store)
}
/// Insert streamlet metadata into the store.
/// The block hash for the metadata is used as the key, where value is the serialized metadata.
pub fn insert(&self, block: blake3::Hash, metadata: &StreamletMetadata) -> Result<()> {
self.0.insert(block.as_bytes(), serialize(metadata))?;
Ok(())
}
/// Fetch given streamlet metadata from the store.
/// The resulting vector contains `Option` which is `Some` if the metadata
/// was found in the store, and `None`, if it has not.
pub fn get(&self, hashes: &[blake3::Hash]) -> Result<Vec<Option<StreamletMetadata>>> {
let mut ret: Vec<Option<StreamletMetadata>> = Vec::with_capacity(hashes.len());
for i in hashes {
if let Some(found) = self.0.get(i.as_bytes())? {
let metadata = deserialize(&found)?;
ret.push(Some(metadata));
} else {
ret.push(None);
}
}
Ok(ret)
}
/// Retrieve all streamlet metadata.
/// Be carefull as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<Option<(blake3::Hash, StreamletMetadata)>>> {
let mut metadata = Vec::new();
let mut iterator = self.0.into_iter().enumerate();
while let Some((_, r)) = iterator.next() {
let (k, v) = r.unwrap();
let hash_bytes: [u8; 32] = k.as_ref().try_into().unwrap();
let m = deserialize(&v)?;
metadata.push(Some((hash_bytes.into(), m)));
}
Ok(metadata)
}
}

View File

@@ -0,0 +1,16 @@
pub mod block;
pub mod blockchain;
pub mod metadata;
pub mod participant;
pub mod state;
pub mod tx;
pub mod util;
pub mod vote;
pub use block::{Block, BlockProposal};
pub use blockchain::Blockchain;
pub use metadata::Metadata;
pub use participant::Participant;
pub use state::ValidatorState;
pub use tx::Tx;
pub use vote::Vote;

View File

@@ -0,0 +1,55 @@
use std::{collections::BTreeMap, io};
use darkfi::{
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a tuple of the form (node_id, epoch_joined, last_epoch_voted).
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Participant {
/// Node id
pub id: u64,
/// Epoch node joined the network
pub joined: u64,
/// Last epoch node voted
pub voted: Option<u64>,
}
impl Participant {
pub fn new(id: u64, joined: u64) -> Participant {
Participant { id, joined, voted: None }
}
}
impl net::Message for Participant {
fn name() -> &'static str {
"participant"
}
}
impl Encodable for BTreeMap<u64, Participant> {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += VarInt(self.len() as u64).encode(&mut s)?;
for c in self.iter() {
len += c.1.encode(&mut s)?;
}
Ok(len)
}
}
impl Decodable for BTreeMap<u64, Participant> {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let len = VarInt::decode(&mut d)?.0;
let mut ret = BTreeMap::new();
for _ in 0..len {
let participant: Participant = Decodable::decode(&mut d)?;
ret.insert(participant.id, participant);
}
Ok(ret)
}
}
impl_vec!(Participant);

View File

@@ -0,0 +1,725 @@
use chrono::{NaiveDateTime, Utc};
use log::{debug, error, warn};
use rand::rngs::OsRng;
use std::{
collections::{hash_map::DefaultHasher, BTreeMap},
hash::{Hash, Hasher},
path::PathBuf,
sync::{Arc, RwLock},
time::Duration,
};
use darkfi::{
crypto::{
keypair::{PublicKey, SecretKey},
schnorr::{SchnorrPublic, SchnorrSecret},
},
net,
util::serial::{deserialize, serialize, Encodable, SerialDecodable, SerialEncodable},
Error, Result,
};
use super::{
block::{Block, BlockInfo, BlockProposal},
blockchain::{Blockchain, ProposalsChain},
metadata::{Metadata, StreamletMetadata},
participant::Participant,
tx::Tx,
util::{get_current_time, Timestamp},
vote::Vote,
};
pub const DELTA: u64 = 10;
const SLED_CONSESUS_STATE_TREE: &[u8] = b"_consensus_state";
/// This struct represents the information required by the consensus algorithm.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ConsensusState {
/// Genesis block creation timestamp
pub genesis: Timestamp,
/// Fork chains containing block proposals
pub proposals: Vec<ProposalsChain>,
/// Orphan votes pool, in case a vote reaches a node before the corresponding block
pub orphan_votes: Vec<Vote>,
/// Node participation identity
pub participant: Option<Participant>,
/// Validators currently participating in the consensus
pub participants: BTreeMap<u64, Participant>,
/// Validators to be added on the next epoch as participants
pub pending_participants: Vec<Participant>,
/// Last slot participants where refreshed
pub refreshed: u64,
}
impl ConsensusState {
pub fn new(db: &sled::Db, id: u64, genesis: i64) -> Result<ConsensusState> {
let tree = db.open_tree(SLED_CONSESUS_STATE_TREE)?;
let consensus = if let Some(found) = tree.get(id.to_ne_bytes())? {
deserialize(&found).unwrap()
} else {
let consensus = ConsensusState {
genesis: Timestamp(genesis),
proposals: Vec::new(),
orphan_votes: Vec::new(),
participant: None,
participants: BTreeMap::new(),
pending_participants: vec![],
refreshed: 0,
};
let serialized = serialize(&consensus);
tree.insert(id.to_ne_bytes(), serialized)?;
consensus
};
Ok(consensus)
}
}
/// Auxilary structure used for consensus syncing.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ConsensusRequest {
/// Validator id
pub id: u64,
}
impl net::Message for ConsensusRequest {
fn name() -> &'static str {
"consensusrequest"
}
}
/// Auxilary structure used for consensus syncing.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ConsensusResponse {
/// Hot/live data used by the consensus algorithm
pub consensus: ConsensusState,
}
impl net::Message for ConsensusResponse {
fn name() -> &'static str {
"consensusresponse"
}
}
/// Atomic pointer to validator state.
pub type ValidatorStatePtr = Arc<RwLock<ValidatorState>>;
/// This struct represents the state of a validator node.
pub struct ValidatorState {
/// Validator id
pub id: u64,
/// Secret key, to sign messages
pub secret: SecretKey,
/// Validator public key
pub public: PublicKey,
/// Sled database for storage
pub db: sled::Db,
/// Hot/live data used by the consensus algorithm
pub consensus: ConsensusState,
/// Canonical (finalized) blockchain
pub blockchain: Blockchain,
/// Pending transactions
pub unconfirmed_txs: Vec<Tx>,
/// Genesis block hash, used for validations
pub genesis_block: blake3::Hash,
/// Participation flag
pub participating: bool,
}
impl ValidatorState {
pub fn new(db_path: PathBuf, id: u64, genesis: i64) -> Result<ValidatorStatePtr> {
// TODO: clock sync
let secret = SecretKey::random(&mut OsRng);
let db = sled::open(db_path)?;
let public = PublicKey::from_secret(secret);
let consensus = ConsensusState::new(&db, id, genesis)?;
let blockchain = Blockchain::new(&db, genesis)?;
let unconfirmed_txs = Vec::new();
let genesis_block = blake3::hash(&serialize(&Block::genesis_block(genesis)));
let participating = false;
Ok(Arc::new(RwLock::new(ValidatorState {
id,
secret,
public,
db,
consensus,
blockchain,
unconfirmed_txs,
genesis_block,
participating,
})))
}
/// Node retreives a transaction and append it to the unconfirmed transactions list.
/// Additional validity rules must be defined by the protocol for transactions.
pub fn append_tx(&mut self, tx: Tx) -> bool {
if self.unconfirmed_txs.contains(&tx) {
return false
}
self.unconfirmed_txs.push(tx);
true
}
/// Node calculates seconds until next epoch starting time.
/// Epochs duration is configured using the delta value.
pub fn next_epoch_start(&self) -> Duration {
let start_time = NaiveDateTime::from_timestamp(self.consensus.genesis.0, 0);
let current_epoch = self.current_epoch() + 1;
let next_epoch_start_timestamp =
(current_epoch * (2 * DELTA)) + (start_time.timestamp() as u64);
let next_epoch_start =
NaiveDateTime::from_timestamp(next_epoch_start_timestamp.try_into().unwrap(), 0);
let current_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = next_epoch_start - current_time;
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Node calculates current epoch, based on elapsed time from the genesis block.
/// Epochs duration is configured using the delta value.
pub fn current_epoch(&self) -> u64 {
self.consensus.genesis.clone().elapsed() / (2 * DELTA)
}
/// Finds the last epoch a proposal or block was generated.
pub fn last_epoch(&self) -> Result<u64> {
let mut epoch = 0;
for chain in &self.consensus.proposals {
for proposal in &chain.proposals {
if proposal.block.sl > epoch {
epoch = proposal.block.sl;
}
}
}
// We return here in case proposals exist,
// so we don't query the sled database.
if epoch > 0 {
return Ok(epoch)
}
let (last_sl, _) = self.blockchain.last()?.unwrap();
Ok(last_sl)
}
/// Node finds epochs leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating in the network.
pub fn epoch_leader(&mut self) -> u64 {
let epoch = self.current_epoch();
// DefaultHasher is used to hash the epoch number
// because it produces a number string which then can be modulated by the len.
// blake3 produces alphanumeric
let mut hasher = DefaultHasher::new();
epoch.hash(&mut hasher);
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
// Since BTreeMap orders by key in asceding order, each node will have
// the same key in calculated position.
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
}
/// Node checks if they are the current epoch leader.
pub fn is_epoch_leader(&mut self) -> bool {
let leader = self.epoch_leader();
self.id == leader
}
/// Node generates a block proposal for the current epoch,
/// containing all uncorfirmed transactions.
/// Proposal extends the longest notarized fork chain the node holds.
pub fn propose(&self) -> Result<Option<BlockProposal>> {
let epoch = self.current_epoch();
let (previous_hash, index) = self.longest_notarized_chain_last_hash().unwrap();
let unproposed_txs = self.unproposed_txs(index);
let metadata = Metadata::new(
get_current_time(),
String::from("proof"),
String::from("r"),
String::from("s"),
);
let sm = StreamletMetadata::new(self.consensus.participants.values().cloned().collect());
let signed_block = self.secret.sign(
BlockProposal::to_proposal_hash(previous_hash, epoch, &unproposed_txs, &metadata)
.as_bytes(),
);
Ok(Some(BlockProposal::new(
self.public,
signed_block,
self.id,
previous_hash,
epoch,
unproposed_txs,
metadata,
sm,
)))
}
/// Node retrieves all unconfirmed transactions not proposed
/// in previous blocks of provided index chain.
pub fn unproposed_txs(&self, index: i64) -> Vec<Tx> {
let mut unproposed_txs = self.unconfirmed_txs.clone();
// If index is -1(canonical blockchain) a new fork chain will be generated,
// therefore all unproposed transactions can be included in the proposal.
if index == -1 {
return unproposed_txs
}
// We iterate the fork chain proposals to find already proposed transactions
// and remove them from the local unproposed_txs vector.
let chain = &self.consensus.proposals[index as usize];
for proposal in &chain.proposals {
for tx in &proposal.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
}
}
unproposed_txs
}
/// Finds the longest fully notarized blockchain the node holds and returns the last block hash
/// and the chain index.
pub fn longest_notarized_chain_last_hash(&self) -> Result<(blake3::Hash, i64)> {
let mut longest_notarized_chain: Option<ProposalsChain> = None;
let mut length = 0;
let mut index = -1;
if !self.consensus.proposals.is_empty() {
for (i, chain) in self.consensus.proposals.iter().enumerate() {
if chain.notarized() && chain.proposals.len() > length {
longest_notarized_chain = Some(chain.clone());
length = chain.proposals.len();
index = i as i64;
}
}
}
let hash = match longest_notarized_chain {
Some(chain) => chain.proposals.last().unwrap().hash(),
None => self.blockchain.last()?.unwrap().1,
};
Ok((hash, index))
}
/// Node receives the proposed block, verifies its sender(epoch leader),
/// and proceeds with voting on it.
pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
// Node hasn't started participating
if !self.participating {
return Ok(None)
}
// Node refreshes participants records
self.refresh_participants()?;
let leader = self.epoch_leader();
if leader != proposal.id {
debug!(
"Received proposal not from epoch leader ({:?}). Proposer: {:?}",
leader, proposal.id
);
return Ok(None)
}
if !proposal.public_key.verify(
BlockProposal::to_proposal_hash(
proposal.st,
proposal.sl,
&proposal.txs,
&proposal.metadata,
)
.as_bytes(),
&proposal.signature,
) {
debug!("Proposer signature couldn't be verified. Proposer: {:?}", proposal.id);
return Ok(None)
}
self.vote(proposal)
}
/// Given a proposal, node finds which blockchain it extends.
/// If proposal extends the canonical blockchain, a new fork chain is created.
/// Node votes on the proposal, only if it extends the longest notarized fork chain it has seen.
pub fn vote(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
let mut proposal = proposal.clone();
// Generate proposal hash
let proposal_hash = proposal.hash();
// Add orphan votes
let mut orphans = Vec::new();
for vote in self.consensus.orphan_votes.iter() {
if vote.proposal == proposal_hash {
proposal.sm.votes.push(vote.clone());
orphans.push(vote.clone());
}
}
for vote in orphans {
self.consensus.orphan_votes.retain(|v| *v != vote);
}
let index = self.find_extended_chain_index(&proposal).unwrap();
if index == -2 {
return Ok(None)
}
let chain = match index {
-1 => {
let proposalschain = ProposalsChain::new(proposal.clone());
self.consensus.proposals.push(proposalschain);
self.consensus.proposals.last().unwrap()
}
_ => {
self.consensus.proposals[index as usize].add(&proposal, &self.genesis_block);
&self.consensus.proposals[index as usize]
}
};
if self.extends_notarized_chain(chain) {
let signed_hash = self.secret.sign(&serialize(&proposal_hash)[..]);
return Ok(Some(Vote::new(
self.public,
signed_hash,
proposal_hash,
proposal.sl,
self.id,
)))
}
Ok(None)
}
/// Node verifies if provided chain is notarized excluding the last block.
pub fn extends_notarized_chain(&self, chain: &ProposalsChain) -> bool {
if chain.proposals.len() > 1 {
for proposal in &chain.proposals[..(chain.proposals.len() - 1)] {
if !proposal.sm.notarized {
return false
}
}
}
true
}
/// Given a proposal, node finds the index of the chain it extends.
pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result<i64> {
for (index, chain) in self.consensus.proposals.iter().enumerate() {
let last = chain.proposals.last().unwrap();
let hash = last.hash();
if proposal.st == hash && proposal.sl > last.sl {
return Ok(index as i64)
}
if proposal.st == last.st && proposal.sl == last.sl {
debug!("Proposal already received.");
return Ok(-2)
}
}
let (last_sl, last_block) = self.blockchain.last()?.unwrap();
if proposal.st != last_block || proposal.sl <= last_sl {
error!("Proposal doesn't extend any known chains.");
return Ok(-2)
}
Ok(-1)
}
/// Node receives a vote for a proposal.
/// First, sender is verified using their public key.
/// Proposal is searched in nodes fork chains.
/// If the vote wasn't received before, it is appended to proposal votes list.
/// When a node sees 2n/3 votes for a proposal it notarizes it.
/// When a proposal gets notarized, the transactions it contains are removed from
/// nodes unconfirmed transactions list.
/// Finally, we check if the notarization of the proposal can finalize parent proposals
/// in its chain.
pub fn receive_vote(&mut self, vote: &Vote) -> Result<(bool, Option<Vec<BlockInfo>>)> {
// Node hasn't started participating
if !self.participating {
return Ok((false, None))
}
let mut encoded_proposal = vec![];
let result = vote.proposal.encode(&mut encoded_proposal);
match result {
Ok(_) => (),
Err(e) => {
error!("Proposal encoding failed. Error: {:?}", e);
return Ok((false, None))
}
};
if !vote.public_key.verify(&encoded_proposal[..], &vote.vote) {
debug!("Voter signature couldn't be verified. Voter: {:?}", vote.id);
return Ok((false, None))
}
// Node refreshes participants records
self.refresh_participants()?;
let nodes_count = self.consensus.participants.len();
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.id) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
debug!("Voter joined after current epoch. Voter: {:?}", vote.id);
return Ok((false, None))
}
}
None => {
debug!("Voter is not a participant. Voter: {:?}", vote.id);
return Ok((false, None))
}
}
let proposal = self.find_proposal(&vote.proposal).unwrap();
if proposal == None {
debug!("Received vote for unknown proposal.");
if !self.consensus.orphan_votes.contains(vote) {
self.consensus.orphan_votes.push(vote.clone());
}
return Ok((false, None))
}
let (unwrapped, chain_index) = proposal.unwrap();
if !unwrapped.sm.votes.contains(vote) {
unwrapped.sm.votes.push(vote.clone());
let mut to_broadcast = Vec::new();
if !unwrapped.sm.notarized && unwrapped.sm.votes.len() > (2 * nodes_count / 3) {
unwrapped.sm.notarized = true;
to_broadcast = self.chain_finalization(chain_index)?;
}
// updating participant vote
let exists = self.consensus.participants.get(&vote.id);
let mut participant = match exists {
Some(p) => p.clone(),
None => Participant::new(vote.id, vote.sl),
};
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.id, participant);
return Ok((true, Some(to_broadcast)))
}
return Ok((false, None))
}
/// Node searches it the chains it holds for provided proposal.
pub fn find_proposal(
&mut self,
vote_proposal: &blake3::Hash,
) -> Result<Option<(&mut BlockProposal, i64)>> {
for (index, chain) in &mut self.consensus.proposals.iter_mut().enumerate() {
for proposal in chain.proposals.iter_mut().rev() {
let proposal_hash = proposal.hash();
if vote_proposal == &proposal_hash {
return Ok(Some((proposal, index as i64)))
}
}
}
Ok(None)
}
/// Note removes provided transactions vector, from unconfirmed_txs, if they exist.
pub fn remove_txs(&mut self, transactions: Vec<Tx>) -> Result<()> {
for tx in transactions {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
Ok(())
}
/// Provided an index, node checks if chain can be finalized.
/// Consensus finalization logic: If node has observed the notarization of 3 consecutive
/// proposals in a fork chain, it finalizes (appends to canonical blockchain) all proposals up to the middle block.
/// When fork chain proposals are finalized, rest fork chains not starting by those proposals are removed.
pub fn chain_finalization(&mut self, chain_index: i64) -> Result<Vec<BlockInfo>> {
let mut to_broadcast = Vec::new();
let chain = &mut self.consensus.proposals[chain_index as usize];
let len = chain.proposals.len();
if len > 2 {
let mut consecutive = 0;
for proposal in &chain.proposals {
if proposal.sm.notarized {
consecutive += 1;
} else {
break
}
}
if consecutive > 2 {
let mut finalized = Vec::new();
for proposal in &mut chain.proposals[..(consecutive - 1)] {
proposal.sm.finalized = true;
finalized.push(proposal.clone());
}
chain.proposals.drain(0..(consecutive - 1));
for proposal in &finalized {
self.blockchain.add_by_proposal(proposal.clone())?;
self.remove_txs(proposal.txs.clone())?;
to_broadcast.push(BlockInfo::new(
proposal.st,
proposal.sl,
proposal.txs.clone(),
proposal.metadata.clone(),
proposal.sm.clone(),
));
}
let (last_sl, last_block) = self.blockchain.last()?.unwrap();
let mut dropped = Vec::new();
for chain in self.consensus.proposals.iter() {
let first = chain.proposals.first().unwrap();
if first.st != last_block || first.sl <= last_sl {
dropped.push(chain.clone());
}
}
for chain in dropped {
self.consensus.proposals.retain(|c| *c != chain);
}
// Remove orphan votes
let mut orphans = Vec::new();
for vote in self.consensus.orphan_votes.iter() {
if vote.sl <= last_sl {
orphans.push(vote.clone());
}
}
for vote in orphans {
self.consensus.orphan_votes.retain(|v| *v != vote);
}
}
}
Ok(to_broadcast)
}
/// Append node participant identity to the pending participants list.
pub fn append_self_participant(&mut self, participant: Participant) {
self.consensus.participant = Some(participant.clone());
self.append_participant(participant);
}
/// Node retreives a new participant and appends it to the pending participants list.
pub fn append_participant(&mut self, participant: Participant) -> bool {
if self.consensus.pending_participants.contains(&participant) {
return false
}
self.consensus.pending_participants.push(participant);
true
}
/// Refresh the participants map, to retain only the active ones.
/// Active nodes are considered those that on the epoch the last proposal
/// was generated, either voted or joined the previous epoch.
/// That ensures we cover the case of chosen leader beign inactive.
pub fn refresh_participants(&mut self) -> Result<()> {
// Node checks if it should refresh its participants list
let epoch = self.current_epoch();
if epoch <= self.consensus.refreshed {
debug!("refresh_participants(): Participants have been refreshed this epoch.");
return Ok(())
}
debug!("refresh_participants(): Adding pending participants");
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
if self.consensus.participants.is_empty() {
debug!(
"refresh_participants(): Didn't manage to add any participant, pending were empty."
);
}
self.consensus.pending_participants = vec![];
let mut inactive = Vec::new();
let mut last_epoch = self.last_epoch()?;
// This check ensures that we don't chech the current epoch,
// as a node might receive the proposal of current epoch before
// starting refreshing participants, so the last_epoch will be
// the current one.
if last_epoch >= epoch {
last_epoch = epoch - 1;
}
let previous_epoch = last_epoch - 1;
error!(
"refresh_participants(): Checking epochs: previous - {:?}, last - {:?}",
previous_epoch, last_epoch
);
for (index, participant) in self.consensus.participants.clone().iter() {
match participant.voted {
Some(epoch) => {
if epoch < last_epoch {
warn!("refresh_participants(): Inactive participant: {:?}", participant);
inactive.push(*index);
}
}
None => {
if participant.joined < previous_epoch {
warn!("refresh_participants(): Inactive participant: {:?}", participant);
inactive.push(*index);
}
}
}
}
for index in inactive {
self.consensus.participants.remove(&index);
}
if self.consensus.participants.is_empty() {
// If no nodes are active, node becomes a single node network.
let mut participant = self.consensus.participant.clone().unwrap();
participant.joined = epoch;
self.consensus.participant = Some(participant.clone());
self.consensus.participants.insert(participant.id, participant.clone());
}
self.consensus.refreshed = epoch;
Ok(())
}
/// Util function to save the current consensus state to provided file path.
pub fn save_consensus_state(&self) -> Result<()> {
let tree = self.db.open_tree(SLED_CONSESUS_STATE_TREE).unwrap();
let serialized = serialize(&self.consensus);
match tree.insert(self.id.to_ne_bytes(), serialized) {
Err(_) => Err(Error::OperationFailed),
_ => Ok(()),
}
}
/// Util function to reset the current consensus state.
pub fn reset_consensus_state(&mut self) -> Result<()> {
let genesis = self.consensus.genesis.clone();
let consensus = ConsensusState {
genesis,
proposals: Vec::new(),
orphan_votes: Vec::new(),
participant: None,
participants: BTreeMap::new(),
pending_participants: vec![],
refreshed: 0,
};
self.consensus = consensus;
Ok(())
}
}

View File

@@ -0,0 +1,78 @@
use std::io;
use darkfi::{
impl_vec, net,
util::serial::{
deserialize, serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt,
},
Result,
};
const SLED_TX_TREE: &[u8] = b"_transactions";
/// Temporary structure used to represent transactions.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Tx {
pub payload: String,
}
impl net::Message for Tx {
fn name() -> &'static str {
"tx"
}
}
impl_vec!(Tx);
#[derive(Debug)]
pub struct TxStore(sled::Tree);
impl TxStore {
pub fn new(db: &sled::Db) -> Result<Self> {
let tree = db.open_tree(SLED_TX_TREE)?;
Ok(Self(tree))
}
/// Insert a tx into the txstore.
/// The tx is hashed with blake3 and this txhash is used as
/// the key, where value is the serialized tx itself.
pub fn insert(&self, tx: &Tx) -> Result<blake3::Hash> {
let serialized = serialize(tx);
let txhash = blake3::hash(&serialized);
self.0.insert(txhash.as_bytes(), serialized)?;
Ok(txhash)
}
/// Fetch given transactions from the txstore.
/// The resulting vector contains `Option` which is `Some` if the tx
/// was found in the txstore, and `None`, if it has not.
pub fn get(&self, txhashes: &[blake3::Hash]) -> Result<Vec<Option<Tx>>> {
let mut ret: Vec<Option<Tx>> = Vec::with_capacity(txhashes.len());
for i in txhashes {
if let Some(found) = self.0.get(i.as_bytes())? {
let tx = deserialize(&found)?;
ret.push(Some(tx));
} else {
ret.push(None);
}
}
Ok(ret)
}
/// Retrieve all transactions.
/// Be carefull as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<Option<(blake3::Hash, Tx)>>> {
let mut txs = Vec::new();
let mut iterator = self.0.into_iter().enumerate();
while let Some((_, r)) = iterator.next() {
let (k, v) = r.unwrap();
let hash_bytes: [u8; 32] = k.as_ref().try_into().unwrap();
let tx = deserialize(&v)?;
txs.push(Some((hash_bytes.into(), tx)));
}
Ok(txs)
}
}

View File

@@ -0,0 +1,28 @@
use chrono::{NaiveDateTime, Utc};
use darkfi::util::serial::{SerialDecodable, SerialEncodable};
/// Serialized blake3 hash bytes for character "⊥"
pub const EMPTY_HASH_BYTES: [u8; 32] = [
254, 233, 82, 102, 23, 208, 153, 87, 96, 165, 163, 194, 238, 7, 1, 88, 14, 1, 249, 118, 197,
29, 180, 211, 87, 66, 59, 38, 86, 54, 12, 39,
];
/// Util structure to represend chrono UTC timestamps.
#[derive(Debug, Clone, PartialEq, SerialDecodable, SerialEncodable)]
pub struct Timestamp(pub i64);
impl Timestamp {
/// Calculates elapsed time of a Timestamp.
pub fn elapsed(self) -> u64 {
let start_time = NaiveDateTime::from_timestamp(self.0, 0);
let end_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = end_time - start_time;
diff.num_seconds().try_into().unwrap()
}
}
/// Util function to generate a Timestamp of current time.
pub fn get_current_time() -> Timestamp {
Timestamp(Utc::now().timestamp())
}

View File

@@ -0,0 +1,43 @@
use std::io;
use darkfi::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a Vote, used by Streamlet consensus.
#[derive(Debug, Clone, PartialEq, SerialDecodable, SerialEncodable)]
pub struct Vote {
/// Node public key
pub public_key: PublicKey,
/// signed block
pub vote: Signature,
/// block proposal hash to vote on
pub proposal: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// node id
pub id: u64,
}
impl Vote {
pub fn new(
public_key: PublicKey,
vote: Signature,
proposal: blake3::Hash,
sl: u64,
id: u64,
) -> Vote {
Vote { public_key, vote, proposal, sl, id }
}
}
impl net::Message for Vote {
fn name() -> &'static str {
"vote"
}
}
impl_vec!(Vote);

View File

@@ -1 +0,0 @@
pub mod protocols;

View File

@@ -11,11 +11,12 @@ use structopt::StructOpt;
use structopt_toml::StructOptToml;
use darkfi::{
consensus::{
consensus2::{
block::{BlockOrder, BlockResponse},
participant::Participant,
state::{ConsensusRequest, ConsensusResponse, ValidatorState, ValidatorStatePtr},
tx::Tx,
proto::{ProtocolSync, ProtocolTx, ProtocolVote, ProtocolProposal, ProtocolParticipant, ProtocolSyncConsensus}
},
net,
rpc::{
@@ -35,12 +36,6 @@ use darkfi::{
Result,
};
use validatord::protocols::{
protocol_participant::ProtocolParticipant, protocol_proposal::ProtocolProposal,
protocol_sync::ProtocolSync, protocol_sync_consensus::ProtocolSyncConsensus,
protocol_tx::ProtocolTx, protocol_vote::ProtocolVote,
};
const CONFIG_FILE: &str = r"validatord_config.toml";
const CONFIG_FILE_CONTENTS: &[u8] = include_bytes!("../validatord_config.toml");
@@ -124,7 +119,7 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()>
// Nodes sends the last known block hash of the canonical blockchain
// and loops until the respond is the same block (used to utilize batch requests)
let mut last = state.read().unwrap().blockchain.last()?.unwrap();
let mut last = state.read().await.blockchain.last()?.unwrap();
info!("Last known block: {:?} - {:?}", last.0, last.1);
loop {
// Node creates a BlockOrder and sends it
@@ -134,9 +129,9 @@ async fn syncing_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()>
// Node stores responce data. Extra validations can be added here.
let response = response_sub.receive().await?;
for info in &response.blocks {
state.write().unwrap().blockchain.add_by_info(info.clone())?;
state.write().await.blockchain.add_by_info(info.clone())?;
}
let last_received = state.read().unwrap().blockchain.last()?.unwrap();
let last_received = state.read().await.blockchain.last()?.unwrap();
info!("Last received block: {:?} - {:?}", last_received.0, last_received.1);
if last == last_received {
break
@@ -167,15 +162,15 @@ async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> R
.expect("Missing ConsensusResponse dispatcher!");
// Node creates a ConsensusRequest and sends it
let request = ConsensusRequest { id: state.read().unwrap().id };
let request = ConsensusRequest { id: state.read().await.id };
channel.send(request).await?;
// Node stores responce data. Extra validations can be added here.
let response = response_sub.receive().await?;
state.write().unwrap().consensus = response.consensus.clone();
state.write().await.consensus = response.consensus.clone();
} else {
info!("Node is not connected to other nodes, resetting consensus state.");
state.write().unwrap().reset_consensus_state()?;
state.write().await.reset_consensus_state()?;
}
info!("Node synced!");
@@ -185,7 +180,7 @@ async fn syncing_consensus_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> R
async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Node waits just before the current or next epoch end,
// so it can start syncing latest state.
let mut seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
let mut seconds_until_next_epoch = state.read().await.next_epoch_start();
let one_sec = Duration::new(1, 0);
loop {
if seconds_until_next_epoch > one_sec {
@@ -194,7 +189,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
}
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
seconds_until_next_epoch = state.read().await.next_epoch_start();
}
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
@@ -208,8 +203,8 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Node signals the network that it will start participating
let participant =
Participant::new(state.read().unwrap().id, state.read().unwrap().current_epoch());
state.write().unwrap().append_participant(participant.clone());
Participant::new(state.read().await.id, state.read().await.current_epoch());
state.write().await.append_self_participant(participant.clone());
let result = p2p.broadcast(participant.clone()).await;
match result {
Ok(()) => info!("Participation message broadcasted successfuly."),
@@ -217,20 +212,20 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
}
// After initialization node waits for next epoch to start participating
let seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
let seconds_until_next_epoch = state.read().await.next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
// Node modifies its participating flag to true
state.write().unwrap().participating = true;
state.write().await.participating = true;
loop {
// Node refreshes participants records
state.write().unwrap().refresh_participants();
state.write().await.refresh_participants();
// Node checks if its the epoch leader to generate a new proposal for that epoch
let result = if state.write().unwrap().is_epoch_leader() {
state.read().unwrap().propose()
let result = if state.write().await.is_epoch_leader() {
state.read().await.propose()
} else {
Ok(None)
};
@@ -242,14 +237,14 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
// Leader creates a vote for the proposal and broadcasts them both
let unwrapped = proposal.unwrap();
info!("Node is the epoch leader. Proposed block: {:?}", unwrapped);
let vote = state.write().unwrap().receive_proposal(&unwrapped);
let vote = state.write().await.receive_proposal(&unwrapped);
match vote {
Ok(x) => {
if x.is_none() {
error!("Node did not vote for the proposed block.");
} else {
let vote = x.unwrap();
let result = state.write().unwrap().receive_vote(&vote);
let result = state.write().await.receive_vote(&vote);
match result {
Ok(_) => info!("Vote saved successfuly."),
Err(e) => error!("Vote save failed. Error: {:?}", e),
@@ -278,7 +273,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
}
// Current node state is flushed to sled database
let result = state.read().unwrap().save_consensus_state();
let result = state.read().await.save_consensus_state();
match result {
Ok(()) => (),
Err(e) => {
@@ -287,7 +282,7 @@ async fn proposal_task(p2p: net::P2pPtr, state: ValidatorStatePtr) {
};
// Node waits until next epoch
let seconds_until_next_epoch = state.read().unwrap().next_epoch_start();
let seconds_until_next_epoch = state.read().await.next_epoch_start();
info!("Waiting for next epoch({:?} sec)...", seconds_until_next_epoch);
thread::sleep(seconds_until_next_epoch);
}
@@ -491,7 +486,7 @@ impl JsonRpcInterface {
let payload = String::from(args[0].as_str().unwrap());
let tx = Tx { payload };
self.state.write().unwrap().append_tx(tx.clone());
self.state.write().await.append_tx(tx.clone());
let result = self.p2p.broadcast(tx).await;
match result {

View File

@@ -1,8 +1,8 @@
use async_executor::Executor;
use async_trait::async_trait;
use darkfi::{
consensus::{participant::Participant, state::ValidatorStatePtr},
use darkfi::{
consensus::{participant::Participant, state::ValidatorStatePtr},
net::{
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,