src/consensus/protocol_tx: atomic operation added to check for seen txs

This commit is contained in:
aggstam
2022-07-19 16:49:46 +03:00
parent ec7ebd09fd
commit 4d8c0a85c0

View File

@@ -69,8 +69,9 @@ impl ProtocolTx {
let tx_copy = (*tx).clone();
let tx_hash = blake3::hash(&serialize(&tx_copy));
let tx_in_txstore =
match self.state.read().await.blockchain.transactions.contains(&tx_hash) {
{
let state = &mut self.state.write().await;
let tx_in_txstore = match state.blockchain.transactions.contains(&tx_hash) {
Ok(v) => v,
Err(e) => {
error!("handle_receive_tx(): Failed querying txstore: {}", e);
@@ -78,27 +79,28 @@ impl ProtocolTx {
}
};
if self.state.read().await.unconfirmed_txs.contains(&tx_copy) || tx_in_txstore {
debug!("ProtocolTx::handle_receive_tx(): We have already seen this tx.");
continue
}
debug!("ProtocolTx::handle_receive_tx(): Starting state transition validation");
let canon_state_clone = self.state.read().await.state_machine.lock().await.clone();
let mem_state = MemoryState::new(canon_state_clone);
match ValidatorState::validate_state_transitions(mem_state, &[tx_copy.clone()]) {
Ok(_) => debug!("ProtocolTx::handle_receive_tx(): State transition valid"),
Err(e) => {
warn!("ProtocolTx::handle_receive_tx(): State transition fail: {}", e);
if state.unconfirmed_txs.contains(&tx_copy) || tx_in_txstore {
debug!("ProtocolTx::handle_receive_tx(): We have already seen this tx.");
continue
}
}
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().await.append_tx(tx_copy.clone()) {
if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await {
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
};
debug!("ProtocolTx::handle_receive_tx(): Starting state transition validation");
let canon_state_clone = state.state_machine.lock().await.clone();
let mem_state = MemoryState::new(canon_state_clone);
match ValidatorState::validate_state_transitions(mem_state, &[tx_copy.clone()]) {
Ok(_) => debug!("ProtocolTx::handle_receive_tx(): State transition valid"),
Err(e) => {
warn!("ProtocolTx::handle_receive_tx(): State transition fail: {}", e);
continue
}
}
// Nodes use unconfirmed_txs vector as seen_txs pool.
if state.append_tx(tx_copy.clone()) {
if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await {
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
};
}
}
}
}