consensus/proto/protocol_sync:handle_receive_block(): consensus nodes exit at function start, nodes-tool: upgraded to latest versions

This commit is contained in:
aggstam
2022-05-04 20:35:58 +03:00
parent 5127263cd3
commit be7fd5fef2
2 changed files with 80 additions and 86 deletions

View File

@@ -13,12 +13,13 @@ use darkfi::{
metadata::{Metadata, OuroborosMetadata, StreamletMetadata},
participant::Participant,
state::{ConsensusState, ValidatorState},
tx::Tx,
util::Timestamp,
vote::Vote,
TESTNET_GENESIS_HASH_BYTES,
},
crypto::token_list::DrkTokenList,
node::Client,
tx::Transaction,
util::expand_path,
wallet::walletdb::init_wallet,
Result,
@@ -115,7 +116,7 @@ struct ProposalInfo {
_address: String,
_st: blake3::Hash,
_sl: u64,
_txs: Vec<Tx>,
_txs: Vec<Transaction>,
_metadata: MetadataInfo,
_sm: StreamletMetadataInfo,
}
@@ -192,11 +193,8 @@ impl BlockInfoChain {
let result = blockstore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, block)) => _blocks.push(BlockInfo::new(hash.clone(), &block)),
None => (),
};
for (hash, block) in iter.iter() {
_blocks.push(BlockInfo::new(hash.clone(), &block));
}
}
Err(e) => println!("Error: {:?}", e),
@@ -228,13 +226,8 @@ impl BlockOrderStoreInfo {
let result = orderstore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((slot, hash)) => {
_order.push(OrderInfo::new(slot.clone(), hash.clone()))
}
None => (),
};
for (slot, hash) in iter.iter() {
_order.push(OrderInfo::new(slot.clone(), hash.clone()));
}
}
Err(e) => println!("Error: {:?}", e),
@@ -246,11 +239,11 @@ impl BlockOrderStoreInfo {
#[derive(Debug)]
struct TxInfo {
_hash: blake3::Hash,
_payload: Tx,
_payload: Transaction,
}
impl TxInfo {
pub fn new(_hash: blake3::Hash, tx: &Tx) -> TxInfo {
pub fn new(_hash: blake3::Hash, tx: &Transaction) -> TxInfo {
let _payload = tx.clone();
TxInfo { _hash, _payload }
}
@@ -267,11 +260,8 @@ impl TxStoreInfo {
let result = txstore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, tx)) => _transactions.push(TxInfo::new(hash.clone(), &tx)),
None => (),
};
for (hash, tx) in iter.iter() {
_transactions.push(TxInfo::new(hash.clone(), &tx));
}
}
Err(e) => println!("Error: {:?}", e),
@@ -304,13 +294,8 @@ impl MetadataStoreInfo {
let result = metadatastore.get_all();
match result {
Ok(iter) => {
for item in iter.iter() {
match item {
Some((hash, m)) => {
_metadata.push(HashedMetadataInfo::new(hash.clone(), &m))
}
None => (),
};
for (hash, m) in iter.iter() {
_metadata.push(HashedMetadataInfo::new(hash.clone(), &m));
}
}
Err(e) => println!("Error: {:?}", e),
@@ -364,7 +349,13 @@ async fn main() -> Result<()> {
let path = format!("../../../tmp/node{:?}/wallet.db", i);
let wallet = init_wallet(&path, &pass).await?;
let address = wallet.get_default_address().await?;
let client = Arc::new(Client::new(wallet).await?);
let tokenlist = Arc::new(DrkTokenList::new(&[
("drk", include_bytes!("../../../../contrib/token/darkfi_token_list.min.json")),
("btc", include_bytes!("../../../../contrib/token/bitcoin_token_list.min.json")),
("eth", include_bytes!("../../../../contrib/token/erc20_token_list.min.json")),
("sol", include_bytes!("../../../../contrib/token/solana_token_list.min.json")),
])?);
let client = Arc::new(Client::new(wallet, tokenlist).await?);
// Initialize or load sled database
let path = format!("../../../tmp/node{:?}/blockchain/testnet", i);

View File

@@ -88,6 +88,13 @@ impl ProtocolSync {
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
// Consensus-mode enabled nodes have already performed these steps,
// during proposal finalization.
if self.consensus_mode {
debug!("handle_receive_block(): node runs in consensus mode, skipping...");
return Ok(())
}
debug!("handle_receive_block() [START]");
loop {
let info = match self.block_sub.receive().await {
@@ -107,70 +114,66 @@ impl ProtocolSync {
debug!("handle_receive_block(): Pending lock released");
// Node stores finalized block, if it doesn't exist (checking by slot),
// and removes its transactions from the unconfirmed_txs vector.
// Consensus-mode enabled nodes have already performed these steps,
// during proposal finalization.
// and removes its transactions from the unconfirmed_txs vector.
// Extra validations can be added here.
if !self.consensus_mode {
*self.pending.lock().await = true;
let info_copy = (*info).clone();
*self.pending.lock().await = true;
let info_copy = (*info).clone();
let has_block = match self.state.read().await.blockchain.has_block(&info_copy) {
Ok(v) => v,
Err(e) => {
error!("handle_receive_block(): failed checking for has_block(): {}", e);
*self.pending.lock().await = false;
continue
}
let has_block = match self.state.read().await.blockchain.has_block(&info_copy) {
Ok(v) => v,
Err(e) => {
error!("handle_receive_block(): failed checking for has_block(): {}", e);
*self.pending.lock().await = false;
continue
}
};
if !has_block {
debug!("handle_receive_block(): Starting state transition validation");
let canon_state_clone =
self.state.read().await.state_machine.lock().await.clone();
let mem_state = MemoryState::new(canon_state_clone);
let state_updates =
match ValidatorState::validate_state_transitions(mem_state, &info.txs) {
Ok(v) => v,
Err(e) => {
warn!("handle_receive_block(): State transition fail: {}", e);
*self.pending.lock().await = false;
continue
}
};
debug!("handle_receive_block(): All state transitions passed");
debug!("handle_receive_block(): Updating canon state machine");
if let Err(e) =
self.state.write().await.update_canon_state(state_updates, None).await
{
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
*self.pending.lock().await = false;
continue
};
if !has_block {
debug!("handle_receive_block(): Starting state transition validation");
let canon_state_clone =
self.state.read().await.state_machine.lock().await.clone();
let mem_state = MemoryState::new(canon_state_clone);
let state_updates =
match ValidatorState::validate_state_transitions(mem_state, &info.txs) {
Ok(v) => v,
Err(e) => {
warn!("handle_receive_block(): State transition fail: {}", e);
*self.pending.lock().await = false;
continue
}
};
debug!("handle_receive_block(): All state transitions passed");
debug!("handle_receive_block(): Appending block to ledger");
if let Err(e) = self.state.write().await.blockchain.add(&[info_copy.clone()]) {
error!("handle_receive_block(): blockchain.add() fail: {}", e);
*self.pending.lock().await = false;
continue
};
debug!("handle_receive_block(): Updating canon state machine");
if let Err(e) =
self.state.write().await.update_canon_state(state_updates, None).await
{
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
*self.pending.lock().await = false;
continue
};
if let Err(e) = self.state.write().await.remove_txs(info_copy.txs.clone()) {
error!("handle_receive_block(): remove_txs() fail: {}", e);
*self.pending.lock().await = false;
continue
};
debug!("handle_receive_block(): Appending block to ledger");
if let Err(e) = self.state.write().await.blockchain.add(&[info_copy.clone()]) {
error!("handle_receive_block(): blockchain.add() fail: {}", e);
*self.pending.lock().await = false;
continue
};
if let Err(e) = self.state.write().await.remove_txs(info_copy.txs.clone()) {
error!("handle_receive_block(): remove_txs() fail: {}", e);
*self.pending.lock().await = false;
continue
};
if let Err(e) = self.p2p.broadcast(info_copy).await {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
*self.pending.lock().await = false;
continue
};
}
*self.pending.lock().await = false;
if let Err(e) = self.p2p.broadcast(info_copy).await {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
*self.pending.lock().await = false;
continue
};
}
*self.pending.lock().await = false;
}
}
}