From 4d8c0a85c0ce106227d1caac524902cf0e2e99dc Mon Sep 17 00:00:00 2001 From: aggstam Date: Tue, 19 Jul 2022 16:49:46 +0300 Subject: [PATCH] src/consensus/protocol_tx: atomic operation added to check for seen txs --- src/consensus/proto/protocol_tx.rs | 42 ++++++++++++++++-------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index b385d2b6b..4d56998a8 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -69,8 +69,9 @@ impl ProtocolTx { let tx_copy = (*tx).clone(); let tx_hash = blake3::hash(&serialize(&tx_copy)); - let tx_in_txstore = - match self.state.read().await.blockchain.transactions.contains(&tx_hash) { + { + let state = &mut self.state.write().await; + let tx_in_txstore = match state.blockchain.transactions.contains(&tx_hash) { Ok(v) => v, Err(e) => { error!("handle_receive_tx(): Failed querying txstore: {}", e); @@ -78,27 +79,28 @@ impl ProtocolTx { } }; - if self.state.read().await.unconfirmed_txs.contains(&tx_copy) || tx_in_txstore { - debug!("ProtocolTx::handle_receive_tx(): We have already seen this tx."); - continue - } - - debug!("ProtocolTx::handle_receive_tx(): Starting state transition validation"); - let canon_state_clone = self.state.read().await.state_machine.lock().await.clone(); - let mem_state = MemoryState::new(canon_state_clone); - match ValidatorState::validate_state_transitions(mem_state, &[tx_copy.clone()]) { - Ok(_) => debug!("ProtocolTx::handle_receive_tx(): State transition valid"), - Err(e) => { - warn!("ProtocolTx::handle_receive_tx(): State transition fail: {}", e); + if state.unconfirmed_txs.contains(&tx_copy) || tx_in_txstore { + debug!("ProtocolTx::handle_receive_tx(): We have already seen this tx."); continue } - } - // Nodes use unconfirmed_txs vector as seen_txs pool. - if self.state.write().await.append_tx(tx_copy.clone()) { - if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await { - error!("handle_receive_tx(): p2p broadcast fail: {}", e); - }; + debug!("ProtocolTx::handle_receive_tx(): Starting state transition validation"); + let canon_state_clone = state.state_machine.lock().await.clone(); + let mem_state = MemoryState::new(canon_state_clone); + match ValidatorState::validate_state_transitions(mem_state, &[tx_copy.clone()]) { + Ok(_) => debug!("ProtocolTx::handle_receive_tx(): State transition valid"), + Err(e) => { + warn!("ProtocolTx::handle_receive_tx(): State transition fail: {}", e); + continue + } + } + + // Nodes use unconfirmed_txs vector as seen_txs pool. + if state.append_tx(tx_copy.clone()) { + if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await { + error!("handle_receive_tx(): p2p broadcast fail: {}", e); + }; + } } } }