diff --git a/src/consensus2/proto/protocol_sync.rs b/src/consensus2/proto/protocol_sync.rs index 3f1731841..70a61cb86 100644 --- a/src/consensus2/proto/protocol_sync.rs +++ b/src/consensus2/proto/protocol_sync.rs @@ -76,11 +76,13 @@ impl ProtocolSync { // TODO: The following code should be executed only by replicators, not // consensus nodes. - // Node stores finalized flock, if it doesn't exist (checking by slot). + // Node stores finalized flock, if it doesn't exist (checking by slot), + // and removes its transactions from the unconfirmed_txs vector. // Extra validations can be added here. let info_copy = (*info).clone(); if !self.state.read().await.blockchain.has_block(&info_copy)? { self.state.write().await.blockchain.add(&[info_copy.clone()])?; + self.state.write().await.remove_txs(info_copy.txs.clone())?; self.p2p.broadcast(info_copy).await?; } } diff --git a/src/consensus2/proto/protocol_tx.rs b/src/consensus2/proto/protocol_tx.rs index d9f2e9c86..c5a2e44cd 100644 --- a/src/consensus2/proto/protocol_tx.rs +++ b/src/consensus2/proto/protocol_tx.rs @@ -47,6 +47,8 @@ impl ProtocolTx { debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx); let tx_copy = (*tx).clone(); + + // Nodes use unconfirmed_txs vector as seen_txs pool. if self.state.write().await.append_tx(tx_copy.clone()) { self.p2p.broadcast(tx_copy).await?; } diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index 2595adfe9..187d2a803 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -456,6 +456,17 @@ impl ValidatorState { Ok(None) } + /// Remove provided transactions vector from unconfirmed_txs if they exist. + pub fn remove_txs(&mut self, transactions: Vec) -> Result<()> { + for tx in transactions { + if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { + self.unconfirmed_txs.remove(pos); + } + } + + Ok(()) + } + /// Provided an index, the node checks if the chain can be finalized. /// Consensus finalization logic: /// - If the node has observed the notarization of 3 consecutive @@ -496,11 +507,6 @@ impl ValidatorState { for proposal in &mut chain.proposals[..(consecutive - 1)] { proposal.block.sm.finalized = true; finalized.push(proposal.clone().into()); - for tx in proposal.block.txs.clone() { - if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { - self.unconfirmed_txs.remove(pos); - } - } } chain.proposals.drain(0..(consecutive - 1)); @@ -514,6 +520,10 @@ impl ValidatorState { } }; + for proposal in &finalized { + self.remove_txs(proposal.txs.clone())?; + } + let last_block = *blockhashes.last().unwrap(); let last_sl = finalized.last().unwrap().sl;