darkfid: erroneous txs handling

Darkfid will not apply erroneous txs state transitions and keep them in a separate sled tree. drk to filtre these txs to report correct balances/coins
This commit is contained in:
aggstam
2023-02-10 20:59:03 +02:00
parent bfc81aa946
commit b0d24ef776
8 changed files with 261 additions and 46 deletions

View File

@@ -224,6 +224,9 @@ impl RequestHandler for Darkfid {
Some("blockchain.lookup_zkas") => {
return self.blockchain_lookup_zkas(req.id, params).await
}
Some("blockchain.is_erroneous_tx") => {
return self.blockchain_is_erroneous_tx(req.id, params).await
}
// ===================
// Transaction methods

View File

@@ -142,4 +142,24 @@ impl Darkfid {
JsonResponse::new(json!(ret), id).into()
}
// RPCAPI:
// Queries the blockchain database to check if the provided transaction hash exists
// in the erroneous transactions set.
//
// --> {"jsonrpc": "2.0", "method": "blockchain.is_erroneous_tx", "params": [[tx_hash bytes]], "id": 1}
// <-- {"jsonrpc": "2.0", "result": bool, "id": 1}
pub async fn blockchain_is_erroneous_tx(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_array() {
return JsonError::new(InvalidParams, None, id).into()
}
let hash_bytes: [u8; 32] = serde_json::from_value(params[0].clone()).unwrap();
let tx_hash = blake3::Hash::try_from(hash_bytes).unwrap();
let blockchain = { self.validator_state.read().await.blockchain.clone() };
let Ok(result) = blockchain.is_erroneous_tx(&tx_hash) else {
return JsonError::new(InternalError, None, id).into()
};
JsonResponse::new(json!(result), id).into()
}
}

View File

@@ -117,6 +117,10 @@ impl Drk {
async fn scan_block_dao(&self, block: &BlockInfo) -> Result<()> {
eprintln!("[DAO] Iterating over {} transactions", block.txs.len());
for tx in block.txs.iter() {
// Verify transaction is not in the erroneous set
if self.is_erroneous_tx(tx).await? {
continue
}
self.apply_tx_dao_data(tx, true).await?;
}
@@ -131,6 +135,10 @@ impl Drk {
eprintln!("[Money] Iterating over {} transactions", block.txs.len());
for tx in block.txs.iter() {
// Verify transaction is not in the erroneous set
if self.is_erroneous_tx(tx).await? {
continue
}
self.apply_tx_money_data(tx, true).await?;
}
@@ -272,4 +280,15 @@ impl Drk {
Ok(())
}
/// Queries darkfid to check if transaction is in the erroneous set
async fn is_erroneous_tx(&self, tx: &Transaction) -> Result<bool> {
let serialized = serialize(tx);
let tx_hash = blake3::hash(&serialized);
let req = JsonRequest::new("blockchain.is_erroneous_tx", json!([tx_hash.as_bytes()]));
match self.rpc_client.request(req).await {
Ok(v) => Ok(serde_json::from_value(v)?),
Err(_) => Ok(false),
}
}
}

View File

@@ -1,4 +1,4 @@
darkfid localnet
================
This will start on darkfid node in single-node mode and a faucet node.
This will start one darkfid node in single-node mode and a faucet node.

View File

@@ -22,7 +22,7 @@ use darkfi::{
blockchain::{
block_store::{BlockOrderStore, BlockStore, HeaderStore},
slot_checkpoint_store::SlotCheckpointStore,
tx_store::TxStore,
tx_store::{ErroneousTxStore, TxStore},
Blockchain,
},
consensus::{
@@ -264,6 +264,27 @@ impl TxStoreInfo {
}
}
#[derive(Debug)]
struct ErroneousTxStoreInfo {
_transactions: Vec<TxInfo>,
}
impl ErroneousTxStoreInfo {
pub fn new(erroneoustxstore: &ErroneousTxStore) -> ErroneousTxStoreInfo {
let mut _transactions = Vec::new();
let result = erroneoustxstore.get_all();
match result {
Ok(iter) => {
for (hash, tx) in iter.iter() {
_transactions.push(TxInfo::new(hash.clone(), &tx));
}
}
Err(e) => println!("Error: {:?}", e),
}
ErroneousTxStoreInfo { _transactions }
}
}
#[derive(Debug)]
struct BlockchainInfo {
_headers: HeaderStoreInfo,
@@ -271,6 +292,7 @@ struct BlockchainInfo {
_order: BlockOrderStoreInfo,
_slot_checkpoints: SlotCheckpointStoreInfo,
_transactions: TxStoreInfo,
_erroneous_txs: ErroneousTxStoreInfo,
}
impl BlockchainInfo {
@@ -280,7 +302,15 @@ impl BlockchainInfo {
let _order = BlockOrderStoreInfo::new(&blockchain.order);
let _slot_checkpoints = SlotCheckpointStoreInfo::new(&blockchain.slot_checkpoints);
let _transactions = TxStoreInfo::new(&blockchain.transactions);
BlockchainInfo { _headers, _blocks, _order, _slot_checkpoints, _transactions }
let _erroneous_txs = ErroneousTxStoreInfo::new(&blockchain.erroneous_txs);
BlockchainInfo {
_headers,
_blocks,
_order,
_slot_checkpoints,
_transactions,
_erroneous_txs,
}
}
}
@@ -344,14 +374,10 @@ async fn generate(localnet: &str, name: &str) -> Result<()> {
#[async_std::main]
async fn main() -> Result<()> {
// Localnet folder
let localnet = "../../../contrib/localnet/darkfid/";
let localnet = "../../../contrib/localnet/darkfid-single-node/";
println!("Localnet folder: {localnet}");
// darkfid0
generate(localnet, "darkfid0").await?;
// darkfid1
generate(localnet, "darkfid1").await?;
// darkfid2
generate(localnet, "darkfid2").await?;
// faucetd
generate(localnet, "faucetd").await?;

View File

@@ -20,6 +20,7 @@ use log::debug;
use crate::{
consensus::{Block, BlockInfo, SlotCheckpoint},
tx::Transaction,
util::time::Timestamp,
Result,
};
@@ -31,7 +32,7 @@ pub mod slot_checkpoint_store;
pub use slot_checkpoint_store::SlotCheckpointStore;
pub mod tx_store;
pub use tx_store::TxStore;
pub use tx_store::{ErroneousTxStore, TxStore};
pub mod contract_store;
pub use contract_store::{ContractStateStore, WasmStore};
@@ -51,6 +52,8 @@ pub struct Blockchain {
pub slot_checkpoints: SlotCheckpointStore,
/// Transactions sled tree
pub transactions: TxStore,
/// Erroneous transactions sled tree
pub erroneous_txs: ErroneousTxStore,
/// Contract states
pub contracts: ContractStateStore,
/// Wasm bincodes
@@ -65,6 +68,7 @@ impl Blockchain {
let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?;
let slot_checkpoints = SlotCheckpointStore::new(db)?;
let transactions = TxStore::new(db)?;
let erroneous_txs = ErroneousTxStore::new(db)?;
let contracts = ContractStateStore::new(db)?;
let wasm_bincode = WasmStore::new(db)?;
@@ -75,6 +79,7 @@ impl Blockchain {
order,
slot_checkpoints,
transactions,
erroneous_txs,
contracts,
wasm_bincode,
})
@@ -216,4 +221,14 @@ impl Blockchain {
};
Ok(!vec.is_empty())
}
/// Insert a given slice of [`Transactions`] containing erronenous txs into the blockchain database
pub fn add_erroneous_txs(&self, erronenous_txs: &[Transaction]) -> Result<Vec<blake3::Hash>> {
self.erroneous_txs.insert(erronenous_txs)
}
/// Check if the erroneoustxstore contains given transaction hash.
pub fn is_erroneous_tx(&self, tx_hash: &blake3::Hash) -> Result<bool> {
self.erroneous_txs.contains(tx_hash)
}
}

View File

@@ -21,6 +21,7 @@ use darkfi_serial::{deserialize, serialize};
use crate::{tx::Transaction, Error, Result};
const SLED_TX_TREE: &[u8] = b"_transactions";
const SLED_ERRONEOUS_TX_TREE: &[u8] = b"_erroneous_transactions";
/// The `TxStore` is a `sled` tree storing all the blockchain's
/// transactions where the key is the transaction hash, and the value is
@@ -47,9 +48,9 @@ impl TxStore {
for tx in transactions {
let serialized = serialize(tx);
let txhash = blake3::hash(&serialized);
batch.insert(txhash.as_bytes(), serialized);
ret.push(txhash);
let tx_hash = blake3::hash(&serialized);
batch.insert(tx_hash.as_bytes(), serialized);
ret.push(tx_hash);
}
self.0.apply_batch(batch)?;
@@ -57,8 +58,8 @@ impl TxStore {
}
/// Check if the txstore contains a given transaction hash.
pub fn contains(&self, txid: &blake3::Hash) -> Result<bool> {
Ok(self.0.contains_key(txid.as_bytes())?)
pub fn contains(&self, tx_hash: &blake3::Hash) -> Result<bool> {
Ok(self.0.contains_key(tx_hash.as_bytes())?)
}
/// Fetch given tx hashes from the txstore.
@@ -66,16 +67,20 @@ impl TxStore {
/// was found in the txstore, and otherwise it is `None`, if it has not.
/// The second parameter is a boolean which tells the function to fail in
/// case at least one block was not found.
pub fn get(&self, txids: &[blake3::Hash], strict: bool) -> Result<Vec<Option<Transaction>>> {
let mut ret = Vec::with_capacity(txids.len());
pub fn get(
&self,
tx_hashes: &[blake3::Hash],
strict: bool,
) -> Result<Vec<Option<Transaction>>> {
let mut ret = Vec::with_capacity(tx_hashes.len());
for txid in txids {
if let Some(found) = self.0.get(txid.as_bytes())? {
for tx_hash in tx_hashes {
if let Some(found) = self.0.get(tx_hash.as_bytes())? {
let tx = deserialize(&found)?;
ret.push(Some(tx));
} else {
if strict {
let s = txid.to_hex().as_str().to_string();
let s = tx_hash.to_hex().as_str().to_string();
return Err(Error::TransactionNotFound(s))
}
ret.push(None);
@@ -101,3 +106,87 @@ impl TxStore {
Ok(txs)
}
}
/// The `ErroneousTxStore` is a `sled` tree storing all the blockchain's
/// erroneous transactions where the key is the transaction hash, and the value is
/// the serialized transaction.
#[derive(Clone)]
pub struct ErroneousTxStore(sled::Tree);
impl ErroneousTxStore {
/// Opens a new or existing `ErroneousTxStore` on the given sled database.
pub fn new(db: &sled::Db) -> Result<Self> {
let tree = db.open_tree(SLED_ERRONEOUS_TX_TREE)?;
Ok(Self(tree))
}
/// Insert a slice of [`Transaction`] into the erroneoustxstore. With sled, the
/// operation is done as a batch.
/// The transactions are hashed with BLAKE3 and this hash is used as
/// the key, while the value is the serialized [`Transaction`] itself.
/// On success, the function returns the transaction hashes in the same
/// order as the input transactions.
pub fn insert(&self, transactions: &[Transaction]) -> Result<Vec<blake3::Hash>> {
let mut ret = Vec::with_capacity(transactions.len());
let mut batch = sled::Batch::default();
for tx in transactions {
let serialized = serialize(tx);
let tx_hash = blake3::hash(&serialized);
batch.insert(tx_hash.as_bytes(), serialized);
ret.push(tx_hash);
}
self.0.apply_batch(batch)?;
Ok(ret)
}
/// Check if the erroneoustxstore contains a given transaction hash.
pub fn contains(&self, tx_hash: &blake3::Hash) -> Result<bool> {
Ok(self.0.contains_key(tx_hash.as_bytes())?)
}
/// Fetch given erroneous tx hashes from the erroneoustxstore.
/// The resulting vector contains `Option`, which is `Some` if the tx
/// was found in the erroneoustxstore, and otherwise it is `None`, if it has not.
/// The second parameter is a boolean which tells the function to fail in
/// case at least one block was not found.
pub fn get(
&self,
tx_hashes: &[blake3::Hash],
strict: bool,
) -> Result<Vec<Option<Transaction>>> {
let mut ret = Vec::with_capacity(tx_hashes.len());
for tx_hash in tx_hashes {
if let Some(found) = self.0.get(tx_hash.as_bytes())? {
let tx = deserialize(&found)?;
ret.push(Some(tx));
} else {
if strict {
let s = tx_hash.to_hex().as_str().to_string();
return Err(Error::TransactionNotFound(s))
}
ret.push(None);
}
}
Ok(ret)
}
/// Retrieve all erroneous transactions from the erroneoustxstore
/// in the form of a tuple (`tx_hash`, `tx`).
/// Be careful as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<(blake3::Hash, Transaction)>> {
let mut txs = vec![];
for tx in self.0.iter() {
let (key, value) = tx.unwrap();
let hash_bytes: [u8; 32] = key.as_ref().try_into().unwrap();
let tx = deserialize(&value)?;
txs.push((hash_bytes.into(), tx));
}
Ok(txs)
}
}

View File

@@ -693,15 +693,19 @@ impl ValidatorState {
let blocks_subscriber = self.subscribers.get("blocks").unwrap().clone();
// Validating state transitions
let mut erroneous_txs = vec![];
for proposal in &finalized {
// TODO: Is this the right place? We're already doing this in protocol_sync.
// TODO: These state transitions have already been checked. (I wrote this, but where?)
// TODO: FIXME: The state transitions have already been written, they have to be in memory
// until this point.
info!(target: "consensus::validator", "Applying state transition for finalized block");
if let Err(e) = self.verify_transactions(&proposal.txs, true).await {
error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e);
return Err(e)
match self.verify_transactions(&proposal.txs, true).await {
Ok(hashes) => erroneous_txs.extend(hashes),
Err(e) => {
error!(target: "consensus::validator", "Finalized block transaction verifications failed: {}", e);
return Err(e)
}
}
// Remove proposal transactions from memory pool
@@ -716,6 +720,7 @@ impl ValidatorState {
info!(target: "consensus::validator", "consensus: Sending notification about finalized block");
blocks_subscriber.notify(notif).await;
}
self.blockchain.add_erroneous_txs(&erroneous_txs)?;
// Setting leaders history to last proposal leaders count
let last_state_checkpoint = fork.sequence.last().unwrap().clone();
@@ -769,16 +774,21 @@ impl ValidatorState {
pub async fn receive_blocks(&mut self, blocks: &[BlockInfo]) -> Result<()> {
// Verify state transitions for all blocks and their respective transactions.
info!(target: "consensus::validator", "receive_blocks(): Starting state transition validations");
let mut erroneous_txs = vec![];
for block in blocks {
if let Err(e) = self.verify_transactions(&block.txs, true).await {
error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e);
return Err(e)
match self.verify_transactions(&block.txs, true).await {
Ok(hashes) => erroneous_txs.extend(hashes),
Err(e) => {
error!(target: "consensus::validator", "receive_blocks(): Transaction verifications failed: {}", e);
return Err(e)
}
}
}
info!(target: "consensus::validator", "receive_blocks(): All state transitions passed");
info!(target: "consensus::validator", "receive_blocks(): Appending blocks to ledger");
self.blockchain.add(blocks)?;
self.blockchain.add_erroneous_txs(&erroneous_txs)?;
Ok(())
}
@@ -857,14 +867,21 @@ impl ValidatorState {
/// Validate signatures, wasm execution, and zk proofs for given transactions.
/// If all of those succeed, try to execute a state update for the contract calls.
/// Currently the verifications are sequential, and the function will fail if any
/// of the verifications fail.
/// Currently the verifications are sequential, and the function will skip a
/// transaction if any of the verifications fail.
/// The function takes a boolean called `write` which tells it to actually write
/// the state transitions to the database.
// TODO: Currently we keep erroneous transactions in the vector and blocks,
// in order to apply max fee logic in the future, to prevent spamming.
// TODO: This should be paralellized as if even one tx in the batch fails to verify,
// we can drop everything.
pub async fn verify_transactions(&self, txs: &[Transaction], write: bool) -> Result<()> {
// we can skip it.
pub async fn verify_transactions(
&self,
txs: &[Transaction],
write: bool,
) -> Result<Vec<Transaction>> {
info!(target: "consensus::validator", "Verifying {} transaction(s)", txs.len());
let mut erroneous_txs = vec![];
for tx in txs {
let tx_hash = blake3::hash(&serialize(tx));
info!(target: "consensus::validator", "Verifying transaction {}", tx_hash);
@@ -877,6 +894,7 @@ impl ValidatorState {
let mut updates = vec![];
// Iterate over all calls to get the metadata
let mut skip = false;
for (idx, call) in tx.calls.iter().enumerate() {
info!(target: "consensus::validator", "Executing contract call {}", idx);
let wasm = match self.blockchain.wasm_bincode.get(call.contract_id) {
@@ -890,7 +908,8 @@ impl ValidatorState {
"Could not find wasm bincode for contract {}: {}",
call.contract_id, e
);
return Err(Error::ContractNotFound(call.contract_id.to_string()))
skip = true;
break
}
};
@@ -906,10 +925,11 @@ impl ValidatorState {
Err(e) => {
error!(
target: "consensus::validator",
"Failed to instantiate WASM runtime for contract {}",
call.contract_id
"Failed to instantiate WASM runtime for contract {}: {}",
call.contract_id, e
);
return Err(e)
skip = true;
break
}
};
@@ -918,7 +938,8 @@ impl ValidatorState {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to execute \"metadata\" call: {}", e);
return Err(e)
skip = true;
break
}
};
@@ -930,7 +951,8 @@ impl ValidatorState {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to decode ZK public inputs from metadata: {}", e);
return Err(e.into())
skip = true;
break
}
};
@@ -938,7 +960,8 @@ impl ValidatorState {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "Failed to decode signature pubkeys from metadata: {}", e);
return Err(e.into())
skip = true;
break
}
};
@@ -961,11 +984,17 @@ impl ValidatorState {
"Failed to execute \"exec\" call for contract id {}: {}",
call.contract_id, e
);
return Err(e)
skip = true;
break
}
};
// At this point we're done with the call and move on to the next one.
}
if skip {
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
// When we're done looping and executing over the tx's contract calls, we
// move on with verification. First we verify the signatures as that's
@@ -973,7 +1002,9 @@ impl ValidatorState {
info!(target: "consensus::validator", "Verifying signatures for transaction {}", tx_hash);
if sig_table.len() != tx.signatures.len() {
error!(target: "consensus::validator", "Incorrect number of signatures in tx {}", tx_hash);
return Err(Error::InvalidSignature)
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
match tx.verify_sigs(sig_table) {
@@ -982,7 +1013,9 @@ impl ValidatorState {
}
Err(e) => {
error!(target: "consensus::validator", "Signature verification for tx {} failed: {}", tx_hash, e);
return Err(e)
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
};
@@ -997,7 +1030,9 @@ impl ValidatorState {
}
Err(e) => {
error!(target: "consensus::validator", "ZK proof verification for tx {} failed: {}", tx_hash, e);
return Err(e)
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
};
@@ -1022,7 +1057,8 @@ impl ValidatorState {
"Could not find wasm bincode for contract {}: {}",
call.contract_id, e
);
return Err(Error::ContractNotFound(call.contract_id.to_string()))
skip = true;
break
}
};
@@ -1032,10 +1068,11 @@ impl ValidatorState {
Err(e) => {
error!(
target: "consensus::validator",
"Failed to instantiate WASM runtime for contract {}",
call.contract_id
"Failed to instantiate WASM runtime for contract {}: {}",
call.contract_id, e
);
return Err(e)
skip = true;
break
}
};
@@ -1047,10 +1084,16 @@ impl ValidatorState {
}
Err(e) => {
error!(target: "consensus::validator", "Failed to apply state update: {}", e);
return Err(e)
skip = true;
break
}
};
}
if skip {
warn!(target: "consensus::validator", "Skipping transaction {}", tx_hash);
erroneous_txs.push(tx.clone());
continue
}
} else {
info!(target: "consensus::validator", "Skipping apply of state updates because write=false");
}
@@ -1058,7 +1101,7 @@ impl ValidatorState {
info!(target: "consensus::validator", "Transaction {} verified successfully", tx_hash);
}
Ok(())
Ok(erroneous_txs)
}
/// Append to canonical state received finalized slot checkpoints from block sync task.