darkfid2/rpc: basic tx methods added

This commit is contained in:
aggstam
2023-07-18 19:00:52 +03:00
parent 48057c25b0
commit 9f81e693ed
11 changed files with 227 additions and 16 deletions

1
Cargo.lock generated
View File

@@ -1782,6 +1782,7 @@ dependencies = [
"async-std",
"async-trait",
"blake3",
"bs58",
"darkfi",
"darkfi-consensus-contract",
"darkfi-contract-test-harness",

View File

@@ -19,6 +19,7 @@ darkfi-serial = {path = "../../src/serial"}
# Misc
blake3 = "1.4.1"
bs58 = "0.5.0"
log = "0.4.19"
sled = "0.34.7"

View File

@@ -24,15 +24,15 @@ use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult};
/// Please sort them sensefully.
pub enum RpcError {
// Transaction-related errors
_TxSimulationFail = -32110,
_TxBroadcastFail = -32111,
TxSimulationFail = -32110,
TxBroadcastFail = -32111,
// State-related errors,
_NotSynced = -32120,
NotSynced = -32120,
UnknownSlot = -32121,
// Parsing errors
_ParseError = -32190,
ParseError = -32190,
// Contract-related errors
ContractZkasDbNotFound = -32200,
@@ -41,13 +41,13 @@ pub enum RpcError {
fn to_tuple(e: RpcError) -> (i64, String) {
let msg = match e {
// Transaction-related errors
RpcError::_TxSimulationFail => "Failed simulating transaction state change",
RpcError::_TxBroadcastFail => "Failed broadcasting transaction",
RpcError::TxSimulationFail => "Failed simulating transaction state change",
RpcError::TxBroadcastFail => "Failed broadcasting transaction",
// State-related errors
RpcError::_NotSynced => "Blockchain is not synced",
RpcError::NotSynced => "Blockchain is not synced",
RpcError::UnknownSlot => "Did not find slot",
// Parsing errors
RpcError::_ParseError => "Parse error",
RpcError::ParseError => "Parse error",
// Contract-related errors
RpcError::ContractZkasDbNotFound => "zkas database not found for given contract",
};

View File

@@ -16,7 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::{stream::StreamExt, sync::Arc};
use async_std::{
stream::StreamExt,
sync::{Arc, Mutex},
};
use log::info;
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
@@ -39,9 +42,10 @@ mod tests;
mod error;
use error::{server_error, RpcError};
/// JSON-RPC requests handler
/// JSON-RPC requests handler and methods
mod rpc;
mod rpc_blockchain;
mod rpc_tx;
/// Utility functions
mod utils;
@@ -91,6 +95,7 @@ pub struct Darkfid {
sync_p2p: P2pPtr,
consensus_p2p: Option<P2pPtr>,
validator: ValidatorPtr,
synced: Mutex<bool>,
}
impl Darkfid {
@@ -99,7 +104,7 @@ impl Darkfid {
consensus_p2p: Option<P2pPtr>,
validator: ValidatorPtr,
) -> Self {
Self { sync_p2p, consensus_p2p, validator }
Self { synced: Mutex::new(false), sync_p2p, consensus_p2p, validator }
}
}
@@ -153,6 +158,9 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
let _ex = ex.clone();
ex.spawn(listen_and_serve(args.rpc_listen, darkfid.clone(), _ex)).detach();
// Simulate that we have synced
*darkfid.synced.lock().await = true;
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new()?;
signals_handler.wait_termination(signals_task).await?;

View File

@@ -67,6 +67,12 @@ impl RequestHandler for Darkfid {
return self.blockchain_lookup_zkas(req.id, params).await
}
// ===================
// Transaction methods
// ===================
Some("tx.simulate") => return self.tx_simulate(req.id, params).await,
Some("tx.broadcast") => return self.tx_broadcast(req.id, params).await,
// ==============
// Invalid method
// ==============

146
bin/darkfid2/src/rpc_tx.rs Normal file
View File

@@ -0,0 +1,146 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi_serial::deserialize;
use log::error;
use serde_json::{json, Value};
use darkfi::{
rpc::jsonrpc::{ErrorCode::InvalidParams, JsonError, JsonResponse, JsonResult},
tx::Transaction,
};
use super::Darkfid;
use crate::{server_error, RpcError};
impl Darkfid {
// RPCAPI:
// Simulate a network state transition with the given transaction.
// Returns `true` if the transaction is valid, otherwise, a corresponding
// error.
//
// --> {"jsonrpc": "2.0", "method": "tx.simulate", "params": ["base58encodedTX"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn tx_simulate(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
if !(*self.synced.lock().await) {
error!("[RPC] tx.simulate: Blockchain is not synced");
return server_error(RpcError::NotSynced, id, None)
}
// Try to deserialize the transaction
let tx_bytes = match bs58::decode(params[0].as_str().unwrap().trim()).into_vec() {
Ok(v) => v,
Err(e) => {
error!("[RPC] tx.simulate: Failed decoding base58 transaction: {}", e);
return server_error(RpcError::ParseError, id, None)
}
};
let tx: Transaction = match deserialize(&tx_bytes) {
Ok(v) => v,
Err(e) => {
error!("[RPC] tx.simulate: Failed deserializing bytes into Transaction: {}", e);
return server_error(RpcError::ParseError, id, None)
}
};
// Simulate state transition
let lock = self.validator.read().await;
let current_slot = lock.consensus.time_keeper.current_slot();
let result = lock.add_transactions(&[tx], current_slot, false).await;
if result.is_err() {
error!(
"[RPC] tx.simulate: Failed to validate state transition: {}",
result.err().unwrap()
);
return server_error(RpcError::TxSimulationFail, id, None)
};
JsonResponse::new(json!(true), id).into()
}
// RPCAPI:
// Broadcast a given transaction to the P2P network.
// The function will first simulate the state transition in order to see
// if the transaction is actually valid, and in turn it will return an
// error if this is the case. Otherwise, a transaction ID will be returned.
//
// --> {"jsonrpc": "2.0", "method": "tx.broadcast", "params": ["base58encodedTX"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "txID...", "id": 1}
pub async fn tx_broadcast(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
if !(*self.synced.lock().await) {
error!("[RPC] tx.transfer: Blockchain is not synced");
return server_error(RpcError::NotSynced, id, None)
}
// Try to deserialize the transaction
let tx_bytes = match bs58::decode(params[0].as_str().unwrap().trim()).into_vec() {
Ok(v) => v,
Err(e) => {
error!("[RPC] tx.broadcast: Failed decoding base58 transaction: {}", e);
return server_error(RpcError::ParseError, id, None)
}
};
let tx: Transaction = match deserialize(&tx_bytes) {
Ok(v) => v,
Err(e) => {
error!("[RPC] tx.broadcast: Failed deserializing bytes into Transaction: {}", e);
return server_error(RpcError::ParseError, id, None)
}
};
if self.consensus_p2p.is_some() {
// Consider we're participating in consensus here?
// The append_tx function performs a state transition check.
if self.validator.write().await.append_tx(tx.clone()).await.is_err() {
error!("[RPC] tx.broadcast: Failed to append transaction to mempool");
return server_error(RpcError::TxSimulationFail, id, None)
}
} else {
// We'll perform the state transition check here.
let lock = self.validator.read().await;
let current_slot = lock.consensus.time_keeper.current_slot();
let result = lock.add_transactions(&[tx.clone()], current_slot, false).await;
if result.is_err() {
error!(
"[RPC] tx.simulate: Failed to validate state transition: {}",
result.err().unwrap()
);
return server_error(RpcError::TxSimulationFail, id, None)
};
}
self.sync_p2p.broadcast(&tx).await;
if self.sync_p2p.channels().lock().await.is_empty() {
error!("[RPC] tx.broadcast: Failed broadcasting tx, no connected channels");
return server_error(RpcError::TxBroadcastFail, id, None)
}
let tx_hash = tx.hash().to_string();
JsonResponse::new(json!(tx_hash), id).into()
}
}

View File

@@ -92,8 +92,8 @@ impl Harness {
pub async fn validate_chains(&self) -> Result<()> {
let genesis_txs_total = self.config.alice_initial + self.config.bob_initial;
let alice = &self.alice._validator.read().await;
let bob = &self.bob._validator.read().await;
let alice = &self.alice.validator.read().await;
let bob = &self.bob.validator.read().await;
alice.validate_blockchain(genesis_txs_total, vec![]).await?;
bob.validate_blockchain(genesis_txs_total, vec![]).await?;
@@ -104,8 +104,8 @@ impl Harness {
}
pub async fn add_blocks(&self, blocks: &[BlockInfo]) -> Result<()> {
let alice = &self.alice._validator.read().await;
let bob = &self.bob._validator.read().await;
let alice = &self.alice.validator.read().await;
let bob = &self.bob.validator.read().await;
alice.add_blocks(blocks).await?;
bob.add_blocks(blocks).await?;

View File

@@ -31,7 +31,7 @@ async fn add_blocks() -> Result<()> {
let th = Harness::new(config).await?;
// Retrieve genesis block
let previous = th.alice._validator.read().await.blockchain.last_block()?;
let previous = th.alice.validator.read().await.blockchain.last_block()?;
// Generate next block
let block1 = th.generate_next_block(&previous, 1).await?;

View File

@@ -496,6 +496,9 @@ impl Error {
/// Transaction verification errors
#[derive(Debug, Clone, thiserror::Error)]
pub enum TxVerifyFailed {
#[error("Transaction {0} already exists")]
AlreadySeenTx(String),
#[error("Invalid transaction signature")]
InvalidSignature,

View File

@@ -50,6 +50,16 @@ impl TimeKeeper {
Self { genesis_ts, epoch_length, slot_time, verifying_slot }
}
/// Generate a Timekeeper for current slot
pub fn current(&self) -> Self {
Self {
genesis_ts: self.genesis_ts,
epoch_length: self.epoch_length,
slot_time: self.slot_time,
verifying_slot: self.current_slot(),
}
}
/// Calculates current epoch.
pub fn current_epoch(&self) -> u64 {
self.slot_epoch(self.current_slot())

View File

@@ -18,6 +18,7 @@
use async_std::sync::{Arc, RwLock};
use darkfi_sdk::{blockchain::Slot, crypto::PublicKey};
use darkfi_serial::serialize;
use log::{debug, error, info, warn};
use crate::{
@@ -119,6 +120,41 @@ impl Validator {
Ok(state)
}
/// The node retrieves a transaction, validates its state transition,
/// and appends it to the pending txs store.
pub async fn append_tx(&mut self, tx: Transaction) -> Result<()> {
let tx_hash = blake3::hash(&serialize(&tx));
// Check if we have already seen this tx
let tx_in_txstore = self.blockchain.transactions.contains(&tx_hash)?;
let tx_in_pending_txs_store = self.blockchain.pending_txs.contains(&tx_hash)?;
if tx_in_txstore || tx_in_pending_txs_store {
info!(target: "validator", "append_tx(): We have already seen this tx");
return Err(TxVerifyFailed::AlreadySeenTx(tx_hash.to_string()).into())
}
// Verify state transition
info!(target: "validator", "append_tx(): Starting state transition validation");
// TODO: this should be over all forks overlays
let overlay = BlockchainOverlay::new(&self.blockchain)?;
// Generate a time keeper for current slot
let time_keeper = self.consensus.time_keeper.current();
// Verify transaction
let erroneous_txs = verify_transactions(&overlay, &time_keeper, &[tx.clone()]).await?;
if !erroneous_txs.is_empty() {
return Err(TxVerifyFailed::ErroneousTxs(erroneous_txs).into())
}
// Add transaction to pending txs store
self.blockchain.add_pending_txs(&[tx])?;
info!(target: "validator", "append_tx(): Appended tx to pending txs store");
Ok(())
}
// ==========================
// State transition functions
// ==========================