nodes-tool: modified to work with darkfid2, syncing: changed blocks retriaval method, forking a fork chain impl added, new consensus simulation script added

This commit is contained in:
aggstam
2022-04-26 19:13:07 +03:00
parent e92efdcf0c
commit fcbc935fd9
8 changed files with 170 additions and 31 deletions

78
script/consensus_simulation.sh Executable file
View File

@@ -0,0 +1,78 @@
#!/bin/bash
# Simulation of the consensus network for n(>2) nodes.
nodes=4
# moving one folder up
cd ..
# compiling bin
make BINS=darkfid2
# PIDs array
pids=()
# Starting node 0 (seed) in background
./darkfid2 \
--consensus \
--consensus-p2p-accept 127.0.0.1:6000 \
--consensus-p2p-external 127.0.0.1:6000 \
--database ./tmp/node0/blockchain \
--rpc-listen tcp://127.0.0.1:6010 \
--sync-p2p-accept 127.0.0.1:6020 \
--sync-p2p-external 127.0.0.1:6020 \
--wallet-path ./tmp/node0/wallet.db \
-g 1650887115 &
pids[${#pids[@]}]=$!
# Waiting for seed to setup
sleep 20
# Starting nodes 1 till second to last node in background
bound=$(($nodes-2))
for i in $(eval echo "{1..$bound}")
do
./darkfid2 \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \
--consensus-p2p-accept 127.0.0.1:600$i \
--consensus-p2p-external 127.0.0.1:600$i \
--database ./tmp/node$i/blockchain \
--rpc-listen tcp://127.0.0.1:601$i \
--sync-p2p-accept 127.0.0.1:602$i \
--sync-p2p-external 127.0.0.1:602$i \
--wallet-path ./tmp/node$i/wallet.db \
-g 1650887115 &
pids[${#pids[@]}]=$!
# waiting for node to setup
sleep 20
done
# Trap kill signal
trap ctrl_c INT
# On kill signal, terminate background node processes
function ctrl_c() {
for pid in ${pids[@]}
do
kill $pid
done
}
bound=$(($nodes-1))
# Starting last node
./darkfid2 \
--consensus \
--consensus-seed 127.0.0.1:6000 \
--sync-seed 127.0.0.1:6020 \
--consensus-p2p-accept 127.0.0.1:600$bound \
--consensus-p2p-external 127.0.0.1:600$bound \
--database ./tmp/node$bound/blockchain \
--rpc-listen tcp://127.0.0.1:601$bound \
--sync-p2p-accept 127.0.0.1:602$bound \
--sync-p2p-external 127.0.0.1:602$bound \
--wallet-path ./tmp/node$bound/wallet.db \
-g 1650887115

View File

@@ -1,3 +1,3 @@
/target
Cargo.lock
validatord_*
node*

View File

@@ -5,7 +5,7 @@ edition = "2021"
[dependencies.darkfi]
path = "../../../"
features = ["blockchain"]
features = ["blockchain", "node", "wallet"]
[dependencies]

View File

@@ -2,10 +2,10 @@ use std::{fs::File, io::Write};
use darkfi::{
blockchain::{
Blockchain,
blockstore::{BlockOrderStore, BlockStore},
metadatastore::StreamletMetadataStore,
txstore::TxStore,
Blockchain,
},
consensus::{
block::{Block, BlockProposal, ProposalChain},
@@ -17,23 +17,25 @@ use darkfi::{
vote::Vote,
TESTNET_GENESIS_HASH_BYTES,
},
node::Client,
util::expand_path,
wallet::walletdb::init_wallet,
Result,
};
#[derive(Debug)]
struct ParticipantInfo {
_id: u64,
_address: String,
_joined: u64,
_voted: Option<u64>,
}
impl ParticipantInfo {
pub fn new(participant: &Participant) -> ParticipantInfo {
let _id = participant.id;
let _address = participant.address.to_string();
let _joined = participant.joined;
let _voted = participant.voted;
ParticipantInfo { _id, _joined, _voted }
ParticipantInfo { _address, _joined, _voted }
}
}
@@ -41,15 +43,15 @@ impl ParticipantInfo {
struct VoteInfo {
_proposal: blake3::Hash,
_sl: u64,
_id: u64,
_address: String,
}
impl VoteInfo {
pub fn new(vote: &Vote) -> VoteInfo {
let _proposal = vote.proposal;
let _sl = vote.sl;
let _id = vote.id;
VoteInfo { _proposal, _sl, _id }
let _address = vote.address.to_string();
VoteInfo { _proposal, _sl, _address }
}
}
@@ -109,7 +111,7 @@ impl MetadataInfo {
#[derive(Debug)]
struct ProposalInfo {
_id: u64,
_address: String,
_st: blake3::Hash,
_sl: u64,
_txs: Vec<Tx>,
@@ -119,13 +121,13 @@ struct ProposalInfo {
impl ProposalInfo {
pub fn new(proposal: &BlockProposal) -> ProposalInfo {
let _id = proposal.id;
let _address = proposal.address.to_string();
let _st = proposal.block.st;
let _sl = proposal.block.sl;
let _txs = proposal.block.txs.clone();
let _metadata = MetadataInfo::new(&proposal.block.metadata);
let _sm = StreamletMetadataInfo::new(&proposal.block.sm);
ProposalInfo { _id, _st, _sl, _txs, _metadata, _sm }
ProposalInfo { _address, _st, _sl, _txs, _metadata, _sm }
}
}
@@ -336,17 +338,17 @@ impl BlockchainInfo {
#[derive(Debug)]
struct StateInfo {
_id: u64,
_address: String,
_consensus: ConsensusInfo,
_blockchain: BlockchainInfo,
}
impl StateInfo {
pub fn new(state: &ValidatorState) -> StateInfo {
let _id = state.id;
let _address = state.address.to_string();
let _consensus = ConsensusInfo::new(&state.consensus);
let _blockchain = BlockchainInfo::new(&state.blockchain);
StateInfo { _id, _consensus, _blockchain }
StateInfo { _address, _consensus, _blockchain }
}
}
@@ -356,16 +358,27 @@ async fn main() -> Result<()> {
let genesis_ts = Timestamp(1648383795);
let genesis_data = *TESTNET_GENESIS_HASH_BYTES;
for i in 0..nodes {
let path = format!("~/.config/darkfi/validatord_db_{:?}", i);
// Initialize or load wallet
let path = format!("../../../tmp/node{:?}/wallet.db", i);
let pass = "changeme";
let wallet = init_wallet(&path, &pass).await?;
Client::new(wallet.clone()).await?;
let address = wallet.get_default_address().await?;
// Initialize or load sled database
let path = format!("../../../tmp/node{:?}/blockchain/testnet", i);
let db_path = expand_path(&path).unwrap();
let sled_db = sled::open(&db_path)?;
println!("Export data from sled database: {:?}", db_path);
let state = ValidatorState::new(&sled_db, i, genesis_ts, genesis_data)?;
// Data export
println!("Exporting data for node{:?} - {:?}", i, address.to_string());
let state = ValidatorState::new(&sled_db, address, genesis_ts, genesis_data)?;
let info = StateInfo::new(&*state.read().await);
let info_string = format!("{:#?}", info);
let path = format!("validatord_state_{:?}", i);
let path = format!("node{:?}_testnet_db", i);
let mut file = File::create(path)?;
file.write(info_string.as_bytes())?;
drop(sled_db);
}
Ok(())

View File

@@ -140,6 +140,27 @@ impl BlockOrderStore {
Ok(ret)
}
/// Retrieve n hashes after given slot.
pub fn get_after(&self, slot: u64, n: u64) -> Result<Vec<blake3::Hash>> {
let mut ret = vec![];
let mut key = slot;
let mut counter = 0;
while counter <= n {
if let Some(found) = self.0.get_gt(key.to_be_bytes())? {
let key_bytes: [u8; 8] = found.0.as_ref().try_into().unwrap();
key = u64::from_be_bytes(key_bytes);
let block_hash = deserialize(&found.1)?;
ret.push(block_hash);
counter = counter + 1;
} else {
break
}
}
Ok(ret)
}
/// Retrieve the last block hash in the tree, based on the Ord
/// implementation for Vec<u8>.
pub fn get_last(&self) -> Result<Option<(u64, blake3::Hash)>> {

View File

@@ -108,6 +108,14 @@ impl Blockchain {
self.get_blocks_by_hash(&hashes)
}
/// Retrieve n blocks after start slot.
pub fn get_blocks_after(&self, slot: u64, n: u64) -> Result<Vec<BlockInfo>> {
debug!("get_blocks_after(): {:?} - {:?}", slot, n);
let hashes = self.order.get_after(slot, n)?;
self.get_blocks_by_hash(&hashes)
}
/// Check if the given [`BlockInfo`] is in the database
pub fn has_block(&self, info: &BlockInfo) -> Result<bool> {
let hashes = match self.order.get(&[info.sl], true) {

View File

@@ -62,10 +62,7 @@ impl ProtocolSync {
// Extra validations can be added here
let key = order.sl;
let range: Vec<u64> = (key..=(key + BATCH)).collect();
debug!("ProtocolSync::handle_receive_request(): Querying block range: {:?}", range);
let blocks = self.state.read().await.blockchain.get_blocks_by_slot(&range)?;
let blocks = self.state.read().await.blockchain.get_blocks_after(key, BATCH)?;
debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len());
let response = BlockResponse { blocks };

View File

@@ -405,6 +405,7 @@ impl ValidatorState {
/// Given a proposal, find the index of the chain it extends.
pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result<i64> {
let mut fork = None;
for (index, chain) in self.consensus.proposals.iter().enumerate() {
let last = chain.proposals.last().unwrap();
let hash = last.hash();
@@ -416,6 +417,23 @@ impl ValidatorState {
debug!("find_extended_chain_index(): Proposal already received");
return Ok(-2)
}
if proposal.block.st == last.block.st && proposal.block.sl > last.block.sl {
fork = Some(chain.clone());
}
}
match fork {
Some(mut chain) => {
debug!("Proposal to fork a forkchain was received.");
chain.proposals.pop(); // removing last block to create the fork
if chain.proposals.len() > 0 {
// if len is 0 we will verify against blockchain last block
self.consensus.proposals.push(chain);
return Ok(self.consensus.proposals.len() as i64 - 1)
}
}
None => (),
}
let (last_sl, last_block) = self.blockchain.last()?.unwrap();
@@ -486,7 +504,7 @@ impl ValidatorState {
};
if proposal.is_none() {
warn!(target: "consensus", "Received vote for unknown proposal.");
debug!(target: "consensus", "Received vote for unknown proposal.");
if !self.consensus.orphan_votes.contains(vote) {
self.consensus.orphan_votes.push(vote.clone());
}
@@ -679,7 +697,7 @@ impl ValidatorState {
self.consensus.participants.insert(participant.address, participant.clone());
}
if self.consensus.participants.is_empty() {
if self.consensus.pending_participants.is_empty() {
debug!(
"refresh_participants(): Didn't manage to add any participant, pending were empty."
);
@@ -702,8 +720,8 @@ impl ValidatorState {
let previous_from_last_epoch = last_epoch - 1;
debug!(
"refresh_participants(): Checking epochs: previous - {:?}, last - {:?}",
previous_epoch, last_epoch
"refresh_participants(): Node {:?} checking epochs: previous - {:?}, last - {:?}, previous from last - {:?}",
self.address.to_string(), previous_epoch, last_epoch, previous_from_last_epoch
);
for (index, participant) in self.consensus.participants.clone().iter() {
@@ -711,8 +729,10 @@ impl ValidatorState {
Some(epoch) => {
if epoch < last_epoch {
warn!(
"refresh_participants(): Inactive participant: {:?}",
participant.address.to_string()
"refresh_participants(): Inactive participant: {:?} (joined {:?}, voted {:?})",
participant.address.to_string(),
participant.joined,
participant.voted
);
inactive.push(*index);
}
@@ -722,8 +742,10 @@ impl ValidatorState {
participant.joined < previous_from_last_epoch
{
warn!(
"refresh_participants(): Inactive participant: {:?}",
participant.address.to_string()
"refresh_participants(): Inactive participant: {:?} (joined {:?}, voted {:?})",
participant.address.to_string(),
participant.joined,
participant.voted
);
inactive.push(*index);
}