From 807bfafb35dd3e0ea8a6a76000e8d1cb95afb675 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Fri, 15 Aug 2025 15:41:23 +0100 Subject: [PATCH] cuprated: add txpool manager (#483) * add txpool manager * add version check for tx-pool * add docs * fix merge * fix test * fix merge * fix cargo hack * clean up imports * fix typo * add small buffer to rebroadcasts * review fixes * small clean up * fmt * fix merge * review fixes --- Cargo.lock | 1 + binaries/cuprated/src/blockchain/manager.rs | 9 +- .../src/blockchain/manager/handler.rs | 7 +- .../cuprated/src/blockchain/manager/tests.rs | 13 +- binaries/cuprated/src/config.rs | 2 +- binaries/cuprated/src/config/storage.rs | 11 +- binaries/cuprated/src/main.rs | 8 +- binaries/cuprated/src/p2p/request_handler.rs | 25 +- .../cuprated/src/rpc/handlers/json_rpc.rs | 9 +- binaries/cuprated/src/txpool.rs | 2 + binaries/cuprated/src/txpool/dandelion.rs | 12 +- .../cuprated/src/txpool/dandelion/tx_store.rs | 29 +- binaries/cuprated/src/txpool/incoming_tx.rs | 138 ++--- binaries/cuprated/src/txpool/manager.rs | 497 ++++++++++++++++++ p2p/dandelion-tower/Cargo.toml | 2 +- p2p/dandelion-tower/src/pool/manager.rs | 45 +- p2p/dandelion-tower/src/pool/mod.rs | 13 +- storage/txpool/Cargo.toml | 3 +- storage/txpool/README.md | 2 +- storage/txpool/src/free.rs | 77 ++- storage/txpool/src/lib.rs | 2 +- storage/txpool/src/ops.rs | 2 +- storage/txpool/src/ops/tx_write.rs | 2 + storage/txpool/src/service.rs | 2 +- storage/txpool/src/service/free.rs | 4 +- storage/txpool/src/service/interface.rs | 6 +- storage/txpool/src/service/read.rs | 27 +- storage/txpool/src/service/write.rs | 14 +- storage/txpool/src/tables.rs | 6 +- storage/txpool/src/tx.rs | 8 +- storage/txpool/src/types.rs | 8 + 31 files changed, 804 insertions(+), 182 deletions(-) create mode 100644 binaries/cuprated/src/txpool/manager.rs diff --git a/Cargo.lock b/Cargo.lock index 4fb2ed2..1b4560b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tower", + "tracing", ] [[package]] diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 4e02915..d0637b3 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -28,6 +28,7 @@ use crate::{ types::ConsensusBlockchainReadHandle, }, constants::PANIC_CRITICAL_SERVICE_ERROR, + txpool::TxpoolManagerHandle, }; mod commands; @@ -46,7 +47,7 @@ pub async fn init_blockchain_manager( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, - txpool_write_handle: TxpoolWriteHandle, + txpool_manager_handle: TxpoolManagerHandle, mut blockchain_context_service: BlockchainContextService, block_downloader_config: BlockDownloaderConfig, ) { @@ -72,7 +73,7 @@ pub async fn init_blockchain_manager( blockchain_read_handle, BoxError::from, ), - txpool_write_handle, + txpool_manager_handle, blockchain_context_service, stop_current_block_downloader, broadcast_svc: clearnet_interface.broadcast_svc(), @@ -93,8 +94,8 @@ pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, /// A [`BlockchainReadHandle`]. blockchain_read_handle: ConsensusBlockchainReadHandle, - /// A [`TxpoolWriteHandle`]. - txpool_write_handle: TxpoolWriteHandle, + + txpool_manager_handle: TxpoolManagerHandle, /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve /// values without needing to go to a [`BlockchainReadHandle`]. blockchain_context_service: BlockchainContextService, diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index c7a39df..5f88707 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -619,11 +619,8 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR); - self.txpool_write_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolWriteRequest::NewBlock { spent_key_images }) + self.txpool_manager_handle + .new_block(spent_key_images) .await .expect(PANIC_CRITICAL_SERVICE_ERROR); } diff --git a/binaries/cuprated/src/blockchain/manager/tests.rs b/binaries/cuprated/src/blockchain/manager/tests.rs index 7efa313..1363a3b 100644 --- a/binaries/cuprated/src/blockchain/manager/tests.rs +++ b/binaries/cuprated/src/blockchain/manager/tests.rs @@ -14,9 +14,12 @@ use cuprate_p2p::{block_downloader::BlockBatch, BroadcastSvc}; use cuprate_p2p_core::handles::HandleBuilder; use cuprate_types::{CachedVerificationState, TransactionVerificationData, TxVersion}; -use crate::blockchain::{ - check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand, - ConsensusBlockchainReadHandle, +use crate::{ + blockchain::{ + check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand, + ConsensusBlockchainReadHandle, + }, + txpool::TxpoolManagerHandle, }; async fn mock_manager(data_dir: PathBuf) -> BlockchainManager { @@ -30,7 +33,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager { let (mut blockchain_read_handle, mut blockchain_write_handle, _) = cuprate_blockchain::service::init(blockchain_config).unwrap(); let (txpool_read_handle, txpool_write_handle, _) = - cuprate_txpool::service::init(txpool_config).unwrap(); + cuprate_txpool::service::init(&txpool_config).unwrap(); check_add_genesis( &mut blockchain_read_handle, @@ -56,7 +59,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager { BlockchainManager { blockchain_write_handle, blockchain_read_handle, - txpool_write_handle, + txpool_manager_handle: TxpoolManagerHandle::mock(), blockchain_context_service, stop_current_block_downloader: Arc::new(Default::default()), broadcast_svc: BroadcastSvc::mock(), diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs index bbfa55e..a60e6e9 100644 --- a/binaries/cuprated/src/config.rs +++ b/binaries/cuprated/src/config.rs @@ -45,7 +45,7 @@ use fs::FileSystemConfig; use p2p::P2PConfig; use rayon::RayonConfig; pub use rpc::RpcConfig; -use storage::StorageConfig; +pub use storage::{StorageConfig, TxpoolConfig}; use tokio::TokioConfig; use tor::TorConfig; use tracing_config::TracingConfig; diff --git a/binaries/cuprated/src/config/storage.rs b/binaries/cuprated/src/config/storage.rs index 7754045..f86563e 100644 --- a/binaries/cuprated/src/config/storage.rs +++ b/binaries/cuprated/src/config/storage.rs @@ -62,7 +62,7 @@ config_struct! { pub struct BlockchainConfig { } /// The tx-pool config. - #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct TxpoolConfig { /// The maximum size of the tx-pool. @@ -71,6 +71,14 @@ config_struct! { /// Valid values | >= 0 /// Examples | 100_000_000, 50_000_000 pub max_txpool_byte_size: usize, + + /// The maximum age of transactions in the pool in seconds. + /// Transactions will be dropped after this time is reached. + /// + /// Type | Number + /// Valid values | >= 0 + /// Examples | 100_000_000, 50_000_000 + pub maximum_age_secs: u64, } } @@ -79,6 +87,7 @@ impl Default for TxpoolConfig { Self { sync_mode: SyncMode::default(), max_txpool_byte_size: 100_000_000, + maximum_age_secs: 60 * 60 * 24, } } } diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 5bf6feb..8f9f27e 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -92,7 +92,7 @@ fn main() { .expect(DATABASE_CORRUPT_MSG); let (txpool_read_handle, txpool_write_handle, _) = - cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool) + cuprate_txpool::service::init_with_pool(&config.txpool_config(), db_thread_pool) .inspect_err(|e| error!("Txpool database error: {e}")) .expect(DATABASE_CORRUPT_MSG); @@ -137,13 +137,15 @@ fn main() { // Create the incoming tx handler service. let tx_handler = IncomingTxHandler::init( + config.storage.txpool.clone(), network_interfaces.clearnet_network_interface.clone(), network_interfaces.tor_network_interface, txpool_write_handle.clone(), txpool_read_handle.clone(), context_svc.clone(), blockchain_read_handle.clone(), - ); + ) + .await; // Send tx handler sender to all network zones for zone in tx_handler_subscribers { @@ -157,7 +159,7 @@ fn main() { network_interfaces.clearnet_network_interface, blockchain_write_handle, blockchain_read_handle.clone(), - txpool_write_handle.clone(), + tx_handler.txpool_manager.clone(), context_svc.clone(), config.block_downloader_config(), ) diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 79f14db..fdb5ca8 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -14,6 +14,7 @@ use monero_serai::{block::Block, transaction::Transaction}; use tokio::sync::{broadcast, oneshot, watch}; use tokio_stream::wrappers::WatchStream; use tower::{Service, ServiceExt}; +use tracing::instrument; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::{ @@ -22,10 +23,9 @@ use cuprate_consensus::{ }; use cuprate_dandelion_tower::TxState; use cuprate_fixed_bytes::ByteArrayVec; -use cuprate_helper::cast::u64_to_usize; use cuprate_helper::{ asynch::rayon_spawn_async, - cast::usize_to_u64, + cast::{u64_to_usize, usize_to_u64}, map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, }; use cuprate_p2p::constants::{ @@ -363,6 +363,7 @@ async fn new_fluffy_block( } /// [`ProtocolRequest::NewTransactions`] +#[instrument(level = "debug", skip_all, fields(txs = request.txs.len(), stem = !request.dandelionpp_fluff))] async fn new_transactions( peer_information: PeerInformation, request: NewTransactions, @@ -373,16 +374,22 @@ where A: NetZoneAddress, InternalPeerID: Into, { + tracing::debug!("handling new transactions"); + let context = blockchain_context_service.blockchain_context(); // If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing. - if usize_to_u64(context.chain_height + 2) - < peer_information - .core_sync_data - .lock() - .unwrap() - .current_height - { + let peer_height = peer_information + .core_sync_data + .lock() + .unwrap() + .current_height; + if usize_to_u64(context.chain_height + 2) < peer_height { + tracing::debug!( + our_height = context.chain_height, + peer_height, + "we are too far behind peer, ignoring txs." + ); return Ok(ProtocolResponse::NA); } diff --git a/binaries/cuprated/src/rpc/handlers/json_rpc.rs b/binaries/cuprated/src/rpc/handlers/json_rpc.rs index ffcbfda..e05470f 100644 --- a/binaries/cuprated/src/rpc/handlers/json_rpc.rs +++ b/binaries/cuprated/src/rpc/handlers/json_rpc.rs @@ -23,6 +23,7 @@ use cuprate_helper::{ cast::{u32_to_usize, u64_to_usize, usize_to_u64}, fmt::HexPrefix, map::split_u128_into_low_high_bits, + time::current_unix_timestamp, }; use cuprate_hex::{Hex, HexVec}; use cuprate_p2p_core::{client::handshaker::builder::DummyAddressBook, ClearNet, Network}; @@ -923,13 +924,15 @@ async fn get_transaction_pool_backlog( mut state: CupratedRpcHandler, _: GetTransactionPoolBacklogRequest, ) -> Result { + let now = current_unix_timestamp(); + let backlog = txpool::backlog(&mut state.txpool_read) .await? .into_iter() .map(|entry| TxBacklogEntry { - weight: entry.weight, + weight: usize_to_u64(entry.weight), fee: entry.fee, - time_in_pool: entry.time_in_pool.as_secs(), + time_in_pool: now - entry.received_at, }) .collect(); @@ -968,7 +971,7 @@ async fn get_miner_data( .into_iter() .map(|entry| GetMinerDataTxBacklogEntry { id: Hex(entry.id), - weight: entry.weight, + weight: usize_to_u64(entry.weight), fee: entry.fee, }) .collect(); diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index eccf2ff..8a99625 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -8,8 +8,10 @@ use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; mod dandelion; mod incoming_tx; +mod manager; mod relay_rules; mod txs_being_handled; pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; +pub use manager::TxpoolManagerHandle; pub use relay_rules::RelayRuleError; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs index 14473e9..17a2c1b 100644 --- a/binaries/cuprated/src/txpool/dandelion.rs +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -6,6 +6,9 @@ use std::{ use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use tower::{Service, ServiceExt}; +use tokio::sync::mpsc; +use tokio_util::sync::PollSender; + use cuprate_dandelion_tower::{ pool::DandelionPoolService, traits::StemRequest, DandelionConfig, DandelionRouteReq, DandelionRouter, DandelionRouterError, Graph, State, TxState, @@ -25,6 +28,7 @@ mod stem_service; mod tx_store; pub use anon_net_service::AnonTxService; +pub use diffuse_service::DiffuseService; /// The configuration used for [`cuprate_dandelion_tower`]. /// @@ -39,7 +43,7 @@ const DANDELION_CONFIG: DandelionConfig = DandelionConfig { /// A [`DandelionRouter`] with all generic types defined. pub(super) type ConcreteDandelionRouter = DandelionRouter< stem_service::OutboundPeerStream, - diffuse_service::DiffuseService, + DiffuseService, CrossNetworkInternalPeerId, stem_service::StemPeerService, DandelionTx, @@ -106,7 +110,7 @@ impl Service> for Mai pub fn start_dandelion_pool_manager( router: MainDandelionRouter, txpool_read_handle: TxpoolReadHandle, - txpool_write_handle: TxpoolWriteHandle, + promote_tx: mpsc::Sender<[u8; 32]>, ) -> DandelionPoolService { cuprate_dandelion_tower::pool::start_dandelion_pool_manager( // TODO: make this constant configurable? @@ -114,7 +118,7 @@ pub fn start_dandelion_pool_manager( router, tx_store::TxStoreService { txpool_read_handle, - txpool_write_handle, + promote_tx: PollSender::new(promote_tx), }, DANDELION_CONFIG, ) @@ -128,7 +132,7 @@ where InternalPeerID: Into, { DandelionRouter::new( - diffuse_service::DiffuseService { + DiffuseService { clear_net_broadcast_service: network_interface.broadcast_svc(), }, stem_service::OutboundPeerStream::::new(network_interface), diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index b890ffd..01f8517 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -1,7 +1,12 @@ -use std::task::{Context, Poll}; +use std::{ + future::ready, + task::{Context, Poll}, +}; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; +use tokio::sync::mpsc; +use tokio_util::sync::PollSender; use tower::{Service, ServiceExt}; use cuprate_dandelion_tower::{ @@ -21,7 +26,7 @@ use super::{DandelionTx, TxId}; /// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides. pub struct TxStoreService { pub txpool_read_handle: TxpoolReadHandle, - pub txpool_write_handle: TxpoolWriteHandle, + pub promote_tx: PollSender<[u8; 32]>, } impl Service> for TxStoreService { @@ -29,8 +34,8 @@ impl Service> for TxStoreService { type Error = tower::BoxError; type Future = BoxFuture<'static, Result>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.promote_tx.poll_reserve(cx).map_err(Into::into) } fn call(&mut self, req: TxStoreRequest) -> Self::Future { @@ -60,15 +65,13 @@ impl Service> for TxStoreService { Ok(_) => unreachable!(), }) .boxed(), - TxStoreRequest::Promote(tx_id) => self - .txpool_write_handle - .clone() - .oneshot(TxpoolWriteRequest::Promote(tx_id)) - .map(|res| match res { - Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok), - Err(e) => Err(e.into()), - }) - .boxed(), + TxStoreRequest::Promote(tx_id) => ready( + self.promote_tx + .send_item(tx_id) + .map_err(Into::into) + .map(|()| TxStoreResponse::Ok), + ) + .boxed(), } } } diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index e1f81c4..069e6d3 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -7,13 +7,15 @@ use std::{ use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use monero_serai::transaction::Transaction; +use tokio::sync::mpsc; use tower::{BoxError, Service, ServiceExt}; +use tracing::instrument; use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions}; use cuprate_consensus::{ - transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, - BlockchainContextService, ExtendedConsensusError, + transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions}, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, + ExtendedConsensusError, }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTxBuilder}, @@ -35,11 +37,15 @@ use cuprate_types::TransactionVerificationData; use crate::{ blockchain::ConsensusBlockchainReadHandle, + config::TxpoolConfig, constants::PANIC_CRITICAL_SERVICE_ERROR, p2p::CrossNetworkInternalPeerId, signals::REORG_LOCK, txpool::{ - dandelion::{self, AnonTxService, ConcreteDandelionRouter, MainDandelionRouter}, + dandelion::{ + self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter, + }, + manager::{start_txpool_manager, TxpoolManagerHandle}, relay_rules::{check_tx_relay_rules, RelayRuleError}, txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally}, }, @@ -92,8 +98,7 @@ pub struct IncomingTxHandler { /// The dandelion txpool manager. pub(super) dandelion_pool_manager: DandelionPoolService, - /// The txpool write handle. - pub(super) txpool_write_handle: TxpoolWriteHandle, + pub txpool_manager: TxpoolManagerHandle, /// The txpool read handle. pub(super) txpool_read_handle: TxpoolReadHandle, /// The blockchain read handle. @@ -103,7 +108,9 @@ pub struct IncomingTxHandler { impl IncomingTxHandler { /// Initialize the [`IncomingTxHandler`]. #[expect(clippy::significant_drop_tightening)] - pub fn init( + #[instrument(level = "info", skip_all, name = "start_txpool")] + pub async fn init( + txpool_config: TxpoolConfig, clear_net: NetworkInterface, tor_net: Option>, txpool_write_handle: TxpoolWriteHandle, @@ -111,22 +118,37 @@ impl IncomingTxHandler { blockchain_context_cache: BlockchainContextService, blockchain_read_handle: BlockchainReadHandle, ) -> Self { + let diffuse_service = DiffuseService { + clear_net_broadcast_service: clear_net.broadcast_svc(), + }; let clearnet_router = dandelion::dandelion_router(clear_net); let tor_router = tor_net.map(AnonTxService::new); let dandelion_router = MainDandelionRouter::new(clearnet_router, tor_router); + let (promote_tx, promote_rx) = mpsc::channel(25); + let dandelion_pool_manager = dandelion::start_dandelion_pool_manager( dandelion_router, txpool_read_handle.clone(), - txpool_write_handle.clone(), + promote_tx, ); + let txpool_manager = start_txpool_manager( + txpool_write_handle, + txpool_read_handle.clone(), + promote_rx, + diffuse_service, + dandelion_pool_manager.clone(), + txpool_config, + ) + .await; + Self { txs_being_handled: TxsBeingHandled::new(), blockchain_context_cache, dandelion_pool_manager, - txpool_write_handle, + txpool_manager, txpool_read_handle, blockchain_read_handle: ConsensusBlockchainReadHandle::new( blockchain_read_handle, @@ -151,8 +173,8 @@ impl Service for IncomingTxHandler { self.txs_being_handled.clone(), self.blockchain_context_cache.clone(), self.blockchain_read_handle.clone(), - self.txpool_write_handle.clone(), self.txpool_read_handle.clone(), + self.txpool_manager.clone(), self.dandelion_pool_manager.clone(), ) .boxed() @@ -170,8 +192,8 @@ async fn handle_incoming_txs( txs_being_handled: TxsBeingHandled, mut blockchain_context_cache: BlockchainContextService, blockchain_read_handle: ConsensusBlockchainReadHandle, - mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, + mut txpool_manager_handle: TxpoolManagerHandle, mut dandelion_pool_manager: DandelionPoolService, ) -> Result<(), IncomingTxError> { let _reorg_guard = REORG_LOCK.read().await; @@ -209,28 +231,33 @@ async fn handle_incoming_txs( return Err(IncomingTxError::RelayRule(e)); } - if !do_not_relay { - handle_valid_tx( - tx, - state.clone(), - &mut txpool_write_handle, - &mut dandelion_pool_manager, - ) - .await; + tracing::debug!( + tx = hex::encode(tx.tx_hash), + "passing tx to tx-pool manager" + ); + + // TODO: take into account `do_not_relay` in the tx-pool manager. + + if txpool_manager_handle + .tx_tx + .send((tx, state.clone())) + .await + .is_err() + { + tracing::warn!("The txpool manager has been stopped, dropping incoming txs"); + return Ok(()); } } // Re-relay any txs we got in the block that were already in our stem pool. - if !do_not_relay { - for stem_tx in stem_pool_txs { - rerelay_stem_tx( - &stem_tx, - state.clone(), - &mut txpool_read_handle, - &mut dandelion_pool_manager, - ) - .await; - } + for stem_tx in stem_pool_txs { + rerelay_stem_tx( + &stem_tx, + state.clone(), + &mut txpool_read_handle, + &mut dandelion_pool_manager, + ) + .await; } Ok(()) @@ -267,6 +294,7 @@ async fn prepare_incoming_txs( // If a duplicate is in here the incoming tx batch contained the same tx twice. if !tx_blob_hashes.insert(tx_blob_hash) { + tracing::debug!("peer sent duplicate tx in batch, ignoring batch."); return Some(Err(IncomingTxError::DuplicateTransaction)); } @@ -321,58 +349,6 @@ async fn prepare_incoming_txs( .await } -/// Handle a verified tx. -/// -/// This will add the tx to the txpool and route it to the network. -async fn handle_valid_tx( - tx: TransactionVerificationData, - state: TxState, - txpool_write_handle: &mut TxpoolWriteHandle, - dandelion_pool_manager: &mut DandelionPoolService< - DandelionTx, - TxId, - CrossNetworkInternalPeerId, - >, -) { - let incoming_tx = - IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash); - - let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolWriteRequest::AddTransaction { - tx: Box::new(tx), - state_stem: state.is_stem_stage(), - }) - .await - .expect("TODO") - else { - unreachable!() - }; - - // TODO: track double spends to quickly ignore them from their blob hash. - if let Some(tx_hash) = double_spend { - return; - } - - // TODO: There is a race condition possible if a tx and block come in at the same time: . - - let incoming_tx = incoming_tx - .with_routing_state(state) - .with_state_in_db(None) - .build() - .unwrap(); - - dandelion_pool_manager - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(incoming_tx) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); -} - /// Re-relay a tx that was already in our stem pool. async fn rerelay_stem_tx( tx_hash: &TxId, diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs new file mode 100644 index 0000000..6faf99c --- /dev/null +++ b/binaries/cuprated/src/txpool/manager.rs @@ -0,0 +1,497 @@ +use std::{ + cmp::min, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use bytes::Bytes; +use futures::StreamExt; +use indexmap::IndexMap; +use rand::Rng; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::{time::delay_queue, time::DelayQueue}; +use tower::{Service, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + +use cuprate_dandelion_tower::{ + pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, + traits::DiffuseRequest, + TxState, +}; +use cuprate_helper::time::current_unix_timestamp; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; +use cuprate_types::TransactionVerificationData; + +use crate::{ + config::TxpoolConfig, + constants::PANIC_CRITICAL_SERVICE_ERROR, + p2p::{CrossNetworkInternalPeerId, NetworkInterfaces}, + txpool::{ + dandelion::DiffuseService, + incoming_tx::{DandelionTx, TxId}, + }, +}; + +const INCOMING_TX_QUEUE_SIZE: usize = 100; + +/// Starts the transaction pool manager service. +/// +/// # Panics +/// +/// This function may panic if any inner service has an unrecoverable error. +pub async fn start_txpool_manager( + mut txpool_write_handle: TxpoolWriteHandle, + mut txpool_read_handle: TxpoolReadHandle, + promote_tx_channel: mpsc::Receiver<[u8; 32]>, + diffuse_service: DiffuseService, + dandelion_pool_manager: DandelionPoolService, + config: TxpoolConfig, +) -> TxpoolManagerHandle { + let TxpoolReadResponse::Backlog(backlog) = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::Backlog) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + tracing::info!(txs_in_pool = backlog.len(), "starting txpool manager"); + + let mut stem_txs = Vec::new(); + + let mut tx_timeouts = DelayQueue::with_capacity(backlog.len()); + let current_txs = backlog + .into_iter() + .map(|tx| { + let timeout_key = if tx.private { + stem_txs.push(tx.id); + None + } else { + let next_timeout = calculate_next_timeout(tx.received_at, config.maximum_age_secs); + Some(tx_timeouts.insert(tx.id, Duration::from_secs(next_timeout))) + }; + + ( + tx.id, + TxInfo { + weight: tx.weight, + fee: tx.fee, + received_at: tx.received_at, + private: tx.private, + timeout_key, + }, + ) + }) + .collect(); + + let mut manager = TxpoolManager { + current_txs, + tx_timeouts, + txpool_write_handle, + txpool_read_handle, + dandelion_pool_manager, + promote_tx_channel, + diffuse_service, + config, + }; + + tracing::info!(stem_txs = stem_txs.len(), "promoting stem txs"); + + for tx in stem_txs { + manager.promote_tx(tx).await; + } + + let (tx_tx, tx_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE); + let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1); + + tokio::spawn(manager.run(tx_rx, spent_kis_rx)); + + TxpoolManagerHandle { + tx_tx, + spent_kis_tx, + } +} + +/// A handle to the tx-pool manager. +#[derive(Clone)] +pub struct TxpoolManagerHandle { + /// The incoming tx channel. + pub tx_tx: mpsc::Sender<( + TransactionVerificationData, + TxState, + )>, + + /// The spent key images in a new block tx. + spent_kis_tx: mpsc::Sender<(Vec<[u8; 32]>, oneshot::Sender<()>)>, +} + +impl TxpoolManagerHandle { + /// Create a mock [`TxpoolManagerHandle`] that does nothing. + /// + /// Useful for testing. + #[expect(clippy::let_underscore_must_use)] + pub fn mock() -> Self { + let (spent_kis_tx, mut spent_kis_rx) = mpsc::channel(1); + let (tx_tx, mut tx_rx) = mpsc::channel(100); + + tokio::spawn(async move { + loop { + let Some(rec): Option<(_, oneshot::Sender<()>)> = spent_kis_rx.recv().await else { + return; + }; + + let _ = rec.1.send(()); + } + }); + + tokio::spawn(async move { + loop { + if tx_rx.recv().await.is_none() { + return; + } + } + }); + + Self { + tx_tx, + spent_kis_tx, + } + } + + /// Tell the tx-pool about spent key images in an incoming block. + pub async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); + + drop(self.spent_kis_tx.send((spent_key_images, tx)).await); + + rx.await + .map_err(|_| anyhow::anyhow!("txpool manager stopped")) + } +} + +/// Information on a transaction in the tx-pool. +struct TxInfo { + /// The weight of the transaction. + weight: usize, + /// The fee the transaction paid. + fee: u64, + /// The UNIX timestamp when the tx was received. + received_at: u64, + /// Whether the tx is in the private pool. + private: bool, + + /// The [`delay_queue::Key`] for the timeout queue in the manager. + /// + /// This will be [`None`] if the tx is private as timeouts for them are handled in the dandelion pool. + timeout_key: Option, +} + +struct TxpoolManager { + current_txs: IndexMap<[u8; 32], TxInfo>, + + /// A [`DelayQueue`] for waiting on tx timeouts. + /// + /// Timeouts can be for re-relaying or removal from the pool. + tx_timeouts: DelayQueue<[u8; 32]>, + + txpool_write_handle: TxpoolWriteHandle, + txpool_read_handle: TxpoolReadHandle, + + dandelion_pool_manager: DandelionPoolService, + /// The channel the dandelion manager will use to communicate that a tx should be promoted to the + /// public pool. + promote_tx_channel: mpsc::Receiver<[u8; 32]>, + /// The [`DiffuseService`] to diffuse txs to the p2p network. + /// + /// Used for re-relays. + diffuse_service: DiffuseService, + + config: TxpoolConfig, +} + +impl TxpoolManager { + /// Removes a transaction from the tx-pool manager, and optionally the database too. + /// + /// # Panics + /// + /// This function will panic if the tx is not in the tx-pool manager. + #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] + async fn remove_tx_from_pool(&mut self, tx: [u8; 32], remove_from_db: bool) { + tracing::debug!("removing tx from pool"); + + let tx_info = self.current_txs.swap_remove(&tx).unwrap(); + + tx_info + .timeout_key + .and_then(|key| self.tx_timeouts.try_remove(&key)); + + if remove_from_db { + self.txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::RemoveTransaction(tx)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } + } + + /// Re-relay a tx to the network. + /// + /// # Panics + /// + /// This function will panic if the tx is not in the tx-pool. + #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] + async fn rerelay_tx(&mut self, tx: [u8; 32]) { + tracing::debug!("re-relaying tx to network"); + + let TxpoolReadResponse::TxBlob { + tx_blob, + state_stem: _, + } = self + .txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxBlob(tx)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + self.diffuse_service + .call(DiffuseRequest(DandelionTx(Bytes::from(tx_blob)))) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } + + /// Handles a transaction timeout, be either rebroadcasting or dropping the tx from the pool. + /// If a rebroadcast happens, this function will handle adding another timeout to the queue. + #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] + async fn handle_tx_timeout(&mut self, tx: [u8; 32]) { + let Some(tx_info) = self.current_txs.get(&tx) else { + tracing::warn!("tx timed out, but tx not in pool"); + return; + }; + + let time_in_pool = current_unix_timestamp() - tx_info.received_at; + + // Check if the tx has timed out, with a small buffer to prevent rebroadcasting if the time is + // slightly off. + if time_in_pool + 10 > self.config.maximum_age_secs { + tracing::warn!("tx has been in pool too long, removing from pool"); + self.remove_tx_from_pool(tx, true).await; + return; + } + + let received_at = tx_info.received_at; + + tracing::debug!(time_in_pool, "tx timed out, resending to network"); + + self.rerelay_tx(tx).await; + + let tx_info = self.current_txs.get_mut(&tx).unwrap(); + + let next_timeout = calculate_next_timeout(received_at, self.config.maximum_age_secs); + tracing::trace!(in_secs = next_timeout, "setting next tx timeout"); + + tx_info.timeout_key = Some( + self.tx_timeouts + .insert(tx, Duration::from_secs(next_timeout)), + ); + } + + /// Adds a tx to the tx-pool manager. + #[instrument(level = "trace", skip_all, fields(tx_id = hex::encode(tx)))] + fn track_tx(&mut self, tx: [u8; 32], weight: usize, fee: u64, private: bool) { + let now = current_unix_timestamp(); + + let timeout_key = if private { + // The dandelion pool handles stem tx embargo. + None + } else { + let timeout = calculate_next_timeout(now, self.config.maximum_age_secs); + + tracing::trace!(in_secs = timeout, "setting next tx timeout"); + + Some(self.tx_timeouts.insert(tx, Duration::from_secs(timeout))) + }; + + self.current_txs.insert( + tx, + TxInfo { + weight, + fee, + received_at: now, + private, + timeout_key, + }, + ); + } + + /// Handles an incoming tx, adding it to the pool and routing it. + #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx.tx_hash), state))] + async fn handle_incoming_tx( + &mut self, + tx: TransactionVerificationData, + state: TxState, + ) { + tracing::debug!("handling new tx"); + + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash); + + let (tx_hash, tx_weight, tx_fee) = (tx.tx_hash, tx.tx_weight, tx.fee); + + let TxpoolWriteResponse::AddTransaction(double_spend) = self + .txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::AddTransaction { + tx: Box::new(tx), + state_stem: state.is_stem_stage(), + }) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + if let Some(tx_hash) = double_spend { + tracing::debug!( + double_spent = hex::encode(tx_hash), + "transaction is a double spend, ignoring" + ); + return; + } + + self.track_tx(tx_hash, tx_weight, tx_fee, state.is_stem_stage()); + + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(None) + .build() + .unwrap(); + + self.dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } + + /// Promote a tx to the public pool. + #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] + async fn promote_tx(&mut self, tx: [u8; 32]) { + let Some(tx_info) = self.current_txs.get_mut(&tx) else { + tracing::debug!("not promoting tx, tx not in pool"); + return; + }; + + if !tx_info.private { + tracing::trace!("not promoting tx, tx is already public"); + return; + } + + tracing::debug!("promoting tx"); + + // It's now in the public pool, pretend we just saw it. + tx_info.received_at = current_unix_timestamp(); + + let next_timeout = + calculate_next_timeout(tx_info.received_at, self.config.maximum_age_secs); + tracing::trace!(in_secs = next_timeout, "setting next tx timeout"); + tx_info.timeout_key = Some( + self.tx_timeouts + .insert(tx, Duration::from_secs(next_timeout)), + ); + + self.txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::Promote(tx)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + } + + /// Handles removing all transactions that have been included/double spent in an incoming block. + #[instrument(level = "debug", skip_all)] + async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) { + tracing::debug!("handling new block"); + + let TxpoolWriteResponse::NewBlock(removed_txs) = self + .txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::NewBlock { spent_key_images }) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + for tx in removed_txs { + self.remove_tx_from_pool(tx, false).await; + } + } + + #[expect(clippy::let_underscore_must_use)] + async fn run( + mut self, + mut tx_rx: mpsc::Receiver<( + TransactionVerificationData, + TxState, + )>, + mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>, + ) { + loop { + tokio::select! { + Some(tx) = self.tx_timeouts.next() => { + self.handle_tx_timeout(tx.into_inner()).await; + } + Some((tx, state)) = tx_rx.recv() => { + self.handle_incoming_tx(tx, state).await; + } + Some(tx) = self.promote_tx_channel.recv() => { + self.promote_tx(tx).await; + } + Some((spent_kis, tx)) = block_rx.recv() => { + self.new_block(spent_kis).await; + let _ = tx.send(()); + } + } + } + } +} + +/// Calculates the amount of time to wait before resending a tx to the network. +fn calculate_next_timeout(received_at: u64, max_time_in_pool: u64) -> u64 { + /// The base time between re-relays to the p2p network. + const TX_RERELAY_TIME: u64 = 300; + + /* + This is a simple exponential backoff. + The first timeout is TX_RERELAY_TIME seconds, the second is 2 * TX_RERELAY_TIME seconds, then 4, 8, 16, etc. + */ + let now = current_unix_timestamp(); + + let time_in_pool = now - received_at; + + let time_till_max_timeout = max_time_in_pool.saturating_sub(time_in_pool); + + let timeouts = time_in_pool / TX_RERELAY_TIME; + + min((timeouts + 1) * TX_RERELAY_TIME, time_till_max_timeout) +} diff --git a/p2p/dandelion-tower/Cargo.toml b/p2p/dandelion-tower/Cargo.toml index 92e4915..42637a3 100644 --- a/p2p/dandelion-tower/Cargo.toml +++ b/p2p/dandelion-tower/Cargo.toml @@ -11,7 +11,7 @@ txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"] [dependencies] tower = { workspace = true, features = ["util"] } -tracing = { workspace = true, features = ["std"] } +tracing = { workspace = true, features = ["std", "attributes"] } futures = { workspace = true, features = ["std"] } tokio = { workspace = true, features = ["rt", "sync", "macros"], optional = true} diff --git a/p2p/dandelion-tower/src/pool/manager.rs b/p2p/dandelion-tower/src/pool/manager.rs index a911959..394ed6b 100644 --- a/p2p/dandelion-tower/src/pool/manager.rs +++ b/p2p/dandelion-tower/src/pool/manager.rs @@ -14,6 +14,7 @@ use tokio::{ }; use tokio_util::time::{delay_queue, DelayQueue}; use tower::{Service, ServiceExt}; +use tracing::Instrument; use crate::{ pool::IncomingTx, @@ -223,43 +224,47 @@ where Ok(tx.map(|tx| tx.0)) } + #[expect(clippy::type_complexity)] /// Starts the [`DandelionPoolManager`]. pub(crate) async fn run( mut self, - mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, + mut rx: mpsc::Receiver<( + (IncomingTx, tracing::Span), + oneshot::Sender<()>, + )>, ) { tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config); loop { - tracing::trace!("Waiting for next event."); tokio::select! { // biased to handle current txs before routing new ones. biased; Some(fired) = self.embargo_timers.next() => { - tracing::debug!("Embargo timer fired, did not see stem tx in time."); + let span = tracing::debug_span!("embargo_timer_fired"); + tracing::debug!(parent: &span,"Embargo timer fired, did not see stem tx in time."); let tx_id = fired.into_inner(); - if let Err(e) = self.promote_and_fluff_tx(tx_id).await { - tracing::error!("Error handling fired embargo timer: {e}"); + if let Err(e) = self.promote_and_fluff_tx(tx_id).instrument(span.clone()).await { + tracing::error!(parent: &span, "Error handling fired embargo timer: {e}"); return; } } Some(Ok((tx_id, res))) = self.routing_set.join_next() => { - tracing::trace!("Received d++ routing result."); + let span = tracing::debug_span!("dandelion_routing_result"); let res = match res { Ok(State::Fluff) => { - tracing::debug!("Transaction was fluffed upgrading it to the public pool."); - self.promote_tx(tx_id).await + tracing::debug!(parent: &span, "Transaction was fluffed upgrading it to the public pool."); + self.promote_tx(tx_id).instrument(span.clone()).await } Err(tx_state) => { - tracing::debug!("Error routing transaction, trying again."); + tracing::debug!(parent: &span, "Error routing transaction, trying again."); - match self.get_tx_from_pool(tx_id.clone()).await { + match self.get_tx_from_pool(tx_id.clone()).instrument(span.clone()).await { Ok(Some(tx)) => match tx_state { - TxState::Fluff => self.fluff_tx(tx, tx_id).await, - TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await, - TxState::Local => self.stem_tx(tx, tx_id, None).await, + TxState::Fluff => self.fluff_tx(tx, tx_id).instrument(span.clone()).await, + TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).instrument(span.clone()).await, + TxState::Local => self.stem_tx(tx, tx_id, None).instrument(span.clone()).await, } Err(e) => Err(e), _ => continue, @@ -269,22 +274,24 @@ where }; if let Err(e) = res { - tracing::error!("Error handling transaction routing return: {e}"); + tracing::error!(parent: &span, "Error handling transaction routing return: {e}"); return; } } req = rx.recv() => { - tracing::debug!("Received new tx to route."); - - let Some((IncomingTx { tx, tx_id, routing_state }, res_tx)) = req else { + let Some(((IncomingTx { tx, tx_id, routing_state }, span), res_tx)) = req else { return; }; - if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).await { + let span = tracing::debug_span!(parent: &span, "dandelion_pool_manager"); + + tracing::debug!(parent: &span, "Received new tx to route."); + + if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).instrument(span.clone()).await { #[expect(clippy::let_underscore_must_use, reason = "dropped receivers can be ignored")] let _ = res_tx.send(()); - tracing::error!("Error handling transaction in dandelion pool: {e}"); + tracing::error!(parent: &span, "Error handling transaction in dandelion pool: {e}"); return; } diff --git a/p2p/dandelion-tower/src/pool/mod.rs b/p2p/dandelion-tower/src/pool/mod.rs index 90eb555..381d012 100644 --- a/p2p/dandelion-tower/src/pool/mod.rs +++ b/p2p/dandelion-tower/src/pool/mod.rs @@ -34,7 +34,6 @@ use tokio::{ }; use tokio_util::{sync::PollSender, time::DelayQueue}; use tower::Service; -use tracing::Instrument; use crate::{ pool::manager::DandelionPoolShutDown, @@ -93,9 +92,7 @@ where _tx: PhantomData, }; - let span = tracing::debug_span!("dandelion_pool"); - - tokio::spawn(pool.run(rx).instrument(span)); + tokio::spawn(pool.run(rx)); DandelionPoolService { tx: PollSender::new(tx), @@ -107,8 +104,12 @@ where /// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`] #[derive(Clone)] pub struct DandelionPoolService { + #[expect(clippy::type_complexity)] /// The channel to [`DandelionPoolManager`]. - tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, + tx: PollSender<( + (IncomingTx, tracing::Span), + oneshot::Sender<()>, + )>, } impl Service> @@ -132,7 +133,7 @@ where let res = self .tx - .send_item((req, tx)) + .send_item(((req, tracing::Span::current()), tx)) .map_err(|_| DandelionPoolShutDown); async move { diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml index 63d5557..602cc35 100644 --- a/storage/txpool/Cargo.toml +++ b/storage/txpool/Cargo.toml @@ -21,7 +21,7 @@ serde = ["dep:serde", "cuprate-database/serde", "cuprate-database-service/ cuprate-database = { workspace = true, features = ["heed"] } cuprate-database-service = { workspace = true } cuprate-types = { workspace = true, features = ["rpc"] } -cuprate-helper = { workspace = true, default-features = false, features = ["constants"] } +cuprate-helper = { workspace = true, default-features = false, features = ["constants", "time"] } monero-serai = { workspace = true, features = ["std"] } bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } @@ -29,6 +29,7 @@ bitflags = { workspace = true, features = ["std", "serde", "byte thiserror = { workspace = true } hex = { workspace = true, features = ["std"] } blake3 = { workspace = true, features = ["std"] } +tracing = { workspace = true } tower = { workspace = true } rayon = { workspace = true } diff --git a/storage/txpool/README.md b/storage/txpool/README.md index ca4f737..3bb2fad 100644 --- a/storage/txpool/README.md +++ b/storage/txpool/README.md @@ -82,7 +82,7 @@ use cuprate_txpool::{ .build(); // Initialize the database environment. - let env = cuprate_txpool::open(config)?; + let env = cuprate_txpool::open(&config)?; // Open up a transaction + tables for writing. let env_inner = env.env_inner(); diff --git a/storage/txpool/src/free.rs b/storage/txpool/src/free.rs index d0f9a31..affeb80 100644 --- a/storage/txpool/src/free.rs +++ b/storage/txpool/src/free.rs @@ -1,9 +1,23 @@ //! General free functions (related to the tx-pool database). -//---------------------------------------------------------------------------------------------------- Import -use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw}; +use std::borrow::Cow; -use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash}; +use cuprate_database::{ + ConcreteEnv, DatabaseRo, Env, EnvInner, InitError, RuntimeError, StorableStr, TxRw, +}; +use cuprate_database::{DatabaseRw, TxRo}; + +use crate::{ + config::Config, + tables::{Metadata, OpenTables}, + types::TransactionBlobHash, +}; + +/// The current version of the database format. +pub const DATABASE_VERSION: StorableStr = StorableStr(Cow::Borrowed("0.1")); + +/// The key used to store the database version in the [`Metadata`] table. +pub const VERSION_KEY: StorableStr = StorableStr(Cow::Borrowed("version")); //---------------------------------------------------------------------------------------------------- Free functions /// Open the txpool database using the passed [`Config`]. @@ -22,9 +36,9 @@ use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash}; /// - A table could not be created/opened #[cold] #[inline(never)] // only called once -pub fn open(config: Config) -> Result { +pub fn open(config: &Config) -> Result { // Attempt to open the database environment. - let env = ::open(config.db_config)?; + let env = ::open(config.db_config.clone())?; /// Convert runtime errors to init errors. /// @@ -36,20 +50,28 @@ pub fn open(config: Config) -> Result { fn runtime_to_init_error(runtime: RuntimeError) -> InitError { match runtime { RuntimeError::Io(io_error) => io_error.into(), + RuntimeError::KeyNotFound => InitError::InvalidVersion, // These errors shouldn't be happening here. - RuntimeError::KeyExists - | RuntimeError::KeyNotFound - | RuntimeError::ResizeNeeded - | RuntimeError::TableNotFound => unreachable!(), + RuntimeError::KeyExists | RuntimeError::ResizeNeeded | RuntimeError::TableNotFound => { + unreachable!() + } } } + let fresh_db; + // INVARIANT: We must ensure that all tables are created, // `cuprate_database` has no way of knowing _which_ tables // we want since it is agnostic, so we are responsible for this. { let env_inner = env.env_inner(); + + // Store if this DB has been used before by checking if the metadata table exists. + let tx_ro = env_inner.tx_ro().map_err(runtime_to_init_error)?; + fresh_db = env_inner.open_db_ro::(&tx_ro).is_err(); + TxRo::commit(tx_ro).map_err(runtime_to_init_error)?; + let tx_rw = env_inner.tx_rw().map_err(runtime_to_init_error)?; // Create all tables. @@ -58,6 +80,43 @@ pub fn open(config: Config) -> Result { TxRw::commit(tx_rw).map_err(runtime_to_init_error)?; } + { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw().map_err(runtime_to_init_error)?; + + let mut metadata = env_inner + .open_db_rw::(&tx_rw) + .map_err(runtime_to_init_error)?; + + if fresh_db { + // If the database is new, add the version. + metadata + .put(&VERSION_KEY, &DATABASE_VERSION) + .map_err(runtime_to_init_error)?; + } + + let print_version_err = || { + tracing::error!( + "The database follows an old format, please delete the database at: {}", + config.db_config.db_directory().display() + ); + }; + + let version = metadata + .get(&VERSION_KEY) + .inspect_err(|_| print_version_err()) + .map_err(runtime_to_init_error)?; + + if version != DATABASE_VERSION { + // TODO: database migration when stable? This is the tx-pool so is not critical. + print_version_err(); + return Err(InitError::InvalidVersion); + } + + drop(metadata); + TxRw::commit(tx_rw).map_err(runtime_to_init_error)?; + } + Ok(env) } diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs index 53e53ec..a176461 100644 --- a/storage/txpool/src/lib.rs +++ b/storage/txpool/src/lib.rs @@ -16,7 +16,7 @@ mod tx; pub mod types; pub use config::Config; -pub use free::{open, transaction_blob_hash}; +pub use free::{open, transaction_blob_hash, DATABASE_VERSION, VERSION_KEY}; pub use tx::TxEntry; //re-exports diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs index badc4f6..3424d61 100644 --- a/storage/txpool/src/ops.rs +++ b/storage/txpool/src/ops.rs @@ -55,7 +55,7 @@ //! .build(); //! //! // Initialize the database environment. -//! let env = cuprate_txpool::open(config)?; +//! let env = cuprate_txpool::open(&config)?; //! //! // Open up a transaction + tables for writing. //! let env_inner = env.env_inner(); diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index f1f43b2..b876724 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -5,6 +5,7 @@ use bytemuck::TransparentWrapper; use monero_serai::transaction::{NotPruned, Transaction}; use cuprate_database::{DatabaseRw, DbResult, StorableVec}; +use cuprate_helper::time::current_unix_timestamp; use cuprate_types::TransactionVerificationData; use crate::{ @@ -42,6 +43,7 @@ pub fn add_transaction( &TransactionInfo { fee: tx.fee, weight: tx.tx_weight, + received_at: current_unix_timestamp(), flags, _padding: [0; 7], }, diff --git a/storage/txpool/src/service.rs b/storage/txpool/src/service.rs index 2a13f1c..35311df 100644 --- a/storage/txpool/src/service.rs +++ b/storage/txpool/src/service.rs @@ -87,7 +87,7 @@ //! .build(); //! //! // Initialize the database thread-pool. -//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(config)?; +//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(&config)?; //! //! // Prepare a request to write block. //! let tx = TX_V1_SIG2.clone(); diff --git a/storage/txpool/src/service/free.rs b/storage/txpool/src/service/free.rs index 1bb15cd..62eaa83 100644 --- a/storage/txpool/src/service/free.rs +++ b/storage/txpool/src/service/free.rs @@ -24,7 +24,7 @@ use crate::{ /// # Errors /// This will forward the error if [`crate::open`] failed. pub fn init( - config: Config, + config: &Config, ) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc), InitError> { let reader_threads = config.reader_threads; @@ -51,7 +51,7 @@ pub fn init( /// # Errors /// This will forward the error if [`crate::open`] failed. pub fn init_with_pool( - config: Config, + config: &Config, pool: Arc, ) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc), InitError> { // Initialize the database itself. diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 95f5da7..83d608c 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -191,9 +191,13 @@ pub enum TxpoolWriteResponse { /// Response to: /// - [`TxpoolWriteRequest::RemoveTransaction`] /// - [`TxpoolWriteRequest::Promote`] - /// - [`TxpoolWriteRequest::NewBlock`] Ok, + /// Response to [`TxpoolWriteRequest::NewBlock`]. + /// + /// The inner values are the transactions removed from the pool. + NewBlock(Vec), + /// Response to [`TxpoolWriteRequest::AddTransaction`]. /// /// If the inner value is [`Some`] the tx was not added to the pool as it double spends a tx with the given hash. diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 5b6bf59..c0b699f 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -13,7 +13,9 @@ use std::{ use rayon::ThreadPool; -use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError}; +use cuprate_database::{ + ConcreteEnv, DatabaseIter, DatabaseRo, DbResult, Env, EnvInner, RuntimeError, +}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use crate::{ @@ -24,6 +26,7 @@ use crate::{ }, tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos}, types::{TransactionBlobHash, TransactionHash}, + TxEntry, }; // TODO: update the docs here @@ -229,7 +232,27 @@ fn txs_for_block(env: &ConcreteEnv, txs: Vec) -> ReadResponseRe /// [`TxpoolReadRequest::Backlog`]. #[inline] fn backlog(env: &ConcreteEnv) -> ReadResponseResult { - Ok(TxpoolReadResponse::Backlog(todo!())) + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tx_infos_table = inner_env.open_db_ro::(&tx_ro)?; + + let backlog = tx_infos_table + .iter()? + .map(|info| { + let (id, info) = info?; + + Ok(TxEntry { + id, + weight: info.weight, + fee: info.fee, + private: info.flags.private(), + received_at: info.received_at, + }) + }) + .collect::>()?; + + Ok(TxpoolReadResponse::Backlog(backlog)) } /// [`TxpoolReadRequest::Size`]. diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 3df3535..737bdf4 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -4,6 +4,7 @@ use cuprate_database::{ ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw, }; use cuprate_database_service::DatabaseWriteHandle; +use cuprate_helper::time::current_unix_timestamp; use cuprate_types::TransactionVerificationData; use crate::{ @@ -115,6 +116,7 @@ fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult(&tx_rw)?; tx_infos.update(tx_hash, |mut info| { + info.received_at = current_unix_timestamp(); info.flags.remove(TxStateFlags::STATE_STEM); Some(info) }) @@ -137,8 +139,10 @@ fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult DbResult (), Err(e) => return Err(e), } @@ -162,5 +168,5 @@ fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult KnownBlobHashes, TransactionBlobHash => TransactionHash, + + /// Current database version. + 5 => Metadata, + StorableStr => StorableStr, } diff --git a/storage/txpool/src/tx.rs b/storage/txpool/src/tx.rs index 29afae8..42f0fa5 100644 --- a/storage/txpool/src/tx.rs +++ b/storage/txpool/src/tx.rs @@ -8,9 +8,11 @@ pub struct TxEntry { /// The transaction's ID (hash). pub id: [u8; 32], /// The transaction's weight. - pub weight: u64, + pub weight: usize, /// The transaction's fee. pub fee: u64, - /// How long the transaction has been in the pool. - pub time_in_pool: std::time::Duration, + /// If the tx is in the private pool. + pub private: bool, + /// The UNIX timestamp when the transaction was received. + pub received_at: u64, } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index b6478b4..56e67c0 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -31,6 +31,12 @@ bitflags::bitflags! { } } +impl TxStateFlags { + pub const fn private(&self) -> bool { + self.contains(Self::STATE_STEM) + } +} + /// Information on a tx-pool transaction. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] #[repr(C)] @@ -39,6 +45,8 @@ pub struct TransactionInfo { pub fee: u64, /// The transaction's weight. pub weight: usize, + /// The UNIX timestamp of when this tx was received. + pub received_at: u64, /// [`TxStateFlags`] of this transaction. pub flags: TxStateFlags, #[expect(clippy::pub_underscore_fields)]