From 6a9356b55201bf371df6386c368b1e3e663a3fcb Mon Sep 17 00:00:00 2001 From: Boog900 Date: Thu, 6 Mar 2025 16:17:09 +0000 Subject: [PATCH] consensus: Batch get outputs (#382) * add specific method for context * add new statemachine for tx verification * fix consensus crates build * working builds * fix CI * add docs * fix CI * fix docs * fix clippy * cleanup * add docs to `blockchain_context` * fix doc tests * add output cache * new monero-serai * todo * todo * Revert "new monero-serai" This reverts commit fe3f6acc676fe59e794d5f92f07f76445db35199. * use indexmap to request outputs * clean up * fix typos * fix CI * fix cargo hack * update doc comment * fix non-rct miner tx error * Apply suggestions from code review Co-authored-by: hinto-janai * Update storage/database/src/backend/redb/env.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.lock | 5 + .../src/blockchain/manager/handler.rs | 11 +- binaries/cuprated/src/main.rs | 4 +- .../cuprated/src/rpc/request/blockchain.rs | 6 +- binaries/cuprated/src/txpool/incoming_tx.rs | 1 + consensus/Cargo.toml | 1 + consensus/rules/Cargo.toml | 1 + .../rules/src/transactions/contextual_data.rs | 8 +- consensus/src/block.rs | 16 ++- consensus/src/block/batch_prepare.rs | 41 +++++- consensus/src/transactions.rs | 32 +++-- consensus/src/transactions/contextual_data.rs | 100 +++++++++----- consensus/tests/verify_correct_txs.rs | 12 +- storage/blockchain/Cargo.toml | 1 + storage/blockchain/src/service/read.rs | 60 ++++++--- storage/blockchain/src/service/tests.rs | 19 ++- storage/database/src/backend/heed/env.rs | 2 +- storage/database/src/backend/redb/database.rs | 2 - storage/database/src/backend/redb/env.rs | 3 +- types/Cargo.toml | 3 +- types/src/blockchain.rs | 12 +- types/src/lib.rs | 4 +- types/src/output_cache.rs | 127 ++++++++++++++++++ 23 files changed, 368 insertions(+), 103 deletions(-) create mode 100644 types/src/output_cache.rs diff --git a/Cargo.lock b/Cargo.lock index d14afab..3ba27d6 
100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -609,6 +609,7 @@ dependencies = [ "curve25519-dalek", "hex", "hex-literal", + "indexmap", "monero-serai", "pretty_assertions", "proptest", @@ -635,6 +636,7 @@ dependencies = [ "futures", "hex", "hex-literal", + "indexmap", "monero-serai", "proptest", "proptest-derive", @@ -682,6 +684,7 @@ dependencies = [ "curve25519-dalek", "hex", "hex-literal", + "indexmap", "monero-serai", "proptest", "proptest-derive", @@ -1008,6 +1011,7 @@ dependencies = [ "curve25519-dalek", "hex", "hex-literal", + "indexmap", "monero-serai", "pretty_assertions", "proptest", @@ -1835,6 +1839,7 @@ checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", "hashbrown 0.15.2", + "rayon", ] [[package]] diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index c73d86f..a8c512a 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -166,9 +166,12 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from or if the incoming batch contains no blocks. 
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { - let Ok(prepped_blocks) = - batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service) - .await + let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks( + batch.blocks, + &mut self.blockchain_context_service, + self.blockchain_read_handle.clone(), + ) + .await else { batch.peer_handle.ban_peer(LONG_BAN); self.stop_current_block_downloader.notify_one(); @@ -181,6 +184,7 @@ impl super::BlockchainManager { txs, &mut self.blockchain_context_service, self.blockchain_read_handle.clone(), + Some(&mut output_cache), ) .await else { @@ -404,6 +408,7 @@ impl super::BlockchainManager { prepped_txs, &mut self.blockchain_context_service, self.blockchain_read_handle.clone(), + None, ) .await?; diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 21eb07d..ced8408 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -16,8 +16,8 @@ reason = "TODO: remove after v1.0.0" )] -use std::mem; -use std::sync::Arc; +use std::{mem, sync::Arc}; + use tokio::sync::mpsc; use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; diff --git a/binaries/cuprated/src/rpc/request/blockchain.rs b/binaries/cuprated/src/rpc/request/blockchain.rs index 97c7f48..f3c19c6 100644 --- a/binaries/cuprated/src/rpc/request/blockchain.rs +++ b/binaries/cuprated/src/rpc/request/blockchain.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::Error; +use indexmap::{IndexMap, IndexSet}; use monero_serai::block::Block; use tower::{Service, ServiceExt}; @@ -13,6 +14,7 @@ use cuprate_blockchain::{service::BlockchainReadHandle, types::AltChainInfo}; use cuprate_helper::cast::{u64_to_usize, usize_to_u64}; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, + output_cache::OutputCache, Chain, ChainInfo, CoinbaseTxSum, ExtendedBlockHeader, HardFork, MinerData, OutputHistogramEntry, OutputHistogramInput, 
OutputOnChain, }; @@ -185,8 +187,8 @@ pub(crate) async fn generated_coins( /// [`BlockchainReadRequest::Outputs`] pub(crate) async fn outputs( blockchain_read: &mut BlockchainReadHandle, - outputs: HashMap>, -) -> Result>, Error> { + outputs: IndexMap>, +) -> Result { let BlockchainResponse::Outputs(outputs) = blockchain_read .ready() .await? diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 93d47f1..17eb8e5 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -172,6 +172,7 @@ async fn handle_incoming_txs( context.current_adjusted_timestamp_for_time_lock(), context.current_hf, blockchain_read_handle, + None, ) .verify() .await diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 076e0ad..8a6edba 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -23,6 +23,7 @@ monero-serai = { workspace = true, features = ["std"] } rayon = { workspace = true } thread_local = { workspace = true } +indexmap = { workspace = true, features = ["std"] } hex = { workspace = true } rand = { workspace = true } diff --git a/consensus/rules/Cargo.toml b/consensus/rules/Cargo.toml index 8999cbc..6e7c8b6 100644 --- a/consensus/rules/Cargo.toml +++ b/consensus/rules/Cargo.toml @@ -21,6 +21,7 @@ curve25519-dalek = { workspace = true, features = ["alloc", "zeroize", "precompu rand = { workspace = true, features = ["std", "std_rng"] } +indexmap = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] } hex-literal = { workspace = true } crypto-bigint = { workspace = true } diff --git a/consensus/rules/src/transactions/contextual_data.rs b/consensus/rules/src/transactions/contextual_data.rs index 73bc12e..35f8e47 100644 --- a/consensus/rules/src/transactions/contextual_data.rs +++ b/consensus/rules/src/transactions/contextual_data.rs @@ -1,9 +1,7 @@ -use std::{ - cmp::{max, min}, - collections::{HashMap, HashSet}, -}; +use 
std::cmp::{max, min}; use curve25519_dalek::EdwardsPoint; +use indexmap::{IndexMap, IndexSet}; use monero_serai::transaction::{Input, Timelock}; use crate::{transactions::TransactionError, HardFork}; @@ -33,7 +31,7 @@ pub fn get_absolute_offsets(relative_offsets: &[u64]) -> Result, Transa /// pub fn insert_ring_member_ids( inputs: &[Input], - output_ids: &mut HashMap>, + output_ids: &mut IndexMap>, ) -> Result<(), TransactionError> { if inputs.is_empty() { return Err(TransactionError::NoInputs); diff --git a/consensus/src/block.rs b/consensus/src/block.rs index b94be0f..89e89e0 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -36,7 +36,7 @@ mod batch_prepare; mod free; pub use alt_block::sanity_check_alt_block; -pub use batch_prepare::batch_prepare_main_chain_blocks; +pub use batch_prepare::{batch_prepare_main_chain_blocks, BatchPrepareCache}; use free::pull_ordered_transactions; /// A pre-prepared block with all data needed to verify it, except the block's proof of work. @@ -243,7 +243,7 @@ where // Check that the txs included are what we need and that there are not any extra. let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?; - verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database).await + verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database, None).await } /// Fully verify a block that has already been prepared using [`batch_prepare_main_chain_blocks`]. 
@@ -252,6 +252,7 @@ pub async fn verify_prepped_main_chain_block( mut txs: Vec, context_svc: &mut BlockchainContextService, database: D, + batch_prep_cache: Option<&mut BatchPrepareCache>, ) -> Result where D: Database + Clone + Send + 'static, @@ -283,6 +284,7 @@ where context.current_adjusted_timestamp_for_time_lock(), context.current_hf, database, + batch_prep_cache.as_deref(), ) .verify() .await?; @@ -304,7 +306,7 @@ where ) .map_err(ConsensusError::Block)?; - Ok(VerifiedBlockInformation { + let block = VerifiedBlockInformation { block_hash: prepped_block.block_hash, block: prepped_block.block, block_blob: prepped_block.block_blob, @@ -324,5 +326,11 @@ where height: context.chain_height, long_term_weight: context.next_block_long_term_weight(block_weight), cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, - }) + }; + + if let Some(batch_prep_cache) = batch_prep_cache { + batch_prep_cache.output_cache.add_block_to_cache(&block); + } + + Ok(block) } diff --git a/consensus/src/block/batch_prepare.rs b/consensus/src/block/batch_prepare.rs index 54cb3fe..57515af 100644 --- a/consensus/src/block/batch_prepare.rs +++ b/consensus/src/block/batch_prepare.rs @@ -13,21 +13,41 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, }; use cuprate_helper::asynch::rayon_spawn_async; -use cuprate_types::TransactionVerificationData; +use cuprate_types::{output_cache::OutputCache, TransactionVerificationData}; use crate::{ batch_verifier::MultiThreadedBatchVerifier, block::{free::order_transactions, PreparedBlock, PreparedBlockExPow}, - transactions::start_tx_verification, + transactions::{check_kis_unique, contextual_data::get_output_cache, start_tx_verification}, BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, + __private::Database, }; +/// Cached state created when batch preparing a group of blocks. 
+/// +/// This cache is only valid for the set of blocks it was created with, it should not be used for +/// other blocks. +pub struct BatchPrepareCache { + pub(crate) output_cache: OutputCache, + /// [`true`] if all the key images in the batch have been checked for double spends in the batch and + /// the whole chain. + pub(crate) key_images_spent_checked: bool, +} + /// Batch prepares a list of blocks for verification. #[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))] -pub async fn batch_prepare_main_chain_blocks( +#[expect(clippy::type_complexity)] +pub async fn batch_prepare_main_chain_blocks( blocks: Vec<(Block, Vec)>, context_svc: &mut BlockchainContextService, -) -> Result)>, ExtendedConsensusError> { + mut database: D, +) -> Result< + ( + Vec<(PreparedBlock, Vec)>, + BatchPrepareCache, + ), + ExtendedConsensusError, +> { let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); tracing::debug!("Calculating block hashes."); @@ -189,5 +209,16 @@ pub async fn batch_prepare_main_chain_blocks( }) .await?; - Ok(blocks) + check_kis_unique(blocks.iter().flat_map(|(_, txs)| txs.iter()), &mut database).await?; + + let output_cache = + get_output_cache(blocks.iter().flat_map(|(_, txs)| txs.iter()), database).await?; + + Ok(( + blocks, + BatchPrepareCache { + output_cache, + key_images_spent_checked: true, + }, + )) } diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index f776247..7b6222e 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -41,11 +41,13 @@ use cuprate_consensus_rules::{ use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, + output_cache::OutputCache, CachedVerificationState, TransactionVerificationData, TxVersion, }; use crate::{ batch_verifier::MultiThreadedBatchVerifier, + block::BatchPrepareCache, transactions::contextual_data::{batch_get_decoy_info, 
batch_get_ring_member_info}, Database, ExtendedConsensusError, }; @@ -155,6 +157,7 @@ impl VerificationWanted { time_for_time_lock: u64, hf: HardFork, database: D, + batch_prep_cache: Option<&BatchPrepareCache>, ) -> FullVerification { FullVerification { prepped_txs: self.prepped_txs, @@ -163,6 +166,7 @@ impl VerificationWanted { time_for_time_lock, hf, database, + batch_prep_cache, } } } @@ -208,7 +212,7 @@ impl SemanticVerification { /// Full transaction verification. /// /// [`VerificationWanted::full`] -pub struct FullVerification { +pub struct FullVerification<'a, D> { prepped_txs: Vec, current_chain_height: usize, @@ -216,14 +220,20 @@ pub struct FullVerification { time_for_time_lock: u64, hf: HardFork, database: D, + batch_prep_cache: Option<&'a BatchPrepareCache>, } -impl FullVerification { +impl FullVerification<'_, D> { /// Fully verify each transaction. pub async fn verify( mut self, ) -> Result, ExtendedConsensusError> { - check_kis_unique(&self.prepped_txs, &mut self.database).await?; + if self + .batch_prep_cache + .is_none_or(|c| !c.key_images_spent_checked) + { + check_kis_unique(self.prepped_txs.iter(), &mut self.database).await?; + } let hashes_in_main_chain = hashes_referenced_in_main_chain(&self.prepped_txs, &mut self.database).await?; @@ -250,6 +260,7 @@ impl FullVerification { }), self.hf, self.database.clone(), + self.batch_prep_cache.map(|c| &c.output_cache), ) .await?; } @@ -262,19 +273,20 @@ impl FullVerification { self.time_for_time_lock, self.hf, self.database, + self.batch_prep_cache.map(|c| &c.output_cache), ) .await } } /// Check that each key image used in each transaction is unique in the whole chain. 
-async fn check_kis_unique( - txs: &[TransactionVerificationData], +pub(crate) async fn check_kis_unique( + mut txs: impl Iterator, database: &mut D, ) -> Result<(), ExtendedConsensusError> { - let mut spent_kis = HashSet::with_capacity(txs.len()); + let mut spent_kis = HashSet::with_capacity(txs.size_hint().1.unwrap_or(0) * 2); - txs.iter().try_for_each(|tx| { + txs.try_for_each(|tx| { tx.tx.prefix().inputs.iter().try_for_each(|input| { if let Input::ToKey { key_image, .. } = input { if !spent_kis.insert(key_image.compress().0) { @@ -432,13 +444,14 @@ async fn verify_transactions_decoy_info( txs: impl Iterator + Clone, hf: HardFork, database: D, + output_cache: Option<&OutputCache>, ) -> Result<(), ExtendedConsensusError> { // Decoy info is not validated for V1 txs. if hf == HardFork::V1 { return Ok(()); } - batch_get_decoy_info(txs, hf, database) + batch_get_decoy_info(txs, hf, database, output_cache) .await? .try_for_each(|decoy_info| decoy_info.and_then(|di| Ok(check_decoy_info(&di, hf)?)))?; @@ -450,6 +463,7 @@ async fn verify_transactions_decoy_info( /// The inputs to this function are the txs wanted to be verified and a list of [`VerificationNeeded`], /// if any other [`VerificationNeeded`] is specified other than [`VerificationNeeded::Contextual`] or /// [`VerificationNeeded::SemanticAndContextual`], nothing will be verified for that tx. 
+#[expect(clippy::too_many_arguments)] async fn verify_transactions( mut txs: Vec, verification_needed: Vec, @@ -458,6 +472,7 @@ async fn verify_transactions( current_time_lock_timestamp: u64, hf: HardFork, database: D, + output_cache: Option<&OutputCache>, ) -> Result, ExtendedConsensusError> where D: Database, @@ -478,6 +493,7 @@ where .map(|(tx, _)| tx), hf, database, + output_cache, ) .await?; diff --git a/consensus/src/transactions/contextual_data.rs b/consensus/src/transactions/contextual_data.rs index f9e3115..7223669 100644 --- a/consensus/src/transactions/contextual_data.rs +++ b/consensus/src/transactions/contextual_data.rs @@ -10,8 +10,10 @@ //! //! Because this data is unique for *every* transaction and the context service is just for blockchain state data. //! -use std::collections::{HashMap, HashSet}; +use std::{borrow::Cow, collections::HashSet}; + +use indexmap::IndexMap; use monero_serai::transaction::{Input, Timelock}; use tower::ServiceExt; use tracing::instrument; @@ -23,8 +25,10 @@ use cuprate_consensus_rules::{ }, ConsensusError, HardFork, TxVersion, }; + use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, + output_cache::OutputCache, OutputOnChain, }; @@ -134,18 +138,16 @@ fn new_rings( }) } -/// Retrieves the [`TxRingMembersInfo`] for the inputted [`TransactionVerificationData`]. +/// Retrieves an [`OutputCache`] for the list of transactions. /// -/// This function batch gets all the ring members for the inputted transactions and fills in data about -/// them. -pub async fn batch_get_ring_member_info( - txs_verification_data: impl Iterator + Clone, - hf: HardFork, +/// The [`OutputCache`] will only contain the outputs currently in the blockchain. 
+pub async fn get_output_cache( + txs_verification_data: impl Iterator, mut database: D, -) -> Result, ExtendedConsensusError> { - let mut output_ids = HashMap::new(); +) -> Result { + let mut output_ids = IndexMap::new(); - for tx_v_data in txs_verification_data.clone() { + for tx_v_data in txs_verification_data { insert_ring_member_ids(&tx_v_data.tx.prefix().inputs, &mut output_ids) .map_err(ConsensusError::Transaction)?; } @@ -159,23 +161,47 @@ pub async fn batch_get_ring_member_info( panic!("Database sent incorrect response!") }; - let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database - .ready() - .await? - .call(BlockchainReadRequest::NumberOutputsWithAmount( - outputs.keys().copied().collect(), - )) - .await? - else { - panic!("Database sent incorrect response!") + Ok(outputs) +} + +/// Retrieves the [`TxRingMembersInfo`] for the inputted [`TransactionVerificationData`]. +/// +/// This function batch gets all the ring members for the inputted transactions and fills in data about +/// them. +pub async fn batch_get_ring_member_info( + txs_verification_data: impl Iterator + Clone, + hf: HardFork, + mut database: D, + cache: Option<&OutputCache>, +) -> Result, ExtendedConsensusError> { + let mut output_ids = IndexMap::new(); + + for tx_v_data in txs_verification_data.clone() { + insert_ring_member_ids(&tx_v_data.tx.prefix().inputs, &mut output_ids) + .map_err(ConsensusError::Transaction)?; + } + + let outputs = if let Some(cache) = cache { + Cow::Borrowed(cache) + } else { + let BlockchainResponse::Outputs(outputs) = database + .ready() + .await? + .call(BlockchainReadRequest::Outputs(output_ids)) + .await? 
+ else { + unreachable!(); + }; + + Cow::Owned(outputs) }; Ok(txs_verification_data .map(move |tx_v_data| { - let numb_outputs = |amt| outputs_with_amount.get(&amt).copied().unwrap_or(0); + let numb_outputs = |amt| outputs.number_outs_with_amount(amt); let ring_members_for_tx = get_ring_members_for_inputs( - |amt, idx| outputs.get(&amt)?.get(&idx).copied(), + |amt, idx| outputs.get_output(amt, idx).copied(), &tx_v_data.tx.prefix().inputs, ) .map_err(ConsensusError::Transaction)?; @@ -202,12 +228,13 @@ pub async fn batch_get_ring_member_info( /// This functions panics if `hf == HardFork::V1` as decoy info /// should not be needed for V1. #[instrument(level = "debug", skip_all)] -pub async fn batch_get_decoy_info<'a, D: Database>( +pub async fn batch_get_decoy_info<'a, 'b, D: Database>( txs_verification_data: impl Iterator + Clone, hf: HardFork, mut database: D, + cache: Option<&'b OutputCache>, ) -> Result< - impl Iterator> + sealed::Captures<&'a ()>, + impl Iterator> + sealed::Captures<(&'a (), &'b ())>, ExtendedConsensusError, > { // decoy info is not needed for V1. @@ -229,15 +256,24 @@ pub async fn batch_get_decoy_info<'a, D: Database>( unique_input_amounts.len() ); - let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database - .ready() - .await? - .call(BlockchainReadRequest::NumberOutputsWithAmount( - unique_input_amounts.into_iter().collect(), - )) - .await? - else { - panic!("Database sent incorrect response!") + let outputs_with_amount = if let Some(cache) = cache { + unique_input_amounts + .into_iter() + .map(|amount| (amount, cache.number_outs_with_amount(amount))) + .collect() + } else { + let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database + .ready() + .await? + .call(BlockchainReadRequest::NumberOutputsWithAmount( + unique_input_amounts.into_iter().collect(), + )) + .await? 
+ else { + unreachable!(); + }; + + outputs_with_amount }; Ok(txs_verification_data.map(move |tx_v_data| { diff --git a/consensus/tests/verify_correct_txs.rs b/consensus/tests/verify_correct_txs.rs index 5415849..1166e7a 100644 --- a/consensus/tests/verify_correct_txs.rs +++ b/consensus/tests/verify_correct_txs.rs @@ -8,12 +8,14 @@ use std::{ }; use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY}; +use indexmap::IndexMap; use monero_serai::transaction::{Timelock, Transaction}; use tower::service_fn; use cuprate_consensus::{__private::Database, transactions::start_tx_verification}; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, + output_cache::OutputCache, OutputOnChain, }; @@ -32,15 +34,17 @@ fn dummy_database(outputs: BTreeMap) -> impl Database + Clon BlockchainReadRequest::Outputs(outs) => { let idxs = &outs[&0]; - let mut ret = HashMap::new(); + let mut ret = IndexMap::new(); ret.insert( 0_u64, idxs.iter() .map(|idx| (*idx, *outputs.get(idx).unwrap())) - .collect::>(), + .collect::>(), ); + let ret = OutputCache::new(ret, IndexMap::new(), IndexMap::new()); + BlockchainResponse::Outputs(ret) } BlockchainReadRequest::KeyImagesSpent(_) => BlockchainResponse::KeyImagesSpent(false), @@ -87,7 +91,7 @@ macro_rules! test_verify_valid_v2_tx { ) .prepare() .unwrap() - .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone()) + .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone(), None) .verify() .await.is_ok() ); @@ -116,7 +120,7 @@ macro_rules! 
test_verify_valid_v2_tx { ) .prepare() .unwrap() - .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone()) + .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone(), None) .verify() .await.is_err() ); diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index c935924..34324eb 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -31,6 +31,7 @@ rand = { workspace = true, features = ["std", "std_rng"] } monero-serai = { workspace = true, features = ["std"] } serde = { workspace = true, optional = true } +indexmap = { workspace = true, features = ["rayon"] } tower = { workspace = true } thread_local = { workspace = true } rayon = { workspace = true } diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 45c5aa6..9d88e66 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -15,6 +15,7 @@ use std::{ sync::Arc, }; +use indexmap::{IndexMap, IndexSet}; use rayon::{ iter::{Either, IntoParallelIterator, ParallelIterator}, prelude::*, @@ -27,7 +28,8 @@ use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThre use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, TxsInBlock, + output_cache::OutputCache, + Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, TxsInBlock, }; use crate::{ @@ -413,15 +415,36 @@ fn generated_coins(env: &ConcreteEnv, height: usize) -> ResponseResult { /// [`BlockchainReadRequest::Outputs`]. #[inline] -fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> ResponseResult { +fn outputs(env: &ConcreteEnv, outputs: IndexMap>) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. 
let env_inner = env.env_inner(); let tx_ro = thread_local(env); let tables = thread_local(env); + let amount_of_outs = outputs + .par_iter() + .map(|(&amount, _)| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + if amount == 0 { + Ok((amount, tables.rct_outputs().len()?)) + } else { + // v1 transactions. + match tables.num_outputs().get(&amount) { + Ok(count) => Ok((amount, count)), + // If we get a request for an `amount` that doesn't exist, + // we return `0` instead of an error. + Err(RuntimeError::KeyNotFound) => Ok((amount, 0)), + Err(e) => Err(e), + } + } + }) + .collect::>()?; + // The 2nd mapping function. // This is pulled out from the below `map()` for readability. - let inner_map = |amount, amount_index| -> DbResult<(AmountIndex, OutputOnChain)> { + let inner_map = |amount, amount_index| { let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); @@ -430,26 +453,31 @@ fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> amount_index, }; - let output_on_chain = id_to_output_on_chain(&id, tables)?; + let output_on_chain = match id_to_output_on_chain(&id, tables) { + Ok(output) => output, + Err(RuntimeError::KeyNotFound) => return Ok(Either::Right(amount_index)), + Err(e) => return Err(e), + }; - Ok((amount_index, output_on_chain)) + Ok(Either::Left((amount_index, output_on_chain))) }; // Collect results using `rayon`. 
- let map = outputs + let (map, wanted_outputs) = outputs .into_par_iter() .map(|(amount, amount_index_set)| { - Ok(( - amount, - amount_index_set - .into_par_iter() - .map(|amount_index| inner_map(amount, amount_index)) - .collect::>>()?, - )) - }) - .collect::>>>()?; + let (left, right) = amount_index_set + .into_par_iter() + .map(|amount_index| inner_map(amount, amount_index)) + .collect::>()?; - Ok(BlockchainResponse::Outputs(map)) + Ok(((amount, left), (amount, right))) + }) + .collect::>, IndexMap<_, IndexSet<_>>)>>()?; + + let cache = OutputCache::new(map, amount_of_outs, wanted_outputs); + + Ok(BlockchainResponse::Outputs(cache)) } /// [`BlockchainReadRequest::NumberOutputsWithAmount`]. diff --git a/storage/blockchain/src/service/tests.rs b/storage/blockchain/src/service/tests.rs index 38db665..1f05f0f 100644 --- a/storage/blockchain/src/service/tests.rs +++ b/storage/blockchain/src/service/tests.rs @@ -11,6 +11,7 @@ use std::{ sync::Arc, }; +use indexmap::{IndexMap, IndexSet}; use pretty_assertions::assert_eq; use rand::Rng; use tower::{Service, ServiceExt}; @@ -241,7 +242,7 @@ async fn test_template( //----------------------------------------------------------------------- Output checks // Create the map of amounts and amount indices. let (map, output_count) = { - let mut map = HashMap::>::new(); + let mut map = IndexMap::>::new(); // Used later to compare the amount of Outputs // returned in the Response is equal to the amount @@ -270,7 +271,7 @@ async fn test_template( .and_modify(|set| { set.insert(id.amount_index); }) - .or_insert_with(|| HashSet::from([id.amount_index])); + .or_insert_with(|| IndexSet::from([id.amount_index])); }); (map, output_count) @@ -300,22 +301,18 @@ async fn test_template( }; // Assert amount of `Amount`'s are the same. - assert_eq!(map.len(), response.len()); + assert_eq!(map.len(), response.cached_outputs().len()); // Assert we get back the same map of // `Amount`'s and `AmountIndex`'s. 
let mut response_output_count = 0; - #[expect( - clippy::iter_over_hash_type, - reason = "order doesn't matter in this test" - )] - for (amount, output_map) in response { - let amount_index_set = &map[&amount]; + for (amount, output_map) in response.cached_outputs() { + let amount_index_set = &map[amount]; for (amount_index, output) in output_map { response_output_count += 1; - assert!(amount_index_set.contains(&amount_index)); - assert!(outputs_on_chain.contains(&output)); + assert!(amount_index_set.contains(amount_index)); + assert!(outputs_on_chain.contains(output)); } } diff --git a/storage/database/src/backend/heed/env.rs b/storage/database/src/backend/heed/env.rs index 4d7e1b5..b1311b9 100644 --- a/storage/database/src/backend/heed/env.rs +++ b/storage/database/src/backend/heed/env.rs @@ -137,7 +137,7 @@ impl Env for ConcreteEnv { // MAYBE: Set comparison functions for certain tables // unsafe { - env_open_options.flags(flags); + env_open_options.flags(flags | EnvFlags::NO_READ_AHEAD); } // Set the memory map size to diff --git a/storage/database/src/backend/redb/database.rs b/storage/database/src/backend/redb/database.rs index 84d7c75..0371501 100644 --- a/storage/database/src/backend/redb/database.rs +++ b/storage/database/src/backend/redb/database.rs @@ -1,8 +1,6 @@ //! Implementation of `trait DatabaseR{o,w}` for `redb`. //---------------------------------------------------------------------------------------------------- Import -use std::ops::RangeBounds; - use redb::ReadableTable; use crate::{ diff --git a/storage/database/src/backend/redb/env.rs b/storage/database/src/backend/redb/env.rs index b8db70d..59697db 100644 --- a/storage/database/src/backend/redb/env.rs +++ b/storage/database/src/backend/redb/env.rs @@ -103,7 +103,8 @@ impl Env for ConcreteEnv { // `redb`'s syncs are tied with write transactions, // so just create one, don't do anything and commit. 
let mut tx_rw = self.env.begin_write()?; - tx_rw.set_durability(redb::Durability::Paranoid); + tx_rw.set_durability(redb::Durability::Immediate); + tx_rw.set_two_phase_commit(true); TxRw::commit(tx_rw) } diff --git a/types/Cargo.toml b/types/Cargo.toml index e1ffb19..73afe27 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["cuprate", "types"] [features] default = ["blockchain", "epee", "serde", "json", "hex"] -blockchain = [] +blockchain = ["dep:indexmap", "dep:cuprate-helper", "cuprate-helper/crypto"] epee = ["dep:cuprate-epee-encoding"] serde = ["dep:serde", "hex"] proptest = ["dep:proptest", "dep:proptest-derive"] @@ -31,6 +31,7 @@ hex = { workspace = true, features = ["serde", "alloc"], optional = serde = { workspace = true, features = ["std", "derive"], optional = true } strum = { workspace = true, features = ["derive"] } thiserror = { workspace = true } +indexmap = { workspace = true, features = ["std"], optional = true } proptest = { workspace = true, optional = true } proptest-derive = { workspace = true, optional = true } diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 58a23b2..370065c 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -8,10 +8,12 @@ use std::{ ops::Range, }; +use indexmap::{IndexMap, IndexSet}; use monero_serai::block::Block; use crate::{ - types::{Chain, ExtendedBlockHeader, OutputOnChain, TxsInBlock, VerifiedBlockInformation}, + output_cache::OutputCache, + types::{Chain, ExtendedBlockHeader, TxsInBlock, VerifiedBlockInformation}, AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum, OutputHistogramEntry, OutputHistogramInput, }; @@ -86,7 +88,7 @@ pub enum BlockchainReadRequest { /// For RCT outputs, the amounts would be `0` and /// the amount indices would represent the global /// RCT output indices. - Outputs(HashMap>), + Outputs(IndexMap>), /// Request the amount of outputs with a certain amount. 
     ///
@@ -254,9 +256,9 @@ pub enum BlockchainResponse {
 
     /// Response to [`BlockchainReadRequest::Outputs`].
     ///
-    /// Inner value is all the outputs requested,
-    /// associated with their amount and amount index.
-    Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
+    /// Inner value is an [`OutputCache`], missing outputs won't trigger an error, they just will not be
+    /// in the cache until the cache is updated with the block containing those outputs.
+    Outputs(OutputCache),
 
     /// Response to [`BlockchainReadRequest::NumberOutputsWithAmount`].
     ///
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 7aaf0b9..802d9fe 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -34,10 +34,12 @@ pub use types::{
 #[cfg(feature = "blockchain")]
 pub mod blockchain;
 
+#[cfg(feature = "blockchain")]
+pub mod output_cache;
+
 #[cfg(feature = "json")]
 pub mod json;
 
 #[cfg(feature = "hex")]
 pub mod hex;
-
 //---------------------------------------------------------------------------------------------------- Private
diff --git a/types/src/output_cache.rs b/types/src/output_cache.rs
new file mode 100644
index 0000000..04e53bf
--- /dev/null
+++ b/types/src/output_cache.rs
@@ -0,0 +1,127 @@
+use curve25519_dalek::EdwardsPoint;
+use indexmap::{IndexMap, IndexSet};
+use monero_serai::transaction::Transaction;
+
+use cuprate_helper::{cast::u64_to_usize, crypto::compute_zero_commitment};
+
+use crate::{OutputOnChain, VerifiedBlockInformation};
+
+/// A cache of outputs from the blockchain database.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct OutputCache {
+    /// A map of (amount, amount idx) -> output.
+    cached_outputs: IndexMap<u64, IndexMap<u64, OutputOnChain>>,
+    /// A map of an output amount to the amount of outputs in the blockchain with that amount.
+    number_of_outputs: IndexMap<u64, u64>,
+    /// A set of outputs that were requested but were not currently in the DB.
+    wanted_outputs: IndexMap<u64, IndexSet<u64>>,
+}
+
+impl OutputCache {
+    /// Create a new [`OutputCache`].
+    pub const fn new(
+        cached_outputs: IndexMap<u64, IndexMap<u64, OutputOnChain>>,
+        number_of_outputs: IndexMap<u64, u64>,
+        wanted_outputs: IndexMap<u64, IndexSet<u64>>,
+    ) -> Self {
+        Self {
+            cached_outputs,
+            number_of_outputs,
+            wanted_outputs,
+        }
+    }
+
+    /// Returns the set of currently cached outputs.
+    ///
+    /// # Warning
+    ///
+    /// [`Self::get_output`] should be preferred over this when possible, this will not contain all outputs
+    /// asked for necessarily.
+    pub const fn cached_outputs(&self) -> &IndexMap<u64, IndexMap<u64, OutputOnChain>> {
+        &self.cached_outputs
+    }
+
+    /// Returns the number of outputs in the blockchain with the given amount.
+    ///
+    /// # Warning
+    ///
+    /// The cache will only track the amount of outputs with a given amount for the requested outputs.
+    /// So if you do not request an output with `amount` when generating the cache the amount of outputs
+    /// with value `amount` will not be tracked.
+    pub fn number_outs_with_amount(&self, amount: u64) -> usize {
+        u64_to_usize(
+            self.number_of_outputs
+                .get(&amount)
+                .copied()
+                .unwrap_or_default(),
+        )
+    }
+
+    /// Request an output with a given amount and amount index from the cache.
+    pub fn get_output(&self, amount: u64, index: u64) -> Option<&OutputOnChain> {
+        self.cached_outputs
+            .get(&amount)
+            .and_then(|map| map.get(&index))
+    }
+
+    /// Adds a [`Transaction`] to the cache.
+    fn add_tx<const MINER_TX: bool>(&mut self, height: usize, tx: &Transaction) {
+        for (i, out) in tx.prefix().outputs.iter().enumerate() {
+            let amount = if MINER_TX && tx.version() == 2 {
+                0
+            } else {
+                out.amount.unwrap_or_default()
+            };
+
+            let Some(outputs_with_amount) = self.number_of_outputs.get_mut(&amount) else {
+                continue;
+            };
+
+            let amount_index_of_out = *outputs_with_amount;
+            *outputs_with_amount += 1;
+
+            if let Some(set) = self.wanted_outputs.get_mut(&amount) {
+                if set.swap_remove(&amount_index_of_out) {
+                    self.cached_outputs.entry(amount).or_default().insert(
+                        amount_index_of_out,
+                        OutputOnChain {
+                            height,
+                            time_lock: tx.prefix().additional_timelock,
+                            // TODO: this needs to check the point is canonical.
+                            key: out.key.decompress(),
+                            commitment: get_output_commitment(tx, i),
+                        },
+                    );
+                }
+            }
+        }
+    }
+
+    /// Adds a block to the cache.
+    ///
+    /// This function will add any outputs to the cache that were requested when building the cache
+    /// but were not in the DB, if they are in the block.
+    pub fn add_block_to_cache(&mut self, block: &VerifiedBlockInformation) {
+        self.add_tx::<true>(block.height, &block.block.miner_transaction);
+
+        for tx in &block.txs {
+            self.add_tx::<false>(block.height, &tx.tx);
+        }
+    }
+}
+
+/// Returns the amount commitment for the output at the given index `i` in the [`Transaction`]
+fn get_output_commitment(tx: &Transaction, i: usize) -> EdwardsPoint {
+    match tx {
+        Transaction::V1 { prefix, .. } => {
+            compute_zero_commitment(prefix.outputs[i].amount.unwrap_or_default())
+        }
+        Transaction::V2 { prefix, proofs } => {
+            let Some(proofs) = proofs else {
+                return compute_zero_commitment(prefix.outputs[i].amount.unwrap_or_default());
+            };
+
+            proofs.base.commitments[i]
+        }
+    }
+}