consensus: Move tx protocol to sync p2p network.

This commit is contained in:
aggstam
2022-04-21 15:54:56 +02:00
committed by parazyd
parent 97dadd8bf0
commit c5bcd05eec
3 changed files with 20 additions and 6 deletions

View File

@@ -76,11 +76,13 @@ impl ProtocolSync {
// TODO: The following code should be executed only by replicators, not
// consensus nodes.
// Node stores finalized flock, if it doesn't exist (checking by slot).
// Node stores finalized flock, if it doesn't exist (checking by slot),
// and removes its transactions from the unconfirmed_txs vector.
// Extra validations can be added here.
let info_copy = (*info).clone();
if !self.state.read().await.blockchain.has_block(&info_copy)? {
self.state.write().await.blockchain.add(&[info_copy.clone()])?;
self.state.write().await.remove_txs(info_copy.txs.clone())?;
self.p2p.broadcast(info_copy).await?;
}
}

View File

@@ -47,6 +47,8 @@ impl ProtocolTx {
debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx);
let tx_copy = (*tx).clone();
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().await.append_tx(tx_copy.clone()) {
self.p2p.broadcast(tx_copy).await?;
}

View File

@@ -456,6 +456,17 @@ impl ValidatorState {
Ok(None)
}
/// Remove provided transactions vector from unconfirmed_txs if they exist.
pub fn remove_txs(&mut self, transactions: Vec<Tx>) -> Result<()> {
for tx in transactions {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
Ok(())
}
/// Provided an index, the node checks if the chain can be finalized.
/// Consensus finalization logic:
/// - If the node has observed the notarization of 3 consecutive
@@ -496,11 +507,6 @@ impl ValidatorState {
for proposal in &mut chain.proposals[..(consecutive - 1)] {
proposal.block.sm.finalized = true;
finalized.push(proposal.clone().into());
for tx in proposal.block.txs.clone() {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
}
chain.proposals.drain(0..(consecutive - 1));
@@ -514,6 +520,10 @@ impl ValidatorState {
}
};
for proposal in &finalized {
self.remove_txs(proposal.txs.clone())?;
}
let last_block = *blockhashes.last().unwrap();
let last_sl = finalized.last().unwrap().sl;