diff --git a/bin/darkfid/src/proto/protocol_proposal.rs b/bin/darkfid/src/proto/protocol_proposal.rs index 456917a2e..d0ebab7f9 100644 --- a/bin/darkfid/src/proto/protocol_proposal.rs +++ b/bin/darkfid/src/proto/protocol_proposal.rs @@ -123,7 +123,7 @@ impl ProtocolProposal { let proposal_copy = (*proposal).clone(); - match self.validator.consensus.append_proposal(&proposal_copy.0).await { + match self.validator.append_proposal(&proposal_copy.0).await { Ok(()) => { self.p2p.broadcast_with_exclude(&proposal_copy, &exclude_list).await; if let Some(sync_p2p) = self.sync_p2p.as_ref() { @@ -189,7 +189,12 @@ impl ProtocolProposal { } for proposal in &response.proposals { - self.validator.consensus.append_proposal(proposal).await?; + self.validator.append_proposal(proposal).await?; + let message = ProposalMessage(proposal.clone()); + self.p2p.broadcast_with_exclude(&message, &exclude_list).await; + if let Some(sync_p2p) = self.sync_p2p.as_ref() { + sync_p2p.broadcast_with_exclude(&message, &exclude_list).await; + } // Notify subscriber let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); self.subscriber.notify(vec![enc_prop].into()).await; diff --git a/bin/darkfid/src/task/miner.rs b/bin/darkfid/src/task/miner.rs index bde1aea23..6a6a62c16 100644 --- a/bin/darkfid/src/task/miner.rs +++ b/bin/darkfid/src/task/miner.rs @@ -109,16 +109,15 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) // Start miner loop loop { - // Grab best current fork proposals sequence + // Grab best current fork let forks = node.validator.consensus.forks.read().await; - let fork_index = best_forks_indexes(&forks)?[0]; - let fork_proposals = forks[fork_index].proposals.clone(); + let extended_fork = forks[best_forks_indexes(&forks)?[0]].full_clone()?; drop(forks); // Start listenning for network proposals and mining next block for best fork. smol::future::or( - listen_to_network(node, fork_proposals, &subscription), - mine_next_block(node, fork_index, &mut secret, recipient, &zkbin, &pk), + listen_to_network(node, &extended_fork, &subscription), + mine_next_block(node, &extended_fork, &mut secret, recipient, &zkbin, &pk), ) .await?; @@ -137,10 +136,12 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) /// Auxiliary function to listen for incoming proposals and check if the best fork has changed async fn listen_to_network( node: &Darkfid, - fork_proposals: Vec, + extended_fork: &Fork, subscription: &Subscription, ) -> Result<()> { - 'main_loop: loop { + // Grab extended fork last proposal hash + let last_proposal_hash = extended_fork.last_proposal()?.hash; + loop { // Wait until a new proposal has been received subscription.receive().await; @@ -152,21 +153,18 @@ async fn listen_to_network( // Iterate to verify if proposals sequence has changed for index in fork_indexes { - if forks[index].proposals == fork_proposals { + if forks[index].last_proposal()?.hash != last_proposal_hash { drop(forks); - continue 'main_loop + return Ok(()) } } - - drop(forks); - return Ok(()) } } /// Auxiliary function to generate and mine provided fork index next block async fn mine_next_block( node: &Darkfid, - fork_index: usize, + extended_fork: &Fork, secret: &mut SecretKey, recipient: &PublicKey, zkbin: &ZkBinary, @@ -174,7 +172,7 @@ async fn mine_next_block( ) -> Result<()> { // Grab next target and block let (next_target, mut next_block) = - generate_next_block(node, fork_index, secret, recipient, zkbin, pk).await?; + generate_next_block(extended_fork, secret, recipient, zkbin, pk).await?; // Execute request to minerd and parse response let target = JsonValue::String(next_target.to_string()); @@ -186,11 +184,11 @@ async fn mine_next_block( next_block.sign(secret)?; // Verify it - node.validator.consensus.module.read().await.verify_current_block(&next_block)?; + extended_fork.module.verify_current_block(&next_block)?; // Append the mined block as a proposal let proposal = Proposal::new(next_block)?; - node.validator.consensus.append_proposal(&proposal).await?; + node.validator.append_proposal(&proposal).await?; // Broadcast proposal to the network let message = ProposalMessage(proposal); @@ -202,52 +200,42 @@ async fn mine_next_block( /// Auxiliary function to generate next block in an atomic manner async fn generate_next_block( - node: &Darkfid, - fork_index: usize, + extended_fork: &Fork, secret: &mut SecretKey, recipient: &PublicKey, zkbin: &ZkBinary, pk: &ProvingKey, ) -> Result<(BigUint, BlockInfo)> { - // Grab a lock over nodes' current forks - let forks = node.validator.consensus.forks.read().await; + // Grab extended fork last proposal hash + let last_proposal = extended_fork.last_proposal()?; - // Grab best current fork - let fork = &forks[fork_index]; - - // Generate new signing key for next block - let next_block_height = fork.get_next_block_height()?; // We are deriving the next secret key for optimization. // Next secret is the poseidon hash of: // [prefix, current(previous) secret, signing(block) height]. let prefix = pallas::Base::from_raw([4, 0, 0, 0]); - let next_secret = poseidon_hash([prefix, secret.inner(), next_block_height.into()]); + let next_secret = + poseidon_hash([prefix, secret.inner(), (last_proposal.block.header.height + 1).into()]); *secret = SecretKey::from(next_secret); // Generate reward transaction - let tx = generate_transaction(fork, secret, recipient, zkbin, pk, next_block_height)?; + let tx = generate_transaction(&extended_fork.last_proposal()?, secret, recipient, zkbin, pk)?; // Generate next block proposal - let target = fork.module.next_mine_target()?; - let next_block = node.validator.consensus.generate_unsigned_block(fork, tx).await?; - - // Drop forks lock - drop(forks); + let target = extended_fork.module.next_mine_target()?; + let next_block = extended_fork.generate_unsigned_block(tx).await?; Ok((target, next_block)) } /// Auxiliary function to generate a Money::PoWReward transaction fn generate_transaction( - fork: &Fork, + last_proposal: &Proposal, secret: &SecretKey, recipient: &PublicKey, zkbin: &ZkBinary, pk: &ProvingKey, - block_height: u64, ) -> Result { // Grab extended proposal info - let last_proposal = fork.last_proposal()?; let last_nonce = last_proposal.block.header.nonce; let fork_previous_hash = last_proposal.block.header.previous; @@ -259,7 +247,7 @@ fn generate_transaction( let debris = PoWRewardCallBuilder { secret: *secret, recipient: *recipient, - block_height, + block_height: last_proposal.block.header.height + 1, last_nonce, fork_previous_hash, spend_hook, diff --git a/bin/darkfid/src/task/sync.rs b/bin/darkfid/src/task/sync.rs index 1d860448a..d5eb569c7 100644 --- a/bin/darkfid/src/task/sync.rs +++ b/bin/darkfid/src/task/sync.rs @@ -134,7 +134,7 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { // Verify and store retrieved proposals debug!(target: "darkfid::task::sync_task", "Processing received proposals"); for proposal in &response.proposals { - node.validator.consensus.append_proposal(proposal).await?; + node.validator.append_proposal(proposal).await?; // Notify subscriber let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await)); proposal_notif_sub.notify(vec![enc_prop].into()).await; diff --git a/bin/darkfid/src/tests/forks.rs b/bin/darkfid/src/tests/forks.rs index 52ffc67d4..e50870309 100644 --- a/bin/darkfid/src/tests/forks.rs +++ b/bin/darkfid/src/tests/forks.rs @@ -34,7 +34,7 @@ fn forks() -> Result<()> { let module = PoWModule::new(blockchain.clone(), 90, None)?; // Create a fork - let fork = Fork::new(&blockchain, module).await?; + let fork = Fork::new(blockchain.clone(), module).await?; // Add a dummy record to fork fork.overlay.lock().unwrap().order.insert(&[0], &[record0])?; diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index 60cc38116..c48186271 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -176,7 +176,7 @@ impl Harness { // and then we broadcast it to rest nodes for block in blocks { let proposal = Proposal::new(block.clone())?; - self.alice.validator.consensus.append_proposal(&proposal).await?; + self.alice.validator.append_proposal(&proposal).await?; let message = ProposalMessage(proposal); self.alice.miners_p2p.as_ref().unwrap().broadcast(&message).await; } diff --git a/src/validator/consensus.rs b/src/validator/consensus.rs index fd9141350..df5c11513 100644 --- a/src/validator/consensus.rs +++ b/src/validator/consensus.rs @@ -48,6 +48,8 @@ pub struct Consensus { pub forks: RwLock>, /// Canonical blockchain PoW module state pub module: RwLock, + /// Lock to restrict when proposals appends can happen + pub append_lock: RwLock<()>, } impl Consensus { @@ -58,66 +60,18 @@ impl Consensus { pow_target: usize, pow_fixed_difficulty: Option, ) -> Result { + let forks = RwLock::new(vec![]); let module = RwLock::new(PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?); - Ok(Self { blockchain, finalization_threshold, forks: RwLock::new(vec![]), module }) - } - - /// Generate an unsigned block for provided fork, containing all - /// pending transactions. - pub async fn generate_unsigned_block( - &self, - fork: &Fork, - producer_tx: Transaction, - ) -> Result { - // Grab forks' next block height - let next_block_height = fork.get_next_block_height()?; - - // Grab forks' unproposed transactions - let mut unproposed_txs = fork.unproposed_txs(&self.blockchain, next_block_height).await?; - unproposed_txs.push(producer_tx); - - // Grab forks' last block proposal(previous) - let previous = fork.last_proposal()?; - - // Generate the new header - let header = - Header::new(previous.block.hash()?, next_block_height, Timestamp::current_time(), 0); - - // Generate the block - let mut block = BlockInfo::new_empty(header); - - // Add transactions to the block - block.append_txs(unproposed_txs)?; - - Ok(block) - } - - /// Generate a block proposal for provided fork, containing all - /// pending transactions. Proposal is signed using provided secret key, - /// which must also have signed the provided proposal transaction. - pub async fn generate_signed_proposal( - &self, - fork: &Fork, - producer_tx: Transaction, - secret_key: &SecretKey, - ) -> Result { - let mut block = self.generate_unsigned_block(fork, producer_tx).await?; - - // Sign block - block.sign(secret_key)?; - - // Generate the block proposal from the block - let proposal = Proposal::new(block)?; - - Ok(proposal) + let append_lock = RwLock::new(()); + Ok(Self { blockchain, finalization_threshold, forks, module, append_lock }) } /// Generate a new empty fork. pub async fn generate_empty_fork(&self) -> Result<()> { debug!(target: "validator::consensus::generate_empty_fork", "Generating new empty fork..."); let mut lock = self.forks.write().await; - let fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?; + let fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?; lock.push(fork); drop(lock); debug!(target: "validator::consensus::generate_empty_fork", "Fork generated!"); @@ -129,6 +83,18 @@ impl Consensus { pub async fn append_proposal(&self, proposal: &Proposal) -> Result<()> { debug!(target: "validator::consensus::append_proposal", "Appending proposal {}", proposal.hash); + // Check if proposal already exists + let lock = self.forks.read().await; + for fork in lock.iter() { + for p in fork.proposals.iter().rev() { + if p == &proposal.hash { + drop(lock); + return Err(Error::ProposalAlreadyExists) + } + } + } + drop(lock); + // Verify proposal and grab corresponding fork let (mut fork, index) = verify_proposal(self, proposal).await?; @@ -141,13 +107,6 @@ impl Consensus { // If a fork index was found, replace forks with the mutated one, // otherwise push the new fork. let mut lock = self.forks.write().await; - // Check if fork already exists - for f in lock.iter() { - if f.proposals == fork.proposals { - drop(lock); - return Err(Error::ProposalAlreadyExists) - } - } match index { Some(i) => { if i < lock.len() && lock[i].proposals == fork.proposals[..fork.proposals.len() - 1] @@ -200,7 +159,7 @@ impl Consensus { } // Generate a new fork extending canonical - let fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?; + let fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?; return Ok((fork, None)) } @@ -212,7 +171,7 @@ impl Consensus { } // Rebuild fork - let mut fork = Fork::new(&self.blockchain, self.module.read().await.clone()).await?; + let mut fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?; fork.proposals = original_fork.proposals[..p_index + 1].to_vec(); // Retrieve proposals blocks from original fork @@ -382,6 +341,8 @@ impl From for BlockInfo { /// the proposals hashes sequence, for validations. #[derive(Clone)] pub struct Fork { + /// Canonical (finalized) blockchain + pub blockchain: Blockchain, /// Overlay cache over canonical Blockchain pub overlay: BlockchainOverlayPtr, /// Current PoW module state, @@ -395,11 +356,62 @@ pub struct Fork { } impl Fork { - pub async fn new(blockchain: &Blockchain, module: PoWModule) -> Result { + pub async fn new(blockchain: Blockchain, module: PoWModule) -> Result { let mempool = blockchain.get_pending_txs()?.iter().map(|tx| blake3::hash(&serialize(tx))).collect(); - let overlay = BlockchainOverlay::new(blockchain)?; - Ok(Self { overlay, module, proposals: vec![], mempool, rank: BigUint::from(0u64) }) + let overlay = BlockchainOverlay::new(&blockchain)?; + Ok(Self { + blockchain, + overlay, + module, + proposals: vec![], + mempool, + rank: BigUint::from(0u64), + }) + } + + /// Generate an unsigned block containing all pending transactions. + pub async fn generate_unsigned_block(&self, producer_tx: Transaction) -> Result { + // Grab forks' last block proposal(previous) + let previous = self.last_proposal()?; + + // Grab forks' next block height + let next_block_height = previous.block.header.height + 1; + + // Grab forks' unproposed transactions + let mut unproposed_txs = self.unproposed_txs(&self.blockchain, next_block_height).await?; + unproposed_txs.push(producer_tx); + + // Generate the new header + let header = + Header::new(previous.block.hash()?, next_block_height, Timestamp::current_time(), 0); + + // Generate the block + let mut block = BlockInfo::new_empty(header); + + // Add transactions to the block + block.append_txs(unproposed_txs)?; + + Ok(block) + } + + /// Generate a block proposal containing all pending transactions. + /// Proposal is signed using provided secret key, which must also + /// have signed the provided proposal transaction. + pub async fn generate_signed_proposal( + &self, + producer_tx: Transaction, + secret_key: &SecretKey, + ) -> Result { + let mut block = self.generate_unsigned_block(producer_tx).await?; + + // Sign block + block.sign(secret_key)?; + + // Generate the block proposal from the block + let proposal = Proposal::new(block)?; + + Ok(proposal) } /// Auxiliary function to append a proposal and recalculate current fork rank @@ -532,12 +544,13 @@ impl Fork { /// Changes to this copy don't affect original fork overlay records, since underlying /// overlay pointer have been updated to the cloned one. pub fn full_clone(&self) -> Result { + let blockchain = self.blockchain.clone(); let overlay = self.overlay.lock().unwrap().full_clone()?; let module = self.module.clone(); let proposals = self.proposals.clone(); let mempool = self.mempool.clone(); let rank = self.rank.clone(); - Ok(Self { overlay, module, proposals, mempool, rank }) + Ok(Self { blockchain, overlay, module, proposals, mempool, rank }) } } diff --git a/src/validator/mod.rs b/src/validator/mod.rs index c1fa0ef67..a26f26d94 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -306,16 +306,35 @@ impl Validator { Ok(()) } + /// The node locks its consensus state and tries to append provided proposal. + pub async fn append_proposal(&self, proposal: &Proposal) -> Result<()> { + // Grab append lock so we restrict concurrent calls of this function + let append_lock = self.consensus.append_lock.write().await; + + // Execute append + self.consensus.append_proposal(proposal).await?; + + // Release append lock + drop(append_lock); + + Ok(()) + } + /// The node checks if proposals can be finalized. /// If proposals are found, node appends them to canonical, excluding the /// last one, and rebuild the finalized fork to contain the last one. pub async fn finalization(&self) -> Result> { + // Grab append lock so no new proposals can be appended while + // we execute finalization + let append_lock = self.consensus.append_lock.write().await; + info!(target: "validator::finalization", "Performing finalization check"); // Grab blocks that can be finalized let mut finalized = self.consensus.finalization().await?; if finalized.is_empty() { info!(target: "validator::finalization", "No proposals can be finalized"); + drop(append_lock); return Ok(vec![]) } @@ -334,6 +353,9 @@ impl Validator { self.consensus.append_proposal(&Proposal::new(last)?).await?; info!(target: "validator::finalization", "Finalization completed!"); + // Release append lock + drop(append_lock); + Ok(finalized) }