diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 0f23953c0..f36f85ca0 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -31,7 +31,7 @@ pub mod slot_checkpoint_store; pub use slot_checkpoint_store::SlotCheckpointStore; pub mod tx_store; -pub use tx_store::TxStore; +pub use tx_store::{PendingTxStore, TxStore}; pub mod contract_store; pub use contract_store::{ContractStateStore, WasmStore}; @@ -51,6 +51,8 @@ pub struct Blockchain { pub slot_checkpoints: SlotCheckpointStore, /// Transactions sled tree pub transactions: TxStore, + /// Pending transactions sled tree + pub pending_txs: PendingTxStore, /// Contract states pub contracts: ContractStateStore, /// Wasm bincodes @@ -65,6 +67,7 @@ impl Blockchain { let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?; let slot_checkpoints = SlotCheckpointStore::new(db)?; let transactions = TxStore::new(db)?; + let pending_txs = PendingTxStore::new(db)?; let contracts = ContractStateStore::new(db)?; let wasm_bincode = WasmStore::new(db)?; @@ -75,6 +78,7 @@ impl Blockchain { order, slot_checkpoints, transactions, + pending_txs, contracts, wasm_bincode, }) diff --git a/src/blockchain/tx_store.rs b/src/blockchain/tx_store.rs index 0d6701a0d..27a9723ad 100644 --- a/src/blockchain/tx_store.rs +++ b/src/blockchain/tx_store.rs @@ -21,6 +21,7 @@ use darkfi_serial::{deserialize, serialize}; use crate::{tx::Transaction, Error, Result}; const SLED_TX_TREE: &[u8] = b"_transactions"; +const SLED_PENDING_TX_TREE: &[u8] = b"_pending_transactions"; /// The `TxStore` is a `sled` tree storing all the blockchain's /// transactions where the key is the transaction hash, and the value is @@ -105,3 +106,83 @@ impl TxStore { Ok(txs) } } + +/// The `PendingTxStore` is a `sled` tree storing all the node pending +/// transactions where the key is the transaction hash, and the value is +/// the serialized transaction. +#[derive(Clone)] +pub struct PendingTxStore(sled::Tree); + +impl PendingTxStore { + /// Opens a new or existing `PendingTxStore` on the given sled database. + pub fn new(db: &sled::Db) -> Result { + let tree = db.open_tree(SLED_PENDING_TX_TREE)?; + Ok(Self(tree)) + } + + /// Insert a slice of [`Transaction`] into the pending tx store. + /// With sled, the operation is done as a batch. + /// The transactions are hashed with BLAKE3 and this hash is used as + /// the key, while the value is the serialized [`Transaction`] itself. + /// On success, the function returns the transaction hashes in the same + /// order as the input transactions. + pub fn insert(&self, transactions: &[Transaction]) -> Result> { + let mut ret = Vec::with_capacity(transactions.len()); + let mut batch = sled::Batch::default(); + + for tx in transactions { + let serialized = serialize(tx); + let tx_hash = blake3::hash(&serialized); + batch.insert(tx_hash.as_bytes(), serialized); + ret.push(tx_hash); + } + + self.0.apply_batch(batch)?; + Ok(ret) + } + + /// Check if the pending tx store contains a given transaction hash. + pub fn contains(&self, tx_hash: &blake3::Hash) -> Result { + Ok(self.0.contains_key(tx_hash.as_bytes())?) + } + + /// Retrieve all transactions from the pending tx store in the form of a tuple + /// (`tx_hash`, `tx`). + /// Be careful as this will try to load everything in memory. + pub fn get_all(&self) -> Result> { + let mut txs = vec![]; + + for tx in self.0.iter() { + let (key, value) = tx.unwrap(); + let hash_bytes: [u8; 32] = key.as_ref().try_into().unwrap(); + let tx = deserialize(&value)?; + txs.push((hash_bytes.into(), tx)); + } + + Ok(txs) + } + + /// Retrieve all transactions from the pending tx store. + /// Be careful as this will try to load everything in memory. + pub fn get_all_txs(&self) -> Result> { + let txs = self.get_all()?; + Ok(txs.iter().map(|x| x.1.clone()).rev().collect()) + } + + /// Remove a slice of [`Transaction`] from the pending tx store. + /// With sled, the operation is done as a batch. + /// The transactions are hashed with BLAKE3 and this hash is used as + /// the key to remove. + pub fn remove(&self, transactions: &[Transaction]) -> Result<()> { + let mut batch = sled::Batch::default(); + + for tx in transactions { + let serialized = serialize(tx); + let tx_hash = blake3::hash(&serialized); + batch.remove(tx_hash.as_bytes()); + } + + self.0.apply_batch(batch)?; + Ok(()) + } +} diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index f799679bc..66d808613 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -99,7 +99,17 @@ impl ProtocolSyncConsensus { for fork in &lock.consensus.forks { forks.push(fork.clone().into()); } - let unconfirmed_txs = lock.unconfirmed_txs.clone(); + let pending_txs = match lock.blockchain.pending_txs.get_all_txs() { + Ok(v) => v, + Err(e) => { + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_request()", + "Failed querying pending txs store: {}", + e + ); + vec![] + } + }; let slot_checkpoints = lock.consensus.slot_checkpoints.clone(); let mut f_history = vec![]; for f in &lock.consensus.f_history { @@ -116,7 +126,7 @@ impl ProtocolSyncConsensus { bootstrap_slot, current_slot, forks, - unconfirmed_txs, + pending_txs, slot_checkpoints, f_history, err_history, diff --git a/src/consensus/state.rs b/src/consensus/state.rs index cb9d6ed29..9457d30db 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -665,7 +665,7 @@ pub struct ConsensusResponse { /// Hot/live data used by the consensus algorithm pub forks: Vec, /// Pending transactions - pub unconfirmed_txs: Vec, + pub pending_txs: Vec, /// Hot/live slot checkpoints pub slot_checkpoints: Vec, // TODO: When Float10 supports encoding/decoding this should be diff --git a/src/consensus/task/consensus_sync.rs b/src/consensus/task/consensus_sync.rs index e534b4967..102077901 100644 --- a/src/consensus/task/consensus_sync.rs +++ b/src/consensus/task/consensus_sync.rs @@ -145,7 +145,7 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul } lock.consensus.bootstrap_slot = response.bootstrap_slot; lock.consensus.forks = forks; - lock.unconfirmed_txs = response.unconfirmed_txs.clone(); + lock.append_pending_txs(&response.pending_txs).await; lock.consensus.slot_checkpoints = response.slot_checkpoints.clone(); lock.consensus.previous_leaders = 1; let mut f_history = vec![]; diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index a7575795f..bc258790b 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -72,8 +72,6 @@ pub struct ValidatorState { pub consensus: ConsensusState, /// Canonical (finalized) blockchain pub blockchain: Blockchain, - /// Pending transactions - pub unconfirmed_txs: Vec, /// A map of various subscribers exporting live info from the blockchain /// TODO: Instead of JsonNotification, it can be an enum of internal objects, /// and then we don't have to deal with json in this module but only @@ -135,8 +133,6 @@ impl ValidatorState { single_node, )?; - let unconfirmed_txs = vec![]; - // -----NATIVE WASM CONTRACTS----- // This is the current place where native contracts are being deployed. // When the `Blockchain` object is created, it doesn't care whether it @@ -191,7 +187,6 @@ impl ValidatorState { lead_verifying_key, consensus, blockchain, - unconfirmed_txs, subscribers, wallet, synced: false, @@ -202,7 +197,7 @@ impl ValidatorState { } /// The node retrieves a transaction, validates its state transition, - /// and appends it to the unconfirmed transactions list. + /// and appends it to the pending txs store. pub async fn append_tx(&mut self, tx: Transaction) -> bool { let tx_hash = blake3::hash(&serialize(&tx)); let tx_in_txstore = match self.blockchain.transactions.contains(&tx_hash) { @@ -213,7 +208,15 @@ impl ValidatorState { } }; - if self.unconfirmed_txs.contains(&tx) || tx_in_txstore { + let tx_in_pending_txs_store = match self.blockchain.pending_txs.contains(&tx_hash) { + Ok(v) => v, + Err(e) => { + error!(target: "consensus::validator", "append_tx(): Failed querying pending txs store: {}", e); + return false + } + }; + + if tx_in_txstore || tx_in_pending_txs_store { info!(target: "consensus::validator", "append_tx(): We have already seen this tx."); return false } @@ -224,13 +227,60 @@ impl ValidatorState { return false }; - info!(target: "consensus::validator", "append_tx(): Appended tx to mempool"); - self.unconfirmed_txs.push(tx); + if let Err(e) = self.blockchain.pending_txs.insert(&[tx]) { + error!(target: "consensus::validator", "append_tx(): Failed to insert transaction to pending txs store: {}", e); + return false + } + info!(target: "consensus::validator", "append_tx(): Appended tx to pending txs store"); true } + /// The node retrieves transactions vector, validates their state transition, + /// and appends successfull ones to the pending txs store. + pub async fn append_pending_txs(&mut self, txs: &[Transaction]) { + let mut filtered_txs = vec![]; + for tx in txs { + let tx_hash = blake3::hash(&serialize(tx)); + let tx_in_txstore = match self.blockchain.transactions.contains(&tx_hash) { + Ok(v) => v, + Err(e) => { + error!(target: "consensus::validator", "append_pending_txs(): Failed querying txstore: {}", e); + continue + } + }; + + let tx_in_pending_txs_store = match self.blockchain.pending_txs.contains(&tx_hash) { + Ok(v) => v, + Err(e) => { + error!(target: "consensus::validator", "append_pending_txs(): Failed querying pending txs store: {}", e); + continue + } + }; + + if tx_in_txstore || tx_in_pending_txs_store { + info!(target: "consensus::validator", "append_pending_txs(): We have already seen this tx."); + continue + } + + filtered_txs.push(tx.clone()); + } + + info!(target: "consensus::validator", "append_pending_txs(): Starting state transition validation"); + // TODO: verify_transactions should return erroneous txs of the set to ignore them + if let Err(e) = self.verify_transactions(&filtered_txs[..], false).await { + error!(target: "consensus::validator", "append_pending_txs(): Failed to verify transaction: {}", e); + return + }; + + if let Err(e) = self.blockchain.pending_txs.insert(&filtered_txs) { + error!(target: "consensus::validator", "append_pending_txs(): Failed to insert transactions to pending txs store: {}", e); + return + } + info!(target: "consensus::validator", "append_pending_txs(): Appended tx to pending txs store"); + } + /// Generate a block proposal for the current slot, containing all - /// unconfirmed transactions. Proposal extends the longest fork + /// pending transactions. Proposal extends the longest fork /// chain the node is holding. pub fn propose( &mut self, @@ -247,7 +297,7 @@ impl ValidatorState { } // Generate proposal - let unproposed_txs = self.unproposed_txs(fork_index); + let unproposed_txs = self.unproposed_txs(fork_index)?; let mut tree = BridgeTree::::new(100); // The following is pretty weird, so something better should be done. for tx in &unproposed_txs { @@ -304,17 +354,17 @@ impl ValidatorState { Ok(Some((BlockProposal::new(header, unproposed_txs, lead_info), coin, derived_blind))) } - /// Retrieve all unconfirmed transactions not proposed in previous blocks + /// Retrieve all pending transactions not proposed in previous blocks /// of provided index chain. - pub fn unproposed_txs(&self, index: i64) -> Vec { + pub fn unproposed_txs(&self, index: i64) -> Result> { let unproposed_txs = if index == -1 { // If index is -1 (canonical blockchain) a new fork will be generated, // therefore all unproposed transactions can be included in the proposal. - self.unconfirmed_txs.clone() + self.blockchain.pending_txs.get_all_txs()? } else { // We iterate over the fork chain proposals to find already proposed // transactions and remove them from the local unproposed_txs vector. - let mut filtered_txs = self.unconfirmed_txs.clone(); + let mut filtered_txs = self.blockchain.pending_txs.get_all_txs()?; let chain = &self.consensus.forks[index as usize]; for state_checkpoint in &chain.sequence { for tx in &state_checkpoint.proposal.block.txs { @@ -329,10 +379,10 @@ impl ValidatorState { // Check if transactions exceed configured cap let cap = constants::TXS_CAP; if unproposed_txs.len() > cap { - return unproposed_txs[0..cap].to_vec() + return Ok(unproposed_txs[0..cap].to_vec()) } - unproposed_txs + Ok(unproposed_txs) } /// Given a proposal, the node verify its sender (slot leader) and finds which blockchain @@ -574,17 +624,6 @@ impl ValidatorState { Ok(true) } - /// Remove provided transactions vector from unconfirmed_txs if they exist. - pub fn remove_txs(&mut self, transactions: &Vec) -> Result<()> { - for tx in transactions { - if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| txs == tx) { - self.unconfirmed_txs.remove(pos); - } - } - - Ok(()) - } - /// Node checks if any of the fork chains can be finalized. /// Consensus finalization logic: /// - If the node has observed the creation of a fork chain and no other forks exists at same or greater height, @@ -669,8 +708,8 @@ impl ValidatorState { return Err(e) } - // Remove proposal transactions from memory pool - if let Err(e) = self.remove_txs(&proposal.txs) { + // Remove proposal transactions from pending txs store + if let Err(e) = self.blockchain.pending_txs.remove(&proposal.txs) { error!(target: "consensus::validator", "Removing finalized block transactions failed: {}", e); return Err(e) } @@ -778,8 +817,8 @@ impl ValidatorState { info!(target: "consensus::validator", "consensus: Sending notification about finalized block"); blocks_subscriber.notify(notif).await; - info!(target: "consensus::validator", "receive_finalized_block(): Removing block transactions from unconfirmed_txs"); - self.remove_txs(&block.txs)?; + info!(target: "consensus::validator", "receive_finalized_block(): Removing block transactions from pending txs store"); + self.blockchain.pending_txs.remove(&block.txs)?; Ok(true) }