From 43f89dbfc744995b609fb1b8fbdd87ba42cdfd04 Mon Sep 17 00:00:00 2001 From: parazyd Date: Mon, 6 Mar 2023 20:10:01 +0100 Subject: [PATCH] consensus: Remove erroneus tx logic. --- bin/darkfid/src/main.rs | 3 - bin/darkfid/src/rpc_blockchain.rs | 20 --- bin/drk/src/main.rs | 10 +- bin/drk/src/rpc_blockchain.rs | 17 --- src/blockchain/mod.rs | 17 +-- src/blockchain/tx_store.rs | 55 -------- src/consensus/validator.rs | 203 ++++++------------------------ 7 files changed, 40 insertions(+), 285 deletions(-) diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index fd39bf0cc..87cbf4662 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -225,9 +225,6 @@ impl RequestHandler for Darkfid { Some("blockchain.lookup_zkas") => { return self.blockchain_lookup_zkas(req.id, params).await } - Some("blockchain.was_erroneous_tx") => { - return self.blockchain_was_erroneous_tx(req.id, params).await - } // =================== // Transaction methods diff --git a/bin/darkfid/src/rpc_blockchain.rs b/bin/darkfid/src/rpc_blockchain.rs index ee310cf16..4e6fe9dcf 100644 --- a/bin/darkfid/src/rpc_blockchain.rs +++ b/bin/darkfid/src/rpc_blockchain.rs @@ -192,24 +192,4 @@ impl Darkfid { JsonResponse::new(json!(ret), id).into() } - - // RPCAPI: - // Queries the blockchain database to check if the provided transaction hash exists - // in the erroneous transactions set. - // - // --> {"jsonrpc": "2.0", "method": "blockchain.was_erroneous_tx", "params": [[tx_hash bytes]], "id": 1} - // <-- {"jsonrpc": "2.0", "result": bool, "id": 1} - pub async fn blockchain_was_erroneous_tx(&self, id: Value, params: &[Value]) -> JsonResult { - if params.len() != 1 || !params[0].is_array() { - return JsonError::new(InvalidParams, None, id).into() - } - let hash_bytes: [u8; 32] = serde_json::from_value(params[0].clone()).unwrap(); - let tx_hash = blake3::Hash::try_from(hash_bytes).unwrap(); - let blockchain = { self.validator_state.read().await.blockchain.clone() }; - let Ok(result) = blockchain.was_erroneous_tx(&tx_hash) else { - return JsonError::new(InternalError, None, id).into() - }; - - JsonResponse::new(json!(result), id).into() - } } diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs index e08db461b..d34bb7ce0 100644 --- a/bin/drk/src/main.rs +++ b/bin/drk/src/main.rs @@ -1053,18 +1053,14 @@ async fn main() -> Result<()> { { tx } else { - eprintln!("Tx not found"); - return Ok(()) + eprintln!("tx not found"); + exit(1); }; + // Make sure the tx is correct assert_eq!(tx.hash(), tx_hash); println!("Transaction ID: {}", tx_hash); - let is_err = drk - .was_erroneous_tx(&tx_hash) - .await - .with_context(|| "Failed to get tx state")?; - println!("State: {}", if is_err { "failed" } else { "passed" }); Ok(()) } diff --git a/bin/drk/src/rpc_blockchain.rs b/bin/drk/src/rpc_blockchain.rs index dc7152ca7..6a7a20640 100644 --- a/bin/drk/src/rpc_blockchain.rs +++ b/bin/drk/src/rpc_blockchain.rs @@ -118,10 +118,6 @@ impl Drk { async fn scan_block_dao(&self, block: &BlockInfo) -> Result<()> { eprintln!("[DAO] Iterating over {} transactions", block.txs.len()); for tx in block.txs.iter() { - // Verify transaction is not in the erroneous set - if self.was_erroneous_tx(&tx.hash()).await? { - continue - } self.apply_tx_dao_data(tx, true).await?; } @@ -136,10 +132,6 @@ impl Drk { eprintln!("[Money] Iterating over {} transactions", block.txs.len()); for tx in block.txs.iter() { - // Verify transaction is not in the erroneous set - if self.was_erroneous_tx(&tx.hash()).await? { - continue - } self.apply_tx_money_data(tx, true).await?; } @@ -307,13 +299,4 @@ impl Drk { Ok(()) } - - /// Queries darkfid to check if transaction is in the erroneous set - pub async fn was_erroneous_tx(&self, tx_hash: &blake3::Hash) -> Result { - let req = JsonRequest::new("blockchain.was_erroneous_tx", json!([tx_hash.as_bytes()])); - match self.rpc_client.request(req).await { - Ok(v) => Ok(serde_json::from_value(v)?), - Err(_) => Ok(false), - } - } } diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 220108617..0f23953c0 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -20,7 +20,6 @@ use log::debug; use crate::{ consensus::{Block, BlockInfo, SlotCheckpoint}, - tx::Transaction, util::time::Timestamp, Result, }; @@ -32,7 +31,7 @@ pub mod slot_checkpoint_store; pub use slot_checkpoint_store::SlotCheckpointStore; pub mod tx_store; -pub use tx_store::{ErroneousTxStore, TxStore}; +pub use tx_store::TxStore; pub mod contract_store; pub use contract_store::{ContractStateStore, WasmStore}; @@ -52,8 +51,6 @@ pub struct Blockchain { pub slot_checkpoints: SlotCheckpointStore, /// Transactions sled tree pub transactions: TxStore, - /// Erroneous transactions sled tree - pub erroneous_txs: ErroneousTxStore, /// Contract states pub contracts: ContractStateStore, /// Wasm bincodes @@ -68,7 +65,6 @@ impl Blockchain { let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?; let slot_checkpoints = SlotCheckpointStore::new(db)?; let transactions = TxStore::new(db)?; - let erroneous_txs = ErroneousTxStore::new(db)?; let contracts = ContractStateStore::new(db)?; let wasm_bincode = WasmStore::new(db)?; @@ -79,7 +75,6 @@ impl Blockchain { order, slot_checkpoints, transactions, - erroneous_txs, contracts, wasm_bincode, }) @@ -221,14 +216,4 @@ impl Blockchain { }; Ok(!vec.is_empty()) } - - /// Insert a given slice of [`Transactions`] containing erronenous txs into the blockchain database - pub fn add_erroneous_txs(&self, erronenous_txs: &[Transaction]) -> Result> { - self.erroneous_txs.insert(erronenous_txs) - } - - /// Check if the erroneoustxstore contains given transaction hash. - pub fn was_erroneous_tx(&self, tx_hash: &blake3::Hash) -> Result { - self.erroneous_txs.contains(tx_hash) - } } diff --git a/src/blockchain/tx_store.rs b/src/blockchain/tx_store.rs index 55022d9a4..0d6701a0d 100644 --- a/src/blockchain/tx_store.rs +++ b/src/blockchain/tx_store.rs @@ -21,7 +21,6 @@ use darkfi_serial::{deserialize, serialize}; use crate::{tx::Transaction, Error, Result}; const SLED_TX_TREE: &[u8] = b"_transactions"; -const SLED_ERRONEOUS_TX_TREE: &[u8] = b"_erroneous_transactions"; /// The `TxStore` is a `sled` tree storing all the blockchain's /// transactions where the key is the transaction hash, and the value is @@ -106,57 +105,3 @@ impl TxStore { Ok(txs) } } - -/// The `ErroneousTxStore` is a `sled` tree storing all the blockchain's -/// erroneous transactions where the key is the transaction hash, and the value is -/// an empty slice. -#[derive(Clone)] -pub struct ErroneousTxStore(sled::Tree); - -impl ErroneousTxStore { - /// Opens a new or existing `ErroneousTxStore` on the given sled database. - pub fn new(db: &sled::Db) -> Result { - let tree = db.open_tree(SLED_ERRONEOUS_TX_TREE)?; - Ok(Self(tree)) - } - - /// Insert a slice of [`Transaction`] into the erroneoustxstore. With sled, the - /// operation is done as a batch. - /// The transactions are hashed with BLAKE3 and this hash is used as - /// the key, while the value is an empty slice. - /// On success, the function returns the transaction hashes in the same - /// order as the input transactions. - pub fn insert(&self, transactions: &[Transaction]) -> Result> { - let mut ret = Vec::with_capacity(transactions.len()); - let mut batch = sled::Batch::default(); - - for tx in transactions { - let serialized = serialize(tx); - let tx_hash = blake3::hash(&serialized); - batch.insert(tx_hash.as_bytes(), &[]); - ret.push(tx_hash); - } - - self.0.apply_batch(batch)?; - Ok(ret) - } - - /// Check if the erroneoustxstore contains a given transaction hash. - pub fn contains(&self, tx_hash: &blake3::Hash) -> Result { - Ok(self.0.contains_key(tx_hash.as_bytes())?) - } - - /// Retrieve all erroneous transaction hashes from the erroneoustxstore. - /// Be careful as this will try to load everything in memory. - pub fn get_all(&self) -> Result> { - let mut txs = vec![]; - - for tx in self.0.iter() { - let (key, _) = tx.unwrap(); - let hash_bytes: [u8; 32] = key.as_ref().try_into().unwrap(); - txs.push(hash_bytes.into()); - } - - Ok(txs) - } -} diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index 75736c63b..360c8207d 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -657,19 +657,15 @@ impl ValidatorState { let blocks_subscriber = self.subscribers.get("blocks").unwrap().clone(); // Validating state transitions - let mut erroneous_txs = vec![]; for proposal in &finalized { // TODO: Is this the right place? We're already doing this in protocol_sync. // TODO: These state transitions have already been checked. (I wrote this, but where?) // TODO: FIXME: The state transitions have already been written, they have to be in memory // until this point. info!(target: "consensus::validator", "Applying state transition for finalized block"); - match self.verify_transactions(&proposal.txs, true).await { - Ok(hashes) => erroneous_txs.extend(hashes), - Err(e) => { - error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e); - return Err(e) - } + if let Err(e) = self.verify_transactions(&proposal.txs, true).await { + error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e); + return Err(e) } // Remove proposal transactions from memory pool @@ -684,7 +680,6 @@ impl ValidatorState { info!(target: "consensus::validator", "consensus: Sending notification about finalized block"); blocks_subscriber.notify(notif).await; } - self.blockchain.add_erroneous_txs(&erroneous_txs)?; // Setting leaders history to last proposal leaders count let last_state_checkpoint = fork.sequence.last().unwrap().clone(); @@ -738,21 +733,16 @@ impl ValidatorState { pub async fn receive_blocks(&mut self, blocks: &[BlockInfo]) -> Result<()> { // Verify state transitions for all blocks and their respective transactions. info!(target: "consensus::validator", "receive_blocks(): Starting state transition validations"); - let mut erroneous_txs = vec![]; + for block in blocks { - match self.verify_transactions(&block.txs, true).await { - Ok(hashes) => erroneous_txs.extend(hashes), - Err(e) => { - error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e); - return Err(e) - } - } + if let Err(e) = self.verify_transactions(&block.txs, true).await { + error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e); + return Err(e) + }; } - info!(target: "consensus::validator", "receive_blocks(): All state transitions passed"); - info!(target: "consensus::validator", "receive_blocks(): Appending blocks to ledger"); + info!(target: "consensus::validator", "receive_blocks(): All state transitions passed. Appending blocks to ledger."); self.blockchain.add(blocks)?; - self.blockchain.add_erroneous_txs(&erroneous_txs)?; Ok(()) } @@ -843,8 +833,6 @@ impl ValidatorState { /// transaction if any of the verifications fail. /// The function takes a boolean called `write` which tells it to actually write /// the state transitions to the database. - // TODO: Currently we keep erroneous transactions in the vector and blocks, - // in order to apply max fee logic in the future, to prevent spamming. // TODO: This should be paralellized as if even one tx in the batch fails to verify, // we can skip it. When things are parallel, make sure to write in a deterministic // order. @@ -854,13 +842,8 @@ impl ValidatorState { // 3. Verify execution // 4. Verify ZK proofs // 5. (optionally) write - pub async fn verify_transactions( - &self, - txs: &[Transaction], - write: bool, - ) -> Result> { + pub async fn verify_transactions(&self, txs: &[Transaction], write: bool) -> Result<()> { info!(target: "consensus::validator", "Verifying {} transaction(s)", txs.len()); - let mut erroneous_txs = vec![]; for tx in txs { let tx_hash = blake3::hash(&serialize(tx)); @@ -883,24 +866,9 @@ impl ValidatorState { } // Iterate over all calls to get the metadata - let mut skip = false; for (idx, call) in tx.calls.iter().enumerate() { info!(target: "consensus::validator", "Executing contract call {}", idx); - let wasm = match self.blockchain.wasm_bincode.get(call.contract_id) { - Ok(v) => { - info!(target: "consensus::validator", "Found wasm bincode for {}", call.contract_id); - v - } - Err(e) => { - error!( - target: "consensus::validator", - "Could not find wasm bincode for contract {}: {}", - call.contract_id, e - ); - skip = true; - break - } - }; + let wasm = self.blockchain.wasm_bincode.get(call.contract_id)?; // Write the actual payload data let mut payload = vec![]; @@ -908,53 +876,18 @@ impl ValidatorState { tx.calls.encode(&mut payload)?; // Actual call data // Instantiate the wasm runtime - let mut runtime = - match Runtime::new(&wasm, self.blockchain.clone(), call.contract_id) { - Ok(v) => v, - Err(e) => { - error!( - target: "consensus::validator", - "Failed to instantiate WASM runtime for contract {}: {}", - call.contract_id, e - ); - skip = true; - break - } - }; + let mut runtime = Runtime::new(&wasm, self.blockchain.clone(), call.contract_id)?; info!(target: "consensus::validator", "Executing \"metadata\" call"); - let metadata = match runtime.metadata(&payload) { - Ok(v) => v, - Err(e) => { - error!(target: "consensus::validator", "Failed to execute \"metadata\" call: {}", e); - skip = true; - break - } - }; + let metadata = runtime.metadata(&payload)?; // Decode the metadata retrieved from the execution let mut decoder = Cursor::new(&metadata); - // (zkas_ns, public_inputs) - let zkp_pub: Vec<(String, Vec)> = match Decodable::decode( - &mut decoder, - ) { - Ok(v) => v, - Err(e) => { - error!(target: "consensus::validator", "Failed to decode ZK public inputs from metadata: {}", e); - skip = true; - break - } - }; + // The tuple is (zkas_ns, public_inputs) + let zkp_pub: Vec<(String, Vec)> = Decodable::decode(&mut decoder)?; - let sig_pub: Vec = match Decodable::decode(&mut decoder) { - Ok(v) => v, - Err(e) => { - error!(target: "consensus::validator", "Failed to decode signature pubkeys from metadata: {}", e); - skip = true; - break - } - }; + let sig_pub: Vec = Decodable::decode(&mut decoder)?; // TODO: Make sure we've read all the bytes above. info!(target: "consensus::validator", "Successfully executed \"metadata\" call"); @@ -968,13 +901,11 @@ impl ValidatorState { continue } - let Ok((_, vk)) = self.blockchain.contracts.get_zkas( - &self.blockchain.sled_db, &call.contract_id, &zkas_ns - ) else { - error!(target: "consensus::validator", "Failed to find reference to zkas in sled"); - skip = true; - break - }; + let (_, vk) = self.blockchain.contracts.get_zkas( + &self.blockchain.sled_db, + &call.contract_id, + &zkas_ns, + )?; inner_vk_map.insert(zkas_ns.to_string(), vk); } @@ -985,28 +916,13 @@ impl ValidatorState { // After getting the metadata, we run the "exec" function with the same // runtime and the same payload. info!(target: "consensus::validator", "Executing \"exec\" call"); - match runtime.exec(&payload) { - Ok(v) => { - info!(target: "consensus::validator", "Successfully executed \"exec\" call"); - updates.push(v); - } - Err(e) => { - error!( - target: "consensus::validator", - "Failed to execute \"exec\" call for contract id {}: {}", - call.contract_id, e - ); - skip = true; - break - } - }; + let state_update = runtime.exec(&payload)?; + + info!(target: "consensus::validator", "Successfully executed \"exec\" call"); + updates.push(state_update); + // At this point we're done with the call and move on to the next one. } - if skip { - warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash); - erroneous_txs.push(tx.clone()); - continue - } // When we're done looping and executing over the tx's contract calls, we // move on with verification. First we verify the signatures as that's @@ -1014,9 +930,7 @@ impl ValidatorState { info!(target: "consensus::validator", "Verifying signatures for transaction {}", tx_hash); if sig_table.len() != tx.signatures.len() { error!(target: "consensus::validator", "Incorrect number of signatures in tx {}", tx_hash); - warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash); - erroneous_txs.push(tx.clone()); - continue + return Err(Error::InvalidSignature) } match tx.verify_sigs(sig_table) { @@ -1025,16 +939,10 @@ impl ValidatorState { } Err(e) => { error!(target: "consensus::validator", "Signature verification for tx {} failed: {}", tx_hash, e); - warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash); - erroneous_txs.push(tx.clone()); - continue + return Err(e.into()) } }; - // NOTE: When it comes to the ZK proofs, we first do a lookup of the - // verifying keys, but if we do not find them, we'll generate them - // inside of this function. This can be kinda expensive, so open to - // alternatives. info!(target: "consensus::validator", "Verifying ZK proofs for transaction {}", tx_hash); match tx.verify_zkps(verifying_keys.clone(), zkp_table).await { Ok(()) => { @@ -1042,15 +950,14 @@ impl ValidatorState { } Err(e) => { error!(target: "consensus::validator", "ZK proof verification for tx {} failed: {}", tx_hash, e); - warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash); - erroneous_txs.push(tx.clone()); - continue + return Err(e.into()) } }; // After the verifications stage passes, if we're told to write, we // apply the state updates. assert!(tx.calls.len() == updates.len()); + if write { info!(target: "consensus::validator", "Performing state updates"); for (call, update) in tx.calls.iter().zip(updates.iter()) { @@ -1058,53 +965,15 @@ impl ValidatorState { // TODO: Optimize this // TODO: Sum up the gas costs of previous calls during execution // and verification and these. - let wasm = match self.blockchain.wasm_bincode.get(call.contract_id) { - Ok(v) => { - info!(target: "consensus::validator", "Found wasm bincode for {}", call.contract_id); - v - } - Err(e) => { - error!( - target: "consensus::validator", - "Could not find wasm bincode for contract {}: {}", - call.contract_id, e - ); - skip = true; - break - } - }; + let wasm = self.blockchain.wasm_bincode.get(call.contract_id)?; let mut runtime = - match Runtime::new(&wasm, self.blockchain.clone(), call.contract_id) { - Ok(v) => v, - Err(e) => { - error!( - target: "consensus::validator", - "Failed to instantiate WASM runtime for contract {}: {}", - call.contract_id, e - ); - skip = true; - break - } - }; + Runtime::new(&wasm, self.blockchain.clone(), call.contract_id)?; info!(target: "consensus::validator", "Executing \"apply\" call"); - match runtime.apply(update) { - // TODO: FIXME: This should be done in an atomic tx/batch - Ok(()) => { - info!(target: "consensus::validator", "State update applied successfully") - } - Err(e) => { - error!(target: "consensus::validator", "Failed to apply state update: {}", e); - skip = true; - break - } - }; - } - if skip { - warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash); - erroneous_txs.push(tx.clone()); - continue + // TODO: FIXME: This should be done in an atomic tx/batch + runtime.apply(update)?; + info!(target: "consensus::validator", "State update applied successfully") } } else { info!(target: "consensus::validator", "Skipping apply of state updates because write=false"); @@ -1113,7 +982,7 @@ impl ValidatorState { info!(target: "consensus::validator", "Transaction {} verified successfully", tx_hash); } - Ok(erroneous_txs) + Ok(()) } /// Append to canonical state received finalized slot checkpoints from block sync task.