diff --git a/script/research/blockchain-explorer/Cargo.toml b/script/research/blockchain-explorer/Cargo.toml index f0a6ecf4a..945c3cd95 100644 --- a/script/research/blockchain-explorer/Cargo.toml +++ b/script/research/blockchain-explorer/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "blockchain-explorer" version = "0.4.1" -description = "Command-line client to display statistics or export blockchain sled database contents" +description = "Daemon to listen for new blocks from darkfid and store them in an easily accessible format for further usage." authors = ["Dyne.org foundation "] repository = "https://github.com/darkrenaissance/darkfi" license = "AGPL-3.0-only" @@ -10,17 +10,33 @@ edition = "2021" [workspace] [dependencies] -blake3 = "1.5.1" -clap = {version = "4.4.11", features = ["derive"]} -darkfi = {path = "../../../", features = ["blockchain"]} +# Darkfi +darkfi = {path = "../../../", features = ["async-daemonize", "validator", "rusqlite"]} darkfi-sdk = {path = "../../../src/sdk"} -num-bigint = "0.4.5" -sled = "0.34.7" +darkfi-serial = {path = "../../../src/serial"} +drk = {path = "../../../bin/drk"} + +# Misc +log = "0.4.21" +rusqlite = {version = "0.31.0", features = ["sqlcipher"]} + +# JSON-RPC +async-trait = "0.1.80" +tinyjson = "2.5.1" +url = "2.5.0" + +# Daemon +easy-parallel = "3.3.1" +signal-hook-async-std = "0.2.2" +signal-hook = "0.3.17" +simplelog = "0.12.2" +smol = "2.0.0" + +# Argument parsing +serde = {version = "1.0.203", features = ["derive"]} +structopt = "0.3.26" +structopt-toml = "0.5.1" [patch.crates-io] halo2_proofs = {git="https://github.com/parazyd/halo2", branch="v4"} halo2_gadgets = {git="https://github.com/parazyd/halo2", branch="v4"} - -[lints] -workspace = true - diff --git a/script/research/blockchain-explorer/blockchain_explorer_config.toml b/script/research/blockchain-explorer/blockchain_explorer_config.toml new file mode 100644 index 000000000..bc3093efe --- /dev/null +++ b/script/research/blockchain-explorer/blockchain_explorer_config.toml @@ -0,0 +1,19 @@ +## blockchain-explorer configuration file +## +## Please make sure you go through all the settings so you can configure +## your daemon properly. +## +## The default values are left commented. They can be overridden either by +## uncommenting, or by using the command-line. + +# JSON-RPC listen URL +rpc_listen = "tcp://127.0.0.1:14567" + +# Path to daemon database +db_path = "~/.local/darkfi/blockchain-explorer/daemon.db" + +# Password for the daemon database +db_pass = "changeme" + +# darkfid JSON-RPC endpoint +endpoint = "tcp://127.0.0.1:8340" diff --git a/script/research/blockchain-explorer/blocks.sql b/script/research/blockchain-explorer/blocks.sql new file mode 100644 index 000000000..4bbd82b77 --- /dev/null +++ b/script/research/blockchain-explorer/blocks.sql @@ -0,0 +1,21 @@ +-- Database block table definition. +-- We store data in a usable format. +CREATE TABLE IF NOT EXISTS blocks ( + -- Header hash identifier of the block + header_hash TEXT PRIMARY KEY NOT NULL, + -- Block version + version INTEGER NOT NULL, + -- Previous block hash + previous TEXT NOT NULL, + -- Block height + height INTEGER NOT NULL, + -- Block creation timestamp + timestamp INTEGER NOT NULL, + -- The block's nonce. This value changes arbitrarily with mining. + nonce INTEGER NOT NULL, + -- Merkle tree root of the transactions hashes contained in this block + root TEXT NOT NULL, + -- Block producer signature + signature BLOB NOT NULL +); + diff --git a/script/research/blockchain-explorer/src/blocks.rs b/script/research/blockchain-explorer/src/blocks.rs new file mode 100644 index 000000000..66d61ab30 --- /dev/null +++ b/script/research/blockchain-explorer/src/blocks.rs @@ -0,0 +1,254 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use log::info; +use rusqlite::types::Value; + +use darkfi::{Error, Result}; +use darkfi_sdk::crypto::schnorr::Signature; +use darkfi_serial::{deserialize_async, serialize}; +use drk::{ + convert_named_params, + error::{WalletDbError, WalletDbResult}, +}; + +use crate::BlockchainExplorer; + +// Dtabase SQL table constant names. These have to represent the `block.sql` +// SQL schema. +pub const BLOCKS_TABLE: &str = "blocks"; + +// BLOCKS_TABLE +pub const BLOCKS_COL_HEADER_HASH: &str = "header_hash"; +pub const BLOCKS_COL_VERSION: &str = "version"; +pub const BLOCKS_COL_PREVIOUS: &str = "previous"; +pub const BLOCKS_COL_HEIGHT: &str = "height"; +pub const BLOCKS_COL_TIMESTAMP: &str = "timestamp"; +pub const BLOCKS_COL_NONCE: &str = "nonce"; +pub const BLOCKS_COL_ROOT: &str = "root"; +pub const BLOCKS_COL_SIGNATURE: &str = "signature"; + +#[derive(Debug, Clone)] +/// Structure representing a `BLOCKS_TABLE` record. +pub struct BlockRecord { + /// Header hash identifier of the block + pub header_hash: String, + /// Block version + pub version: u8, + /// Previous block hash + pub previous: String, + /// Block height + pub height: u32, + /// Block creation timestamp + pub timestamp: u64, + /// The block's nonce. This value changes arbitrarily with mining. + pub nonce: u64, + /// Merkle tree root of the transactions hashes contained in this block + pub root: String, + /// Block producer signature + pub signature: Signature, +} + +impl BlockchainExplorer { + /// Initialize database with blocks tables. + pub async fn initialize_blocks(&self) -> WalletDbResult<()> { + // Initialize blocks database schema + let database_schema = include_str!("../blocks.sql"); + self.database.exec_batch_sql(database_schema)?; + + Ok(()) + } + + /// Reset blocks table in the database. + pub fn reset_blocks(&self) -> WalletDbResult<()> { + info!(target: "blockchain-explorer::blocks::reset_blocks", "Resetting blocks..."); + let query = format!("DELETE FROM {};", BLOCKS_TABLE); + self.database.exec_sql(&query, &[]) + } + + /// Import given block into the database. + pub async fn put_block(&self, block: &BlockRecord) -> Result<()> { + let query = format!( + "INSERT OR REPLACE INTO {} ({}, {}, {}, {}, {}, {}, {}, {}) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);", + BLOCKS_TABLE, + BLOCKS_COL_HEADER_HASH, + BLOCKS_COL_VERSION, + BLOCKS_COL_PREVIOUS, + BLOCKS_COL_HEIGHT, + BLOCKS_COL_TIMESTAMP, + BLOCKS_COL_NONCE, + BLOCKS_COL_ROOT, + BLOCKS_COL_SIGNATURE + ); + + if let Err(e) = self.database.exec_sql( + &query, + rusqlite::params![ + block.header_hash, + block.version, + block.previous, + block.height, + block.timestamp, + block.nonce, + block.root, + serialize(&block.signature), + ], + ) { + return Err(Error::RusqliteError(format!("[put_block] Block insert failed: {e:?}"))) + }; + + Ok(()) + } + + /// Auxiliary function to parse a `BLOCKS_TABLE` record. + async fn parse_block_record(&self, row: &[Value]) -> Result { + let Value::Text(ref header_hash) = row[0] else { + return Err(Error::ParseFailed("[parse_block_record] Header hash parsing failed")) + }; + let header_hash = header_hash.clone(); + + let Value::Integer(version) = row[1] else { + return Err(Error::ParseFailed("[parse_block_record] Version parsing failed")) + }; + let Ok(version) = u8::try_from(version) else { + return Err(Error::ParseFailed("[parse_block_record] Version parsing failed")) + }; + + let Value::Text(ref previous) = row[2] else { + return Err(Error::ParseFailed("[parse_block_record] Previous parsing failed")) + }; + let previous = previous.clone(); + + let Value::Integer(height) = row[3] else { + return Err(Error::ParseFailed("[parse_block_record] Height parsing failed")) + }; + let Ok(height) = u32::try_from(height) else { + return Err(Error::ParseFailed("[parse_block_record] Height parsing failed")) + }; + + let Value::Integer(timestamp) = row[4] else { + return Err(Error::ParseFailed("[parse_block_record] Timestamp parsing failed")) + }; + let Ok(timestamp) = u64::try_from(timestamp) else { + return Err(Error::ParseFailed("[parse_block_record] Timestamp parsing failed")) + }; + + let Value::Integer(nonce) = row[5] else { + return Err(Error::ParseFailed("[parse_block_record] Nonce parsing failed")) + }; + let Ok(nonce) = u64::try_from(nonce) else { + return Err(Error::ParseFailed("[parse_block_record] Nonce parsing failed")) + }; + + let Value::Text(ref root) = row[6] else { + return Err(Error::ParseFailed("[parse_block_record] Root parsing failed")) + }; + let root = root.clone(); + + let Value::Blob(ref signature_bytes) = row[7] else { + return Err(Error::ParseFailed( + "[parse_block_record] Signature bytes bytes parsing failed", + )) + }; + let signature = deserialize_async(signature_bytes).await?; + + Ok(BlockRecord { + header_hash, + version, + previous, + height, + timestamp, + nonce, + root, + signature, + }) + } + + /// Fetch all known blocks from the database. + pub async fn get_blocks(&self) -> Result> { + let rows = match self.database.query_multiple(BLOCKS_TABLE, &[], &[]) { + Ok(r) => r, + Err(e) => { + return Err(Error::RusqliteError(format!( + "[get_blocks] Blocks retrieval failed: {e:?}" + ))) + } + }; + + let mut blocks = Vec::with_capacity(rows.len()); + for row in rows { + blocks.push(self.parse_block_record(&row).await?); + } + + Ok(blocks) + } + + /// Fetch a block given its header hash. + pub async fn get_block_by_hash(&self, header_hash: &str) -> Result { + let row = match self.database.query_single( + BLOCKS_TABLE, + &[], + convert_named_params! {(BLOCKS_COL_HEADER_HASH, header_hash)}, + ) { + Ok(r) => r, + Err(e) => { + return Err(Error::RusqliteError(format!( + "[get_block_by_hash] Block retrieval failed: {e:?}" + ))) + } + }; + + self.parse_block_record(&row).await + } + + /// Fetch last block from the database. + pub async fn last_block(&self) -> WalletDbResult { + // First we prepare the query + let query = format!( + "SELECT {} FROM {} ORDER BY {} DESC LIMIT 1;", + BLOCKS_COL_HEIGHT, BLOCKS_TABLE, BLOCKS_COL_HEIGHT + ); + let Ok(conn) = self.database.conn.lock() else { + return Err(WalletDbError::FailedToAquireLock) + }; + let Ok(mut stmt) = conn.prepare(&query) else { + return Err(WalletDbError::QueryPreparationFailed) + }; + + // Execute the query using provided params + let Ok(mut rows) = stmt.query([]) else { return Err(WalletDbError::QueryExecutionFailed) }; + + // Check if row exists + let Ok(next) = rows.next() else { return Err(WalletDbError::QueryExecutionFailed) }; + let row = match next { + Some(row_result) => row_result, + None => return Ok(0_u32), + }; + + // Parse returned value + let Ok(value) = row.get(0) else { return Err(WalletDbError::ParseColumnValueError) }; + let Value::Integer(height) = value else { + return Err(WalletDbError::ParseColumnValueError) + }; + let Ok(height) = u32::try_from(height) else { + return Err(WalletDbError::ParseColumnValueError) + }; + + Ok(height) + } +} diff --git a/script/research/blockchain-explorer/src/error.rs b/script/research/blockchain-explorer/src/error.rs new file mode 100644 index 000000000..9cfd198ec --- /dev/null +++ b/script/research/blockchain-explorer/src/error.rs @@ -0,0 +1,45 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult}; + +/// Custom RPC errors available for blockchain explorer. +/// Please sort them sensefully. +pub enum RpcError { + // Misc errors + PingFailed = -32300, +} + +fn to_tuple(e: RpcError) -> (i32, String) { + let msg = match e { + // Misc errors + RpcError::PingFailed => "Darkfid daemon ping error", + }; + + (e as i32, msg.to_string()) +} + +pub fn server_error(e: RpcError, id: u16, msg: Option<&str>) -> JsonResult { + let (code, default_msg) = to_tuple(e); + + if let Some(message) = msg { + return JsonError::new(ServerError(code), Some(message.to_string()), id).into() + } + + JsonError::new(ServerError(code), Some(default_msg), id).into() +} diff --git a/script/research/blockchain-explorer/src/main.rs b/script/research/blockchain-explorer/src/main.rs index e7de1a486..6c7ebd078 100644 --- a/script/research/blockchain-explorer/src/main.rs +++ b/script/research/blockchain-explorer/src/main.rs @@ -16,451 +16,218 @@ * along with this program. If not, see . */ -use std::{fs::File, io::Write}; +use std::{ + collections::HashSet, + fs, + io::{stdin, stdout, Write}, + sync::Arc, +}; + +use log::{error, info}; +use smol::{lock::Mutex, stream::StreamExt}; +use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; +use url::Url; -use clap::Parser; use darkfi::{ - blockchain::{ - block_store::{Block, BlockDifficulty, BlockRanks, BlockStore}, - contract_store::ContractStore, - header_store::{Header, HeaderHash, HeaderStore}, - tx_store::TxStore, - Blockchain, + async_daemonize, cli_desc, + rpc::{ + client::RpcClient, + server::{listen_and_serve, RequestHandler}, }, - cli_desc, - tx::Transaction, - util::{path::expand_path, time::Timestamp}, - Result, + system::{StoppableTask, StoppableTaskPtr}, + util::path::expand_path, + Error, Result, }; -use darkfi_sdk::{ - blockchain::block_epoch, - crypto::{ContractId, MerkleNode}, - tx::TransactionHash, -}; -use num_bigint::BigUint; +use drk::walletdb::{WalletDb, WalletPtr}; -#[derive(Parser)] -#[command(about = cli_desc!())] +/// Crate errors +mod error; + +/// JSON-RPC requests handler and methods +mod rpc; +use rpc::subscribe_blocks; + +/// Database functionality related to blocks +mod blocks; + +const CONFIG_FILE: &str = "blockchain_explorer_config.toml"; +const CONFIG_FILE_CONTENTS: &str = include_str!("../blockchain_explorer_config.toml"); + +#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[serde(default)] +#[structopt(name = "blockcahin-explorer", about = cli_desc!())] struct Args { - #[arg(short, long, default_value = "../../../contrib/localnet/darkfid-single-node/")] - /// Path containing the node folders - path: String, + #[structopt(short, long)] + /// Configuration file to use + config: Option, - #[arg(short, long, default_values = ["darkfid"])] - /// Node folder name (supports multiple values) - node: Vec, + #[structopt(short, long, default_value = "tcp://127.0.0.1:14567")] + /// JSON-RPC listen URL + rpc_listen: Url, - #[arg(short, long, default_value = "")] - /// Node blockchain folder - blockchain: String, + #[structopt(long, default_value = "~/.local/darkfi/blockchain-explorer/daemon.db")] + /// Path to daemon database + db_path: String, - #[arg(short, long)] - /// Export all contents into a JSON file - export: bool, + #[structopt(long)] + /// Password for the daemon database. + /// If it's not present, daemon will prompt the user for it. + db_pass: Option, + + #[structopt(long)] + /// Reset the databae and start syncing from first block + reset: bool, + + #[structopt(short, long, default_value = "tcp://127.0.0.1:8340")] + /// darkfid JSON-RPC endpoint + endpoint: Url, + + #[structopt(short, long)] + /// Set log file to ouput into + log: Option, + + #[structopt(short, parse(from_occurrences))] + /// Increase verbosity (-vvv supported) + verbose: u8, } -#[derive(Debug)] -struct HeaderInfo { - _hash: HeaderHash, - _version: u8, - _previous: HeaderHash, - _height: u32, - _timestamp: Timestamp, - _nonce: u64, - _root: MerkleNode, +/// Daemon structure +pub struct BlockchainExplorer { + /// Daemon database operations handler + pub database: WalletPtr, + /// JSON-RPC connection tracker + pub rpc_connections: Mutex>, + /// JSON-RPC client to execute requests to darkfid daemon + pub rpc_client: RpcClient, } -impl HeaderInfo { - pub fn new(_hash: HeaderHash, header: &Header) -> HeaderInfo { - HeaderInfo { - _hash, - _version: header.version, - _previous: header.previous, - _height: header.height, - _timestamp: header.timestamp, - _nonce: header.nonce, - _root: header.root, - } - } -} - -#[derive(Debug)] -struct HeaderStoreInfo { - _main: Vec, - _sync: Vec, -} - -impl HeaderStoreInfo { - pub fn new(headerstore: &HeaderStore) -> HeaderStoreInfo { - let mut _main = Vec::new(); - let result = headerstore.get_all(); - match result { - Ok(iter) => { - for (hash, header) in iter.iter() { - _main.push(HeaderInfo::new(*hash, header)); +impl BlockchainExplorer { + async fn new( + db_path: String, + db_pass: Option, + endpoint: Url, + ex: Arc>, + ) -> Result { + // Grab password + let db_pass = match db_pass { + Some(pass) => pass, + None => { + let mut pass = String::new(); + while pass.trim().is_empty() { + info!(target: "blockchain-explorer", "Provide database passsword:"); + stdout().flush()?; + stdin().read_line(&mut pass).unwrap_or(0); } + pass.trim().to_string() } - Err(e) => println!("Error: {:?}", e), + }; + + // Script kiddies protection + if db_pass == "changeme" { + error!(target: "blockchain-explorer", "Please don't use default database password..."); + return Err(Error::ParseFailed("Default database password usage")) } - let mut _sync = Vec::new(); - let result = headerstore.get_all_sync(); - match result { - Ok(iter) => { - for (_, header) in iter.iter() { - _sync.push(HeaderInfo::new(header.hash(), header)); - } + + // Initialize database + let db_path = expand_path(&db_path)?; + if !db_path.exists() { + if let Some(parent) = db_path.parent() { + fs::create_dir_all(parent)?; } - Err(e) => println!("Error: {:?}", e), } - HeaderStoreInfo { _main, _sync } - } -} - -#[derive(Debug)] -struct BlockInfo { - _hash: HeaderHash, - _header: HeaderHash, - _txs: Vec, - _signature: String, -} - -impl BlockInfo { - pub fn new(_hash: HeaderHash, block: &Block) -> BlockInfo { - BlockInfo { - _hash, - _header: block.header, - _txs: block.txs.clone(), - _signature: format!("{:?}", block.signature), - } - } -} - -#[derive(Debug)] -struct OrderInfo { - _height: u32, - _hash: HeaderHash, -} - -impl OrderInfo { - pub fn new(_height: u32, _hash: HeaderHash) -> OrderInfo { - OrderInfo { _height, _hash } - } -} - -#[derive(Debug)] -struct BlockRanksInfo { - _target_rank: BigUint, - _targets_rank: BigUint, - _hash_rank: BigUint, - _hashes_rank: BigUint, -} - -impl BlockRanksInfo { - pub fn new(ranks: &BlockRanks) -> BlockRanksInfo { - BlockRanksInfo { - _target_rank: ranks.target_rank.clone(), - _targets_rank: ranks.targets_rank.clone(), - _hash_rank: ranks.hash_rank.clone(), - _hashes_rank: ranks.hashes_rank.clone(), - } - } -} - -#[derive(Debug)] -struct BlockDifficultyInfo { - _height: u32, - _timestamp: Timestamp, - _difficulty: BigUint, - _cummulative_difficulty: BigUint, - _ranks: BlockRanksInfo, -} - -impl BlockDifficultyInfo { - pub fn new(difficulty: &BlockDifficulty) -> BlockDifficultyInfo { - BlockDifficultyInfo { - _height: difficulty.height, - _timestamp: difficulty.timestamp, - _difficulty: difficulty.difficulty.clone(), - _cummulative_difficulty: difficulty.cummulative_difficulty.clone(), - _ranks: BlockRanksInfo::new(&difficulty.ranks), - } - } -} - -#[derive(Debug)] -struct BlockStoreInfo { - _main: Vec, - _order: Vec, - _difficulty: Vec, -} - -impl BlockStoreInfo { - pub fn new(blockstore: &BlockStore) -> BlockStoreInfo { - let mut _main = Vec::new(); - let result = blockstore.get_all(); - match result { - Ok(iter) => { - for (hash, block) in iter.iter() { - _main.push(BlockInfo::new(*hash, block)); - } + let database = match WalletDb::new(Some(db_path), Some(&db_pass)) { + Ok(w) => w, + Err(e) => { + let err = format!("{e:?}"); + error!(target: "blockchain-explorer", "Error initializing database: {err}"); + return Err(Error::RusqliteError(err)) } - Err(e) => println!("Error: {:?}", e), + }; + + // Initialize rpc client + let rpc_client = RpcClient::new(endpoint, ex).await?; + + let explorer = Self { database, rpc_connections: Mutex::new(HashSet::new()), rpc_client }; + + // Initialize all the database tables + if let Err(e) = explorer.initialize_blocks().await { + let err = format!("{e:?}"); + error!(target: "blockchain-explorer", "Error initializing database tables: {err}"); + return Err(Error::RusqliteError(err)) } - let mut _order = Vec::new(); - let result = blockstore.get_all_order(); - match result { - Ok(iter) => { - for (height, hash) in iter.iter() { - _order.push(OrderInfo::new(*height, *hash)); - } + // TODO: map transaction structure to their corresponding files with sql table and retrieval methods + + Ok(explorer) + } +} + +async_daemonize!(realmain); +async fn realmain(args: Args, ex: Arc>) -> Result<()> { + info!(target: "blockchain-explorer", "Initializing DarkFi blockchain explorer node..."); + let explorer = + BlockchainExplorer::new(args.db_path, args.db_pass, args.endpoint.clone(), ex.clone()) + .await?; + let explorer = Arc::new(explorer); + info!(target: "blockchain-explorer", "Node initialized successfully!"); + + // JSON-RPC server + info!(target: "blockchain-explorer", "Starting JSON-RPC server"); + // Here we create a task variable so we can manually close the + // task later. + let rpc_task = StoppableTask::new(); + let explorer_ = explorer.clone(); + rpc_task.clone().start( + listen_and_serve(args.rpc_listen, explorer.clone(), None, ex.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::RpcServerStopped) => explorer_.stop_connections().await, + Err(e) => error!(target: "blockchain-explorer", "Failed starting sync JSON-RPC server: {}", e), } - Err(e) => println!("Error: {:?}", e), - } - let mut _difficulty = Vec::new(); - let result = blockstore.get_all_difficulty(); - match result { - Ok(iter) => { - for (_, difficulty) in iter.iter() { - _difficulty.push(BlockDifficultyInfo::new(difficulty)); - } - } - Err(e) => println!("Error: {:?}", e), - } - BlockStoreInfo { _main, _order, _difficulty } + }, + Error::RpcServerStopped, + ex.clone(), + ); + + // Sync blocks + info!(target: "blockchain-explorer", "Syncing blocks from darkfid..."); + if let Err(e) = explorer.sync_blocks(args.reset).await { + let err = format!("{e:?}"); + error!(target: "blockchain-explorer", "Error syncing blocks: {err}"); + return Err(Error::RusqliteError(err)) } -} -#[derive(Debug)] -struct TxInfo { - _hash: TransactionHash, - _payload: Transaction, -} - -impl TxInfo { - pub fn new(_hash: TransactionHash, tx: &Transaction) -> TxInfo { - TxInfo { _hash, _payload: tx.clone() } - } -} - -#[derive(Debug)] -struct TxLocationInfo { - _hash: TransactionHash, - _block_height: u32, - _index: u16, -} - -impl TxLocationInfo { - pub fn new(_hash: TransactionHash, _block_height: u32, _index: u16) -> TxLocationInfo { - TxLocationInfo { _hash, _block_height, _index } - } -} - -#[derive(Debug)] -struct PendingOrderInfo { - _order: u64, - _hash: TransactionHash, -} - -impl PendingOrderInfo { - pub fn new(_order: u64, _hash: TransactionHash) -> PendingOrderInfo { - PendingOrderInfo { _order, _hash } - } -} - -#[derive(Debug)] -struct TxStoreInfo { - _main: Vec, - _location: Vec, - _pending: Vec, - _pending_order: Vec, -} - -impl TxStoreInfo { - pub fn new(txstore: &TxStore) -> TxStoreInfo { - let mut _main = Vec::new(); - let result = txstore.get_all(); - match result { - Ok(iter) => { - for (hash, tx) in iter.iter() { - _main.push(TxInfo::new(*hash, tx)); - } - } - Err(e) => println!("Error: {:?}", e), + info!(target: "blockchain-explorer", "Subscribing to new blocks..."); + let (subscriber_task, listener_task) = match subscribe_blocks( + explorer.clone(), + args.endpoint, + ex.clone(), + ) + .await + { + Ok(pair) => pair, + Err(e) => { + let err = format!("{e:?}"); + error!(target: "blockchain-explorer", "Error while setting up blocks subscriber: {err}"); + return Err(Error::RusqliteError(err)) } - let mut _location = Vec::new(); - let result = txstore.get_all_location(); - match result { - Ok(iter) => { - for (hash, location) in iter.iter() { - _location.push(TxLocationInfo::new(*hash, location.0, location.1)); - } - } - Err(e) => println!("Error: {:?}", e), - } - let mut _pending = Vec::new(); - let result = txstore.get_all_pending(); - match result { - Ok(iter) => { - for (hash, tx) in iter.iter() { - _pending.push(TxInfo::new(*hash, tx)); - } - } - Err(e) => println!("Error: {:?}", e), - } - let mut _pending_order = Vec::new(); - let result = txstore.get_all_pending_order(); - match result { - Ok(iter) => { - for (order, hash) in iter.iter() { - _pending_order.push(PendingOrderInfo::new(*order, *hash)); - } - } - Err(e) => println!("Error: {:?}", e), - } - TxStoreInfo { _main, _location, _pending, _pending_order } - } -} + }; -#[derive(Debug)] -struct ContractStateInfo { - _id: ContractId, - _state_hashes: Vec, -} + // Signal handling for graceful termination. + let (signals_handler, signals_task) = SignalHandler::new(ex)?; + signals_handler.wait_termination(signals_task).await?; + info!(target: "blockchain-explorer", "Caught termination signal, cleaning up and exiting..."); -impl ContractStateInfo { - pub fn new(_id: ContractId, state_hashes: &[blake3::Hash]) -> ContractStateInfo { - ContractStateInfo { _id, _state_hashes: state_hashes.to_vec() } - } -} + info!(target: "blockchain-explorer", "Stopping JSON-RPC server..."); + rpc_task.stop().await; -#[derive(Debug)] -struct WasmInfo { - _id: ContractId, - _bincode_hash: blake3::Hash, -} + info!(target: "blockchain-explorer", "Stopping darkfid listener..."); + listener_task.stop().await; -impl WasmInfo { - pub fn new(_id: ContractId, bincode: &[u8]) -> WasmInfo { - let _bincode_hash = blake3::hash(bincode); - WasmInfo { _id, _bincode_hash } - } -} + info!(target: "blockchain-explorer", "Stopping darkfid subscriber..."); + subscriber_task.stop().await; -#[derive(Debug)] -struct ContractStoreInfo { - _state: Vec, - _wasm: Vec, -} - -impl ContractStoreInfo { - pub fn new(contractsstore: &ContractStore) -> ContractStoreInfo { - let mut _state = Vec::new(); - let result = contractsstore.get_all_states(); - match result { - Ok(iter) => { - for (id, state_hash) in iter.iter() { - _state.push(ContractStateInfo::new(*id, state_hash)); - } - } - Err(e) => println!("Error: {:?}", e), - } - let mut _wasm = Vec::new(); - let result = contractsstore.get_all_wasm(); - match result { - Ok(iter) => { - for (id, bincode) in iter.iter() { - _wasm.push(WasmInfo::new(*id, bincode)); - } - } - Err(e) => println!("Error: {:?}", e), - } - ContractStoreInfo { _state, _wasm } - } -} -#[derive(Debug)] -struct BlockchainInfo { - _headers: HeaderStoreInfo, - _blocks: BlockStoreInfo, - _transactions: TxStoreInfo, - _contracts: ContractStoreInfo, -} - -impl BlockchainInfo { - pub fn new(blockchain: &Blockchain) -> BlockchainInfo { - BlockchainInfo { - _headers: HeaderStoreInfo::new(&blockchain.headers), - _blocks: BlockStoreInfo::new(&blockchain.blocks), - _transactions: TxStoreInfo::new(&blockchain.transactions), - _contracts: ContractStoreInfo::new(&blockchain.contracts), - } - } -} - -fn statistics(folder: &str, node: &str, blockchain: &str) -> Result<()> { - println!("Retrieving blockchain statistics for {node}..."); - - // Node folder - let folder = folder.to_owned() + node; - - // Initialize or load sled database - let path = folder.to_owned() + blockchain; - let db_path = expand_path(&path).unwrap(); - let sled_db = sled::open(db_path)?; - - // Retrieve statistics - let blockchain = Blockchain::new(&sled_db)?; - let (height, block) = blockchain.last()?; - let epoch = block_epoch(height); - let blocks = blockchain.len(); - let txs = blockchain.txs_len(); - drop(sled_db); - - // Print statistics - println!("Latest height: {height}"); - println!("Epoch: {epoch}"); - println!("Latest block: {block}"); - println!("Total blocks: {blocks}"); - println!("Total transactions: {txs}"); - - Ok(()) -} - -fn export(folder: &str, node: &str, blockchain: &str) -> Result<()> { - println!("Exporting data for {node}..."); - - // Node folder - let folder = folder.to_owned() + node; - - // Initialize or load sled database - let path = folder.to_owned() + blockchain; - let db_path = expand_path(&path).unwrap(); - let sled_db = sled::open(db_path)?; - - // Data export - let blockchain = Blockchain::new(&sled_db)?; - let info = BlockchainInfo::new(&blockchain); - let info_string = format!("{:#?}", info); - let file_name = node.to_owned() + "_db"; - let mut file = File::create(file_name.clone())?; - file.write_all(info_string.as_bytes())?; - drop(sled_db); - println!("Data exported to file: {file_name}"); - - Ok(()) -} - -fn main() -> Result<()> { - // Parse arguments - let args = Args::parse(); - println!("Node folder path: {}", args.path); - // Export data for each node - for node in args.node { - if args.export { - export(&args.path, &node, &args.blockchain)?; - continue - } - statistics(&args.path, &node, &args.blockchain)?; - } + info!(target: "blockchain-explorer", "Stopping JSON-RPC client..."); + explorer.rpc_client.stop().await; Ok(()) } diff --git a/script/research/blockchain-explorer/src/main_old.rs b/script/research/blockchain-explorer/src/main_old.rs new file mode 100644 index 000000000..e7de1a486 --- /dev/null +++ b/script/research/blockchain-explorer/src/main_old.rs @@ -0,0 +1,466 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::{fs::File, io::Write}; + +use clap::Parser; +use darkfi::{ + blockchain::{ + block_store::{Block, BlockDifficulty, BlockRanks, BlockStore}, + contract_store::ContractStore, + header_store::{Header, HeaderHash, HeaderStore}, + tx_store::TxStore, + Blockchain, + }, + cli_desc, + tx::Transaction, + util::{path::expand_path, time::Timestamp}, + Result, +}; +use darkfi_sdk::{ + blockchain::block_epoch, + crypto::{ContractId, MerkleNode}, + tx::TransactionHash, +}; +use num_bigint::BigUint; + +#[derive(Parser)] +#[command(about = cli_desc!())] +struct Args { + #[arg(short, long, default_value = "../../../contrib/localnet/darkfid-single-node/")] + /// Path containing the node folders + path: String, + + #[arg(short, long, default_values = ["darkfid"])] + /// Node folder name (supports multiple values) + node: Vec, + + #[arg(short, long, default_value = "")] + /// Node blockchain folder + blockchain: String, + + #[arg(short, long)] + /// Export all contents into a JSON file + export: bool, +} + +#[derive(Debug)] +struct HeaderInfo { + _hash: HeaderHash, + _version: u8, + _previous: HeaderHash, + _height: u32, + _timestamp: Timestamp, + _nonce: u64, + _root: MerkleNode, +} + +impl HeaderInfo { + pub fn new(_hash: HeaderHash, header: &Header) -> HeaderInfo { + HeaderInfo { + _hash, + _version: header.version, + _previous: header.previous, + _height: header.height, + _timestamp: header.timestamp, + _nonce: header.nonce, + _root: header.root, + } + } +} + +#[derive(Debug)] +struct HeaderStoreInfo { + _main: Vec, + _sync: Vec, +} + +impl HeaderStoreInfo { + pub fn new(headerstore: &HeaderStore) -> HeaderStoreInfo { + let mut _main = Vec::new(); + let result = headerstore.get_all(); + match result { + Ok(iter) => { + for (hash, header) in iter.iter() { + _main.push(HeaderInfo::new(*hash, header)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _sync = Vec::new(); + let result = headerstore.get_all_sync(); + match result { + Ok(iter) => { + for (_, header) in iter.iter() { + _sync.push(HeaderInfo::new(header.hash(), header)); + } + } + Err(e) => println!("Error: {:?}", e), + } + HeaderStoreInfo { _main, _sync } + } +} + +#[derive(Debug)] +struct BlockInfo { + _hash: HeaderHash, + _header: HeaderHash, + _txs: Vec, + _signature: String, +} + +impl BlockInfo { + pub fn new(_hash: HeaderHash, block: &Block) -> BlockInfo { + BlockInfo { + _hash, + _header: block.header, + _txs: block.txs.clone(), + _signature: format!("{:?}", block.signature), + } + } +} + +#[derive(Debug)] +struct OrderInfo { + _height: u32, + _hash: HeaderHash, +} + +impl OrderInfo { + pub fn new(_height: u32, _hash: HeaderHash) -> OrderInfo { + OrderInfo { _height, _hash } + } +} + +#[derive(Debug)] +struct BlockRanksInfo { + _target_rank: BigUint, + _targets_rank: BigUint, + _hash_rank: BigUint, + _hashes_rank: BigUint, +} + +impl BlockRanksInfo { + pub fn new(ranks: &BlockRanks) -> BlockRanksInfo { + BlockRanksInfo { + _target_rank: ranks.target_rank.clone(), + _targets_rank: ranks.targets_rank.clone(), + _hash_rank: ranks.hash_rank.clone(), + _hashes_rank: ranks.hashes_rank.clone(), + } + } +} + +#[derive(Debug)] +struct BlockDifficultyInfo { + _height: u32, + _timestamp: Timestamp, + _difficulty: BigUint, + _cummulative_difficulty: BigUint, + _ranks: BlockRanksInfo, +} + +impl BlockDifficultyInfo { + pub fn new(difficulty: &BlockDifficulty) -> BlockDifficultyInfo { + BlockDifficultyInfo { + _height: difficulty.height, + _timestamp: difficulty.timestamp, + _difficulty: difficulty.difficulty.clone(), + _cummulative_difficulty: difficulty.cummulative_difficulty.clone(), + _ranks: BlockRanksInfo::new(&difficulty.ranks), + } + } +} + +#[derive(Debug)] +struct BlockStoreInfo { + _main: Vec, + _order: Vec, + _difficulty: Vec, +} + +impl BlockStoreInfo { + pub fn new(blockstore: &BlockStore) -> BlockStoreInfo { + let mut _main = Vec::new(); + let result = blockstore.get_all(); + match result { + Ok(iter) => { + for (hash, block) in iter.iter() { + _main.push(BlockInfo::new(*hash, block)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _order = Vec::new(); + let result = blockstore.get_all_order(); + match result { + Ok(iter) => { + for (height, hash) in iter.iter() { + _order.push(OrderInfo::new(*height, *hash)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _difficulty = Vec::new(); + let result = blockstore.get_all_difficulty(); + match result { + Ok(iter) => { + for (_, difficulty) in iter.iter() { + _difficulty.push(BlockDifficultyInfo::new(difficulty)); + } + } + Err(e) => println!("Error: {:?}", e), + } + BlockStoreInfo { _main, _order, _difficulty } + } +} + +#[derive(Debug)] +struct TxInfo { + _hash: TransactionHash, + _payload: Transaction, +} + +impl TxInfo { + pub fn new(_hash: TransactionHash, tx: &Transaction) -> TxInfo { + TxInfo { _hash, _payload: tx.clone() } + } +} + +#[derive(Debug)] +struct TxLocationInfo { + _hash: TransactionHash, + _block_height: u32, + _index: u16, +} + +impl TxLocationInfo { + pub fn new(_hash: TransactionHash, _block_height: u32, _index: u16) -> TxLocationInfo { + TxLocationInfo { _hash, _block_height, _index } + } +} + +#[derive(Debug)] +struct PendingOrderInfo { + _order: u64, + _hash: TransactionHash, +} + +impl PendingOrderInfo { + pub fn new(_order: u64, _hash: TransactionHash) -> PendingOrderInfo { + PendingOrderInfo { _order, _hash } + } +} + +#[derive(Debug)] +struct TxStoreInfo { + _main: Vec, + _location: Vec, + _pending: Vec, + _pending_order: Vec, +} + +impl TxStoreInfo { + pub fn new(txstore: &TxStore) -> TxStoreInfo { + let mut _main = Vec::new(); + let result = txstore.get_all(); + match result { + Ok(iter) => { + for (hash, tx) in iter.iter() { + _main.push(TxInfo::new(*hash, tx)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _location = Vec::new(); + let result = txstore.get_all_location(); + match result { + Ok(iter) => { + for (hash, location) in iter.iter() { + _location.push(TxLocationInfo::new(*hash, location.0, location.1)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _pending = Vec::new(); + let result = txstore.get_all_pending(); + match result { + Ok(iter) => { + for (hash, tx) in iter.iter() { + _pending.push(TxInfo::new(*hash, tx)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _pending_order = Vec::new(); + let result = txstore.get_all_pending_order(); + match result { + Ok(iter) => { + for (order, hash) in iter.iter() { + _pending_order.push(PendingOrderInfo::new(*order, *hash)); + } + } + Err(e) => println!("Error: {:?}", e), + } + TxStoreInfo { _main, _location, _pending, _pending_order } + } +} + +#[derive(Debug)] +struct ContractStateInfo { + _id: ContractId, + _state_hashes: Vec, +} + +impl ContractStateInfo { + pub fn new(_id: ContractId, state_hashes: &[blake3::Hash]) -> ContractStateInfo { + ContractStateInfo { _id, _state_hashes: state_hashes.to_vec() } + } +} + +#[derive(Debug)] +struct WasmInfo { + _id: ContractId, + _bincode_hash: blake3::Hash, +} + +impl WasmInfo { + pub fn new(_id: ContractId, bincode: &[u8]) -> WasmInfo { + let _bincode_hash = blake3::hash(bincode); + WasmInfo { _id, _bincode_hash } + } +} + +#[derive(Debug)] +struct ContractStoreInfo { + _state: Vec, + _wasm: Vec, +} + +impl ContractStoreInfo { + pub fn new(contractsstore: &ContractStore) -> ContractStoreInfo { + let mut _state = Vec::new(); + let result = contractsstore.get_all_states(); + match result { + Ok(iter) => { + for (id, state_hash) in iter.iter() { + _state.push(ContractStateInfo::new(*id, state_hash)); + } + } + Err(e) => println!("Error: {:?}", e), + } + let mut _wasm = Vec::new(); + let result = contractsstore.get_all_wasm(); + match result { + Ok(iter) => { + for (id, bincode) in iter.iter() { + _wasm.push(WasmInfo::new(*id, bincode)); + } + } + Err(e) => println!("Error: {:?}", e), + } + ContractStoreInfo { _state, _wasm } + } +} +#[derive(Debug)] +struct BlockchainInfo { + _headers: HeaderStoreInfo, + _blocks: BlockStoreInfo, + _transactions: TxStoreInfo, + _contracts: ContractStoreInfo, +} + +impl BlockchainInfo { + pub fn new(blockchain: &Blockchain) -> BlockchainInfo { + BlockchainInfo { + _headers: HeaderStoreInfo::new(&blockchain.headers), + _blocks: BlockStoreInfo::new(&blockchain.blocks), + _transactions: TxStoreInfo::new(&blockchain.transactions), + _contracts: ContractStoreInfo::new(&blockchain.contracts), + } + } +} + +fn statistics(folder: &str, node: &str, blockchain: &str) -> Result<()> { + println!("Retrieving blockchain statistics for {node}..."); + + // Node folder + let folder = folder.to_owned() + node; + + // Initialize or load sled database + let path = folder.to_owned() + blockchain; + let db_path = expand_path(&path).unwrap(); + let sled_db = sled::open(db_path)?; + + // Retrieve statistics + let blockchain = Blockchain::new(&sled_db)?; + let (height, block) = blockchain.last()?; + let epoch = block_epoch(height); + let blocks = blockchain.len(); + let txs = blockchain.txs_len(); + drop(sled_db); + + // Print statistics + println!("Latest height: {height}"); + println!("Epoch: {epoch}"); + println!("Latest block: {block}"); + println!("Total blocks: {blocks}"); + println!("Total transactions: {txs}"); + + Ok(()) +} + +fn export(folder: &str, node: &str, blockchain: &str) -> Result<()> { + println!("Exporting data for {node}..."); + + // Node folder + let folder = folder.to_owned() + node; + + // Initialize or load sled database + let path = folder.to_owned() + blockchain; + let db_path = expand_path(&path).unwrap(); + let sled_db = sled::open(db_path)?; + + // Data export + let blockchain = Blockchain::new(&sled_db)?; + let info = BlockchainInfo::new(&blockchain); + let info_string = format!("{:#?}", info); + let file_name = node.to_owned() + "_db"; + let mut file = File::create(file_name.clone())?; + file.write_all(info_string.as_bytes())?; + drop(sled_db); + println!("Data exported to file: {file_name}"); + + Ok(()) +} + +fn main() -> Result<()> { + // Parse arguments + let args = Args::parse(); + println!("Node folder path: {}", args.path); + // Export data for each node + for node in args.node { + if args.export { + export(&args.path, &node, &args.blockchain)?; + continue + } + statistics(&args.path, &node, &args.blockchain)?; + } + + Ok(()) +} diff --git a/script/research/blockchain-explorer/src/rpc.rs b/script/research/blockchain-explorer/src/rpc.rs new file mode 100644 index 000000000..7791ca9d8 --- /dev/null +++ b/script/research/blockchain-explorer/src/rpc.rs @@ -0,0 +1,334 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::{collections::HashSet, sync::Arc, time::Instant}; + +use async_trait::async_trait; +use log::{debug, error, info, warn}; +use smol::lock::MutexGuard; +use tinyjson::JsonValue; +use url::Url; + +use darkfi::{ + blockchain::BlockInfo, + rpc::{ + client::RpcClient, + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, + server::RequestHandler, + }, + system::{Publisher, StoppableTask, StoppableTaskPtr}, + util::encoding::base64, + Error, Result, +}; +use darkfi_serial::deserialize_async; +use drk::error::{WalletDbError, WalletDbResult}; + +use crate::{ + blocks::BlockRecord, + error::{server_error, RpcError}, + BlockchainExplorer, +}; + +#[async_trait] +impl RequestHandler for BlockchainExplorer { + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + debug!(target: "blockchain-explorer::rpc", "--> {}", req.stringify().unwrap()); + + match req.method.as_str() { + // ===================== + // Miscellaneous methods + // ===================== + "ping" => self.pong(req.id, req.params).await, + "ping_darkfid" => self.ping_darkfid(req.id, req.params).await, + + // TODO: add statistics retrieval method + // TODO: add last n blocks retrieval method + // TODO: add block retrieval method by its header hash + // TODO: add transactions retrieval method by their block hash + // TODO: add transaction retrieval method by its hash + // TODO: add any other usefull method + + // ============== + // Invalid method + // ============== + _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + } + } + + async fn connections_mut(&self) -> MutexGuard<'_, HashSet> { + self.rpc_connections.lock().await + } +} + +impl BlockchainExplorer { + // RPCAPI: + // Pings configured darkfid daemon for liveness. + // Returns `true` on success. + // + // --> {"jsonrpc": "2.0", "method": "ping_darkfid", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": "true", "id": 1} + async fn ping_darkfid(&self, id: u16, _params: JsonValue) -> JsonResult { + debug!(target: "blockchain-explorer::rpc::ping_darkfid", "Pinging darkfid daemon..."); + if let Err(e) = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await { + error!(target: "blockchain-explorer::rpc::ping_darkfid", "Failed to ping darkfid daemon: {}", e); + return server_error(RpcError::PingFailed, id, None) + } + JsonResponse::new(JsonValue::Boolean(true), id).into() + } + + /// Auxiliary function to execute a request towards the configured darkfid daemon JSON-RPC endpoint. + pub async fn darkfid_daemon_request( + &self, + method: &str, + params: &JsonValue, + ) -> Result { + debug!(target: "blockchain-explorer::rpc::darkfid_daemon_request", "Executing request {} with params: {:?}", method, params); + let latency = Instant::now(); + let req = JsonRequest::new(method, params.clone()); + let rep = self.rpc_client.request(req).await?; + let latency = latency.elapsed(); + debug!(target: "blockchain-explorer::rpc::darkfid_daemon_request", "Got reply: {:?}", rep); + debug!(target: "blockchain-explorer::rpc::darkfid_daemon_request", "Latency: {:?}", latency); + Ok(rep) + } + + // Queries darkfid for a block with given height. + async fn get_block_by_height(&self, height: u32) -> Result { + let params = self + .darkfid_daemon_request( + "blockchain.get_block", + &JsonValue::Array(vec![JsonValue::String(height.to_string())]), + ) + .await?; + let param = params.get::().unwrap(); + let bytes = base64::decode(param).unwrap(); + let block = deserialize_async(&bytes).await?; + Ok(block) + } + + /// Syncs the blockchain starting from the last synced block. + /// If reset flag is provided, all tables are reset, and start scanning from beginning. + pub async fn sync_blocks(&self, reset: bool) -> WalletDbResult<()> { + // Grab last scanned block height + let mut height = self.last_block().await?; + // If last scanned block is genesis (0) or reset flag + // has been provided we reset, otherwise continue with + // the next block height + if height == 0 || reset { + self.reset_blocks()?; + height = 0; + } else { + height += 1; + }; + + loop { + let rep = match self + .darkfid_daemon_request("blockchain.last_known_block", &JsonValue::Array(vec![])) + .await + { + Ok(r) => r, + Err(e) => { + error!(target: "blockchain-explorer::rpc::sync_blocks", "[sync_blocks] RPC client request failed: {e:?}"); + return Err(WalletDbError::GenericError) + } + }; + let last = *rep.get::().unwrap() as u32; + + info!(target: "blockchain-explorer::rpc::sync_blocks", "Requested to scan from block number: {height}"); + info!(target: "blockchain-explorer::rpc::sync_blocks", "Last known block number reported by darkfid: {last}"); + + // Already scanned last known block + if height > last { + return Ok(()) + } + + while height <= last { + info!(target: "blockchain-explorer::rpc::sync_blocks", "Requesting block {height}... "); + + let block = match self.get_block_by_height(height).await { + Ok(r) => r, + Err(e) => { + error!(target: "blockchain-explorer::rpc::sync_blocks", "[sync_blocks] RPC client request failed: {e:?}"); + return Err(WalletDbError::GenericError) + } + }; + + let block = BlockRecord { + header_hash: block.hash().to_string(), + version: block.header.version, + previous: block.header.previous.to_string(), + height: block.header.height, + timestamp: block.header.timestamp.inner(), + nonce: block.header.nonce, + root: block.header.root.to_string(), + signature: block.signature, + }; + if let Err(e) = self.put_block(&block).await { + error!(target: "blockchain-explorer::rpc::sync_blocks", "[sync_blocks] Scan block failed: {e:?}"); + return Err(WalletDbError::GenericError) + }; + + height += 1; + } + } + } +} + +/// Subscribes to darkfid's JSON-RPC notification endpoint that serves +/// new finalized blocks. Upon receiving them, store them to the database. +pub async fn subscribe_blocks( + explorer: Arc, + endpoint: Url, + ex: Arc>, +) -> Result<(StoppableTaskPtr, StoppableTaskPtr)> { + let rep = explorer + .darkfid_daemon_request("blockchain.last_known_block", &JsonValue::Array(vec![])) + .await?; + let last_known = *rep.get::().unwrap() as u32; + let last_scanned = match explorer.last_block().await { + Ok(l) => l, + Err(e) => { + return Err(Error::RusqliteError(format!( + "[subscribe_blocks] Retrieving last scanned block failed: {e:?}" + ))) + } + }; + + if last_known != last_scanned { + warn!(target: "blockchain-explorer::rpc::subscribe_blocks", "Warning: Last scanned block is not the last known block."); + warn!(target: "blockchain-explorer::rpc::subscribe_blocks", "You should first fully scan the blockchain, and then subscribe"); + return Err(Error::RusqliteError( + "[subscribe_blocks] Blockchain not fully scanned".to_string(), + )) + } + + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "Subscribing to receive notifications of incoming blocks"); + let publisher = Publisher::new(); + let subscription = publisher.clone().subscribe().await; + let _ex = ex.clone(); + let subscriber_task = StoppableTask::new(); + subscriber_task.clone().start( + // Weird hack to prevent lifetimes hell + async move { + let ex = _ex.clone(); + let rpc_client = RpcClient::new(endpoint, ex).await?; + let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![])); + rpc_client.subscribe(req, publisher).await + }, + |res| async move { + match res { + Ok(()) => { /* Do nothing */ } + Err(e) => error!(target: "blockchain-explorer::rpc::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"), + } + }, + Error::RpcServerStopped, + ex.clone(), + ); + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "Detached subscription to background"); + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "All is good. Waiting for block notifications..."); + + let listener_task = StoppableTask::new(); + listener_task.clone().start( + // Weird hack to prevent lifetimes hell + async move { + loop { + match subscription.receive().await { + JsonResult::Notification(n) => { + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "Got Block notification from darkfid subscription"); + if n.method != "blockchain.subscribe_blocks" { + return Err(Error::UnexpectedJsonRpc(format!( + "Got foreign notification from darkfid: {}", + n.method + ))) + } + + // Verify parameters + if !n.params.is_array() { + return Err(Error::UnexpectedJsonRpc( + "Received notification params are not an array".to_string(), + )) + } + let params = n.params.get::>().unwrap(); + if params.is_empty() { + return Err(Error::UnexpectedJsonRpc( + "Notification parameters are empty".to_string(), + )) + } + + for param in params { + let param = param.get::().unwrap(); + let bytes = base64::decode(param).unwrap(); + + let block_data: BlockInfo = match deserialize_async(&bytes).await { + Ok(b) => b, + Err(e) => { + return Err(Error::UnexpectedJsonRpc(format!( + "[subscribe_blocks] Deserializing block failed: {e:?}" + ))) + }, + }; + let header_hash = block_data.hash().to_string(); + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "======================================="); + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "Block header: {header_hash}"); + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "======================================="); + + info!(target: "blockchain-explorer::rpc::subscribe_blocks", "Deserialized successfully. Storring block..."); + let block = BlockRecord { + header_hash, + version: block_data.header.version, + previous: block_data.header.previous.to_string(), + height: block_data.header.height, + timestamp: block_data.header.timestamp.inner(), + nonce: block_data.header.nonce, + root: block_data.header.root.to_string(), + signature: block_data.signature, + }; + if let Err(e) = explorer.put_block(&block).await { + return Err(Error::RusqliteError(format!( + "[subscribe_blocks] Scanning block failed: {e:?}" + ))) + } + } + } + + JsonResult::Error(e) => { + // Some error happened in the transmission + return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))) + } + + x => { + // And this is weird + return Err(Error::UnexpectedJsonRpc(format!( + "Got unexpected data from JSON-RPC: {x:?}" + ))) + } + } + }; + }, + |res| async move { + match res { + Ok(()) => { /* Do nothing */ } + Err(e) => error!(target: "blockchain-explorer::rpc::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"), + } + }, + Error::RpcServerStopped, + ex, + ); + + Ok((subscriber_task, listener_task)) +}