consensus: Remove erroneus tx logic.

This commit is contained in:
parazyd
2023-03-06 20:10:01 +01:00
parent 96c2f8a7d4
commit 43f89dbfc7
7 changed files with 40 additions and 285 deletions

View File

@@ -225,9 +225,6 @@ impl RequestHandler for Darkfid {
Some("blockchain.lookup_zkas") => {
return self.blockchain_lookup_zkas(req.id, params).await
}
Some("blockchain.was_erroneous_tx") => {
return self.blockchain_was_erroneous_tx(req.id, params).await
}
// ===================
// Transaction methods

View File

@@ -192,24 +192,4 @@ impl Darkfid {
JsonResponse::new(json!(ret), id).into()
}
// RPCAPI:
// Queries the blockchain database to check if the provided transaction hash exists
// in the erroneous transactions set.
//
// --> {"jsonrpc": "2.0", "method": "blockchain.was_erroneous_tx", "params": [[tx_hash bytes]], "id": 1}
// <-- {"jsonrpc": "2.0", "result": bool, "id": 1}
pub async fn blockchain_was_erroneous_tx(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_array() {
return JsonError::new(InvalidParams, None, id).into()
}
let hash_bytes: [u8; 32] = serde_json::from_value(params[0].clone()).unwrap();
let tx_hash = blake3::Hash::try_from(hash_bytes).unwrap();
let blockchain = { self.validator_state.read().await.blockchain.clone() };
let Ok(result) = blockchain.was_erroneous_tx(&tx_hash) else {
return JsonError::new(InternalError, None, id).into()
};
JsonResponse::new(json!(result), id).into()
}
}

View File

@@ -1053,18 +1053,14 @@ async fn main() -> Result<()> {
{
tx
} else {
eprintln!("Tx not found");
return Ok(())
eprintln!("tx not found");
exit(1);
};
// Make sure the tx is correct
assert_eq!(tx.hash(), tx_hash);
println!("Transaction ID: {}", tx_hash);
let is_err = drk
.was_erroneous_tx(&tx_hash)
.await
.with_context(|| "Failed to get tx state")?;
println!("State: {}", if is_err { "failed" } else { "passed" });
Ok(())
}

View File

@@ -118,10 +118,6 @@ impl Drk {
async fn scan_block_dao(&self, block: &BlockInfo) -> Result<()> {
eprintln!("[DAO] Iterating over {} transactions", block.txs.len());
for tx in block.txs.iter() {
// Verify transaction is not in the erroneous set
if self.was_erroneous_tx(&tx.hash()).await? {
continue
}
self.apply_tx_dao_data(tx, true).await?;
}
@@ -136,10 +132,6 @@ impl Drk {
eprintln!("[Money] Iterating over {} transactions", block.txs.len());
for tx in block.txs.iter() {
// Verify transaction is not in the erroneous set
if self.was_erroneous_tx(&tx.hash()).await? {
continue
}
self.apply_tx_money_data(tx, true).await?;
}
@@ -307,13 +299,4 @@ impl Drk {
Ok(())
}
/// Queries darkfid to check if transaction is in the erroneous set
pub async fn was_erroneous_tx(&self, tx_hash: &blake3::Hash) -> Result<bool> {
let req = JsonRequest::new("blockchain.was_erroneous_tx", json!([tx_hash.as_bytes()]));
match self.rpc_client.request(req).await {
Ok(v) => Ok(serde_json::from_value(v)?),
Err(_) => Ok(false),
}
}
}

View File

@@ -20,7 +20,6 @@ use log::debug;
use crate::{
consensus::{Block, BlockInfo, SlotCheckpoint},
tx::Transaction,
util::time::Timestamp,
Result,
};
@@ -32,7 +31,7 @@ pub mod slot_checkpoint_store;
pub use slot_checkpoint_store::SlotCheckpointStore;
pub mod tx_store;
pub use tx_store::{ErroneousTxStore, TxStore};
pub use tx_store::TxStore;
pub mod contract_store;
pub use contract_store::{ContractStateStore, WasmStore};
@@ -52,8 +51,6 @@ pub struct Blockchain {
pub slot_checkpoints: SlotCheckpointStore,
/// Transactions sled tree
pub transactions: TxStore,
/// Erroneous transactions sled tree
pub erroneous_txs: ErroneousTxStore,
/// Contract states
pub contracts: ContractStateStore,
/// Wasm bincodes
@@ -68,7 +65,6 @@ impl Blockchain {
let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?;
let slot_checkpoints = SlotCheckpointStore::new(db)?;
let transactions = TxStore::new(db)?;
let erroneous_txs = ErroneousTxStore::new(db)?;
let contracts = ContractStateStore::new(db)?;
let wasm_bincode = WasmStore::new(db)?;
@@ -79,7 +75,6 @@ impl Blockchain {
order,
slot_checkpoints,
transactions,
erroneous_txs,
contracts,
wasm_bincode,
})
@@ -221,14 +216,4 @@ impl Blockchain {
};
Ok(!vec.is_empty())
}
/// Insert a given slice of [`Transactions`] containing erronenous txs into the blockchain database
pub fn add_erroneous_txs(&self, erronenous_txs: &[Transaction]) -> Result<Vec<blake3::Hash>> {
self.erroneous_txs.insert(erronenous_txs)
}
/// Check if the erroneoustxstore contains given transaction hash.
pub fn was_erroneous_tx(&self, tx_hash: &blake3::Hash) -> Result<bool> {
self.erroneous_txs.contains(tx_hash)
}
}

View File

@@ -21,7 +21,6 @@ use darkfi_serial::{deserialize, serialize};
use crate::{tx::Transaction, Error, Result};
const SLED_TX_TREE: &[u8] = b"_transactions";
const SLED_ERRONEOUS_TX_TREE: &[u8] = b"_erroneous_transactions";
/// The `TxStore` is a `sled` tree storing all the blockchain's
/// transactions where the key is the transaction hash, and the value is
@@ -106,57 +105,3 @@ impl TxStore {
Ok(txs)
}
}
/// The `ErroneousTxStore` is a `sled` tree storing all the blockchain's
/// erroneous transactions where the key is the transaction hash, and the value is
/// an empty slice.
#[derive(Clone)]
pub struct ErroneousTxStore(sled::Tree);
impl ErroneousTxStore {
/// Opens a new or existing `ErroneousTxStore` on the given sled database.
pub fn new(db: &sled::Db) -> Result<Self> {
let tree = db.open_tree(SLED_ERRONEOUS_TX_TREE)?;
Ok(Self(tree))
}
/// Insert a slice of [`Transaction`] into the erroneoustxstore. With sled, the
/// operation is done as a batch.
/// The transactions are hashed with BLAKE3 and this hash is used as
/// the key, while the value is an empty slice.
/// On success, the function returns the transaction hashes in the same
/// order as the input transactions.
pub fn insert(&self, transactions: &[Transaction]) -> Result<Vec<blake3::Hash>> {
let mut ret = Vec::with_capacity(transactions.len());
let mut batch = sled::Batch::default();
for tx in transactions {
let serialized = serialize(tx);
let tx_hash = blake3::hash(&serialized);
batch.insert(tx_hash.as_bytes(), &[]);
ret.push(tx_hash);
}
self.0.apply_batch(batch)?;
Ok(ret)
}
/// Check if the erroneoustxstore contains a given transaction hash.
pub fn contains(&self, tx_hash: &blake3::Hash) -> Result<bool> {
Ok(self.0.contains_key(tx_hash.as_bytes())?)
}
/// Retrieve all erroneous transaction hashes from the erroneoustxstore.
/// Be careful as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<blake3::Hash>> {
let mut txs = vec![];
for tx in self.0.iter() {
let (key, _) = tx.unwrap();
let hash_bytes: [u8; 32] = key.as_ref().try_into().unwrap();
txs.push(hash_bytes.into());
}
Ok(txs)
}
}

View File

@@ -657,19 +657,15 @@ impl ValidatorState {
let blocks_subscriber = self.subscribers.get("blocks").unwrap().clone();
// Validating state transitions
let mut erroneous_txs = vec![];
for proposal in &finalized {
// TODO: Is this the right place? We're already doing this in protocol_sync.
// TODO: These state transitions have already been checked. (I wrote this, but where?)
// TODO: FIXME: The state transitions have already been written, they have to be in memory
// until this point.
info!(target: "consensus::validator", "Applying state transition for finalized block");
match self.verify_transactions(&proposal.txs, true).await {
Ok(hashes) => erroneous_txs.extend(hashes),
Err(e) => {
error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e);
return Err(e)
}
if let Err(e) = self.verify_transactions(&proposal.txs, true).await {
error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e);
return Err(e)
}
// Remove proposal transactions from memory pool
@@ -684,7 +680,6 @@ impl ValidatorState {
info!(target: "consensus::validator", "consensus: Sending notification about finalized block");
blocks_subscriber.notify(notif).await;
}
self.blockchain.add_erroneous_txs(&erroneous_txs)?;
// Setting leaders history to last proposal leaders count
let last_state_checkpoint = fork.sequence.last().unwrap().clone();
@@ -738,21 +733,16 @@ impl ValidatorState {
pub async fn receive_blocks(&mut self, blocks: &[BlockInfo]) -> Result<()> {
// Verify state transitions for all blocks and their respective transactions.
info!(target: "consensus::validator", "receive_blocks(): Starting state transition validations");
let mut erroneous_txs = vec![];
for block in blocks {
match self.verify_transactions(&block.txs, true).await {
Ok(hashes) => erroneous_txs.extend(hashes),
Err(e) => {
error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e);
return Err(e)
}
}
if let Err(e) = self.verify_transactions(&block.txs, true).await {
error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e);
return Err(e)
};
}
info!(target: "consensus::validator", "receive_blocks(): All state transitions passed");
info!(target: "consensus::validator", "receive_blocks(): Appending blocks to ledger");
info!(target: "consensus::validator", "receive_blocks(): All state transitions passed. Appending blocks to ledger.");
self.blockchain.add(blocks)?;
self.blockchain.add_erroneous_txs(&erroneous_txs)?;
Ok(())
}
@@ -843,8 +833,6 @@ impl ValidatorState {
/// transaction if any of the verifications fail.
/// The function takes a boolean called `write` which tells it to actually write
/// the state transitions to the database.
// TODO: Currently we keep erroneous transactions in the vector and blocks,
// in order to apply max fee logic in the future, to prevent spamming.
// TODO: This should be paralellized as if even one tx in the batch fails to verify,
// we can skip it. When things are parallel, make sure to write in a deterministic
// order.
@@ -854,13 +842,8 @@ impl ValidatorState {
// 3. Verify execution
// 4. Verify ZK proofs
// 5. (optionally) write
pub async fn verify_transactions(
&self,
txs: &[Transaction],
write: bool,
) -> Result<Vec<Transaction>> {
pub async fn verify_transactions(&self, txs: &[Transaction], write: bool) -> Result<()> {
info!(target: "consensus::validator", "Verifying {} transaction(s)", txs.len());
let mut erroneous_txs = vec![];
for tx in txs {
let tx_hash = blake3::hash(&serialize(tx));
@@ -883,24 +866,9 @@ impl ValidatorState {
}
// Iterate over all calls to get the metadata
let mut skip = false;
for (idx, call) in tx.calls.iter().enumerate() {
info!(target: "consensus::validator", "Executing contract call {}", idx);
let wasm = match self.blockchain.wasm_bincode.get(call.contract_id) {
Ok(v) => {
info!(target: "consensus::validator", "Found wasm bincode for {}", call.contract_id);
v
}
Err(e) => {
error!(
target: "consensus::validator",
"Could not find wasm bincode for contract {}: {}",
call.contract_id, e
);
skip = true;
break
}
};
let wasm = self.blockchain.wasm_bincode.get(call.contract_id)?;
// Write the actual payload data
let mut payload = vec![];
@@ -908,53 +876,18 @@ impl ValidatorState {
tx.calls.encode(&mut payload)?; // Actual call data
// Instantiate the wasm runtime
let mut runtime =
match Runtime::new(&wasm, self.blockchain.clone(), call.contract_id) {
Ok(v) => v,
Err(e) => {
error!(
target: "consensus::validator",
"Failed to instantiate WASM runtime for contract {}: {}",
call.contract_id, e
);
skip = true;
break
}
};
let mut runtime = Runtime::new(&wasm, self.blockchain.clone(), call.contract_id)?;
info!(target: "consensus::validator", "Executing \"metadata\" call");
let metadata = match runtime.metadata(&payload) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to execute \"metadata\" call: {}", e);
skip = true;
break
}
};
let metadata = runtime.metadata(&payload)?;
// Decode the metadata retrieved from the execution
let mut decoder = Cursor::new(&metadata);
// (zkas_ns, public_inputs)
let zkp_pub: Vec<(String, Vec<pallas::Base>)> = match Decodable::decode(
&mut decoder,
) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to decode ZK public inputs from metadata: {}", e);
skip = true;
break
}
};
// The tuple is (zkas_ns, public_inputs)
let zkp_pub: Vec<(String, Vec<pallas::Base>)> = Decodable::decode(&mut decoder)?;
let sig_pub: Vec<PublicKey> = match Decodable::decode(&mut decoder) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to decode signature pubkeys from metadata: {}", e);
skip = true;
break
}
};
let sig_pub: Vec<PublicKey> = Decodable::decode(&mut decoder)?;
// TODO: Make sure we've read all the bytes above.
info!(target: "consensus::validator", "Successfully executed \"metadata\" call");
@@ -968,13 +901,11 @@ impl ValidatorState {
continue
}
let Ok((_, vk)) = self.blockchain.contracts.get_zkas(
&self.blockchain.sled_db, &call.contract_id, &zkas_ns
) else {
error!(target: "consensus::validator", "Failed to find reference to zkas in sled");
skip = true;
break
};
let (_, vk) = self.blockchain.contracts.get_zkas(
&self.blockchain.sled_db,
&call.contract_id,
&zkas_ns,
)?;
inner_vk_map.insert(zkas_ns.to_string(), vk);
}
@@ -985,28 +916,13 @@ impl ValidatorState {
// After getting the metadata, we run the "exec" function with the same
// runtime and the same payload.
info!(target: "consensus::validator", "Executing \"exec\" call");
match runtime.exec(&payload) {
Ok(v) => {
info!(target: "consensus::validator", "Successfully executed \"exec\" call");
updates.push(v);
}
Err(e) => {
error!(
target: "consensus::validator",
"Failed to execute \"exec\" call for contract id {}: {}",
call.contract_id, e
);
skip = true;
break
}
};
let state_update = runtime.exec(&payload)?;
info!(target: "consensus::validator", "Successfully executed \"exec\" call");
updates.push(state_update);
// At this point we're done with the call and move on to the next one.
}
if skip {
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
// When we're done looping and executing over the tx's contract calls, we
// move on with verification. First we verify the signatures as that's
@@ -1014,9 +930,7 @@ impl ValidatorState {
info!(target: "consensus::validator", "Verifying signatures for transaction {}", tx_hash);
if sig_table.len() != tx.signatures.len() {
error!(target: "consensus::validator", "Incorrect number of signatures in tx {}", tx_hash);
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
return Err(Error::InvalidSignature)
}
match tx.verify_sigs(sig_table) {
@@ -1025,16 +939,10 @@ impl ValidatorState {
}
Err(e) => {
error!(target: "consensus::validator", "Signature verification for tx {} failed: {}", tx_hash, e);
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
return Err(e.into())
}
};
// NOTE: When it comes to the ZK proofs, we first do a lookup of the
// verifying keys, but if we do not find them, we'll generate them
// inside of this function. This can be kinda expensive, so open to
// alternatives.
info!(target: "consensus::validator", "Verifying ZK proofs for transaction {}", tx_hash);
match tx.verify_zkps(verifying_keys.clone(), zkp_table).await {
Ok(()) => {
@@ -1042,15 +950,14 @@ impl ValidatorState {
}
Err(e) => {
error!(target: "consensus::validator", "ZK proof verification for tx {} failed: {}", tx_hash, e);
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
return Err(e.into())
}
};
// After the verifications stage passes, if we're told to write, we
// apply the state updates.
assert!(tx.calls.len() == updates.len());
if write {
info!(target: "consensus::validator", "Performing state updates");
for (call, update) in tx.calls.iter().zip(updates.iter()) {
@@ -1058,53 +965,15 @@ impl ValidatorState {
// TODO: Optimize this
// TODO: Sum up the gas costs of previous calls during execution
// and verification and these.
let wasm = match self.blockchain.wasm_bincode.get(call.contract_id) {
Ok(v) => {
info!(target: "consensus::validator", "Found wasm bincode for {}", call.contract_id);
v
}
Err(e) => {
error!(
target: "consensus::validator",
"Could not find wasm bincode for contract {}: {}",
call.contract_id, e
);
skip = true;
break
}
};
let wasm = self.blockchain.wasm_bincode.get(call.contract_id)?;
let mut runtime =
match Runtime::new(&wasm, self.blockchain.clone(), call.contract_id) {
Ok(v) => v,
Err(e) => {
error!(
target: "consensus::validator",
"Failed to instantiate WASM runtime for contract {}: {}",
call.contract_id, e
);
skip = true;
break
}
};
Runtime::new(&wasm, self.blockchain.clone(), call.contract_id)?;
info!(target: "consensus::validator", "Executing \"apply\" call");
match runtime.apply(update) {
// TODO: FIXME: This should be done in an atomic tx/batch
Ok(()) => {
info!(target: "consensus::validator", "State update applied successfully")
}
Err(e) => {
error!(target: "consensus::validator", "Failed to apply state update: {}", e);
skip = true;
break
}
};
}
if skip {
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
// TODO: FIXME: This should be done in an atomic tx/batch
runtime.apply(update)?;
info!(target: "consensus::validator", "State update applied successfully")
}
} else {
info!(target: "consensus::validator", "Skipping apply of state updates because write=false");
@@ -1113,7 +982,7 @@ impl ValidatorState {
info!(target: "consensus::validator", "Transaction {} verified successfully", tx_hash);
}
Ok(erroneous_txs)
Ok(())
}
/// Append to canonical state received finalized slot checkpoints from block sync task.