cuprated: add txpool manager (#483)

* add txpool manager

* add version check for tx-pool

* add docs

* fix merge

* fix test

* fix merge

* fix cargo hack

* clean up imports

* fix typo

* add small buffer to rebroadcasts

* review fixes

* small clean up

* fmt

* fix merge

* review fixes
This commit is contained in:
Boog900
2025-08-15 15:41:23 +01:00
committed by GitHub
parent 32b3572450
commit 807bfafb35
31 changed files with 804 additions and 182 deletions

1
Cargo.lock generated
View File

@@ -1543,6 +1543,7 @@ dependencies = [
"thiserror 2.0.12",
"tokio",
"tower",
"tracing",
]
[[package]]

View File

@@ -28,6 +28,7 @@ use crate::{
types::ConsensusBlockchainReadHandle,
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
txpool::TxpoolManagerHandle,
};
mod commands;
@@ -46,7 +47,7 @@ pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
txpool_manager_handle: TxpoolManagerHandle,
mut blockchain_context_service: BlockchainContextService,
block_downloader_config: BlockDownloaderConfig,
) {
@@ -72,7 +73,7 @@ pub async fn init_blockchain_manager(
blockchain_read_handle,
BoxError::from,
),
txpool_write_handle,
txpool_manager_handle,
blockchain_context_service,
stop_current_block_downloader,
broadcast_svc: clearnet_interface.broadcast_svc(),
@@ -93,8 +94,8 @@ pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle,
/// A [`BlockchainReadHandle`].
blockchain_read_handle: ConsensusBlockchainReadHandle,
/// A [`TxpoolWriteHandle`].
txpool_write_handle: TxpoolWriteHandle,
txpool_manager_handle: TxpoolManagerHandle,
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
/// values without needing to go to a [`BlockchainReadHandle`].
blockchain_context_service: BlockchainContextService,

View File

@@ -619,11 +619,8 @@ impl super::BlockchainManager {
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
self.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
self.txpool_manager_handle
.new_block(spent_key_images)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}

View File

@@ -14,9 +14,12 @@ use cuprate_p2p::{block_downloader::BlockBatch, BroadcastSvc};
use cuprate_p2p_core::handles::HandleBuilder;
use cuprate_types::{CachedVerificationState, TransactionVerificationData, TxVersion};
use crate::blockchain::{
check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand,
ConsensusBlockchainReadHandle,
use crate::{
blockchain::{
check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand,
ConsensusBlockchainReadHandle,
},
txpool::TxpoolManagerHandle,
};
async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
@@ -30,7 +33,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
cuprate_blockchain::service::init(blockchain_config).unwrap();
let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init(txpool_config).unwrap();
cuprate_txpool::service::init(&txpool_config).unwrap();
check_add_genesis(
&mut blockchain_read_handle,
@@ -56,7 +59,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
txpool_write_handle,
txpool_manager_handle: TxpoolManagerHandle::mock(),
blockchain_context_service,
stop_current_block_downloader: Arc::new(Default::default()),
broadcast_svc: BroadcastSvc::mock(),

View File

@@ -45,7 +45,7 @@ use fs::FileSystemConfig;
use p2p::P2PConfig;
use rayon::RayonConfig;
pub use rpc::RpcConfig;
use storage::StorageConfig;
pub use storage::{StorageConfig, TxpoolConfig};
use tokio::TokioConfig;
use tor::TorConfig;
use tracing_config::TracingConfig;

View File

@@ -62,7 +62,7 @@ config_struct! {
pub struct BlockchainConfig { }
/// The tx-pool config.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct TxpoolConfig {
/// The maximum size of the tx-pool.
@@ -71,6 +71,14 @@ config_struct! {
/// Valid values | >= 0
/// Examples | 100_000_000, 50_000_000
pub max_txpool_byte_size: usize,
/// The maximum age of transactions in the pool in seconds.
/// Transactions will be dropped after this time is reached.
///
/// Type | Number
/// Valid values | >= 0
/// Examples | 100_000_000, 50_000_000
pub maximum_age_secs: u64,
}
}
@@ -79,6 +87,7 @@ impl Default for TxpoolConfig {
Self {
sync_mode: SyncMode::default(),
max_txpool_byte_size: 100_000_000,
maximum_age_secs: 60 * 60 * 24,
}
}
}

View File

@@ -92,7 +92,7 @@ fn main() {
.expect(DATABASE_CORRUPT_MSG);
let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool)
cuprate_txpool::service::init_with_pool(&config.txpool_config(), db_thread_pool)
.inspect_err(|e| error!("Txpool database error: {e}"))
.expect(DATABASE_CORRUPT_MSG);
@@ -137,13 +137,15 @@ fn main() {
// Create the incoming tx handler service.
let tx_handler = IncomingTxHandler::init(
config.storage.txpool.clone(),
network_interfaces.clearnet_network_interface.clone(),
network_interfaces.tor_network_interface,
txpool_write_handle.clone(),
txpool_read_handle.clone(),
context_svc.clone(),
blockchain_read_handle.clone(),
);
)
.await;
// Send tx handler sender to all network zones
for zone in tx_handler_subscribers {
@@ -157,7 +159,7 @@ fn main() {
network_interfaces.clearnet_network_interface,
blockchain_write_handle,
blockchain_read_handle.clone(),
txpool_write_handle.clone(),
tx_handler.txpool_manager.clone(),
context_svc.clone(),
config.block_downloader_config(),
)

View File

@@ -14,6 +14,7 @@ use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::{broadcast, oneshot, watch};
use tokio_stream::wrappers::WatchStream;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
@@ -22,10 +23,9 @@ use cuprate_consensus::{
};
use cuprate_dandelion_tower::TxState;
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_helper::cast::u64_to_usize;
use cuprate_helper::{
asynch::rayon_spawn_async,
cast::usize_to_u64,
cast::{u64_to_usize, usize_to_u64},
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
};
use cuprate_p2p::constants::{
@@ -363,6 +363,7 @@ async fn new_fluffy_block<A: NetZoneAddress>(
}
/// [`ProtocolRequest::NewTransactions`]
#[instrument(level = "debug", skip_all, fields(txs = request.txs.len(), stem = !request.dandelionpp_fluff))]
async fn new_transactions<A>(
peer_information: PeerInformation<A>,
request: NewTransactions,
@@ -373,16 +374,22 @@ where
A: NetZoneAddress,
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
{
tracing::debug!("handling new transactions");
let context = blockchain_context_service.blockchain_context();
// If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing.
if usize_to_u64(context.chain_height + 2)
< peer_information
.core_sync_data
.lock()
.unwrap()
.current_height
{
let peer_height = peer_information
.core_sync_data
.lock()
.unwrap()
.current_height;
if usize_to_u64(context.chain_height + 2) < peer_height {
tracing::debug!(
our_height = context.chain_height,
peer_height,
"we are too far behind peer, ignoring txs."
);
return Ok(ProtocolResponse::NA);
}

View File

@@ -23,6 +23,7 @@ use cuprate_helper::{
cast::{u32_to_usize, u64_to_usize, usize_to_u64},
fmt::HexPrefix,
map::split_u128_into_low_high_bits,
time::current_unix_timestamp,
};
use cuprate_hex::{Hex, HexVec};
use cuprate_p2p_core::{client::handshaker::builder::DummyAddressBook, ClearNet, Network};
@@ -923,13 +924,15 @@ async fn get_transaction_pool_backlog(
mut state: CupratedRpcHandler,
_: GetTransactionPoolBacklogRequest,
) -> Result<GetTransactionPoolBacklogResponse, Error> {
let now = current_unix_timestamp();
let backlog = txpool::backlog(&mut state.txpool_read)
.await?
.into_iter()
.map(|entry| TxBacklogEntry {
weight: entry.weight,
weight: usize_to_u64(entry.weight),
fee: entry.fee,
time_in_pool: entry.time_in_pool.as_secs(),
time_in_pool: now - entry.received_at,
})
.collect();
@@ -968,7 +971,7 @@ async fn get_miner_data(
.into_iter()
.map(|entry| GetMinerDataTxBacklogEntry {
id: Hex(entry.id),
weight: entry.weight,
weight: usize_to_u64(entry.weight),
fee: entry.fee,
})
.collect();

View File

@@ -8,8 +8,10 @@ use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
mod dandelion;
mod incoming_tx;
mod manager;
mod relay_rules;
mod txs_being_handled;
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};
pub use manager::TxpoolManagerHandle;
pub use relay_rules::RelayRuleError;

View File

@@ -6,6 +6,9 @@ use std::{
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::{Service, ServiceExt};
use tokio::sync::mpsc;
use tokio_util::sync::PollSender;
use cuprate_dandelion_tower::{
pool::DandelionPoolService, traits::StemRequest, DandelionConfig, DandelionRouteReq,
DandelionRouter, DandelionRouterError, Graph, State, TxState,
@@ -25,6 +28,7 @@ mod stem_service;
mod tx_store;
pub use anon_net_service::AnonTxService;
pub use diffuse_service::DiffuseService;
/// The configuration used for [`cuprate_dandelion_tower`].
///
@@ -39,7 +43,7 @@ const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
/// A [`DandelionRouter`] with all generic types defined.
pub(super) type ConcreteDandelionRouter<Z> = DandelionRouter<
stem_service::OutboundPeerStream<Z>,
diffuse_service::DiffuseService<Z>,
DiffuseService<Z>,
CrossNetworkInternalPeerId,
stem_service::StemPeerService<Z>,
DandelionTx,
@@ -106,7 +110,7 @@ impl Service<DandelionRouteReq<DandelionTx, CrossNetworkInternalPeerId>> for Mai
pub fn start_dandelion_pool_manager(
router: MainDandelionRouter,
txpool_read_handle: TxpoolReadHandle,
txpool_write_handle: TxpoolWriteHandle,
promote_tx: mpsc::Sender<[u8; 32]>,
) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
// TODO: make this constant configurable?
@@ -114,7 +118,7 @@ pub fn start_dandelion_pool_manager(
router,
tx_store::TxStoreService {
txpool_read_handle,
txpool_write_handle,
promote_tx: PollSender::new(promote_tx),
},
DANDELION_CONFIG,
)
@@ -128,7 +132,7 @@ where
InternalPeerID<Z::Addr>: Into<CrossNetworkInternalPeerId>,
{
DandelionRouter::new(
diffuse_service::DiffuseService {
DiffuseService {
clear_net_broadcast_service: network_interface.broadcast_svc(),
},
stem_service::OutboundPeerStream::<Z>::new(network_interface),

View File

@@ -1,7 +1,12 @@
use std::task::{Context, Poll};
use std::{
future::ready,
task::{Context, Poll},
};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use tokio::sync::mpsc;
use tokio_util::sync::PollSender;
use tower::{Service, ServiceExt};
use cuprate_dandelion_tower::{
@@ -21,7 +26,7 @@ use super::{DandelionTx, TxId};
/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
pub struct TxStoreService {
pub txpool_read_handle: TxpoolReadHandle,
pub txpool_write_handle: TxpoolWriteHandle,
pub promote_tx: PollSender<[u8; 32]>,
}
impl Service<TxStoreRequest<TxId>> for TxStoreService {
@@ -29,8 +34,8 @@ impl Service<TxStoreRequest<TxId>> for TxStoreService {
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.promote_tx.poll_reserve(cx).map_err(Into::into)
}
fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
@@ -60,15 +65,13 @@ impl Service<TxStoreRequest<TxId>> for TxStoreService {
Ok(_) => unreachable!(),
})
.boxed(),
TxStoreRequest::Promote(tx_id) => self
.txpool_write_handle
.clone()
.oneshot(TxpoolWriteRequest::Promote(tx_id))
.map(|res| match res {
Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
Err(e) => Err(e.into()),
})
.boxed(),
TxStoreRequest::Promote(tx_id) => ready(
self.promote_tx
.send_item(tx_id)
.map_err(Into::into)
.map(|()| TxStoreResponse::Ok),
)
.boxed(),
}
}
}

View File

@@ -7,13 +7,15 @@ use std::{
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tokio::sync::mpsc;
use tower::{BoxError, Service, ServiceExt};
use tracing::instrument;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockchainContextService, ExtendedConsensusError,
transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions},
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTxBuilder},
@@ -35,11 +37,15 @@ use cuprate_types::TransactionVerificationData;
use crate::{
blockchain::ConsensusBlockchainReadHandle,
config::TxpoolConfig,
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
txpool::{
dandelion::{self, AnonTxService, ConcreteDandelionRouter, MainDandelionRouter},
dandelion::{
self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter,
},
manager::{start_txpool_manager, TxpoolManagerHandle},
relay_rules::{check_tx_relay_rules, RelayRuleError},
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
},
@@ -92,8 +98,7 @@ pub struct IncomingTxHandler {
/// The dandelion txpool manager.
pub(super) dandelion_pool_manager:
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
/// The txpool write handle.
pub(super) txpool_write_handle: TxpoolWriteHandle,
pub txpool_manager: TxpoolManagerHandle,
/// The txpool read handle.
pub(super) txpool_read_handle: TxpoolReadHandle,
/// The blockchain read handle.
@@ -103,7 +108,9 @@ pub struct IncomingTxHandler {
impl IncomingTxHandler {
/// Initialize the [`IncomingTxHandler`].
#[expect(clippy::significant_drop_tightening)]
pub fn init(
#[instrument(level = "info", skip_all, name = "start_txpool")]
pub async fn init(
txpool_config: TxpoolConfig,
clear_net: NetworkInterface<ClearNet>,
tor_net: Option<NetworkInterface<Tor>>,
txpool_write_handle: TxpoolWriteHandle,
@@ -111,22 +118,37 @@ impl IncomingTxHandler {
blockchain_context_cache: BlockchainContextService,
blockchain_read_handle: BlockchainReadHandle,
) -> Self {
let diffuse_service = DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(),
};
let clearnet_router = dandelion::dandelion_router(clear_net);
let tor_router = tor_net.map(AnonTxService::new);
let dandelion_router = MainDandelionRouter::new(clearnet_router, tor_router);
let (promote_tx, promote_rx) = mpsc::channel(25);
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
dandelion_router,
txpool_read_handle.clone(),
txpool_write_handle.clone(),
promote_tx,
);
let txpool_manager = start_txpool_manager(
txpool_write_handle,
txpool_read_handle.clone(),
promote_rx,
diffuse_service,
dandelion_pool_manager.clone(),
txpool_config,
)
.await;
Self {
txs_being_handled: TxsBeingHandled::new(),
blockchain_context_cache,
dandelion_pool_manager,
txpool_write_handle,
txpool_manager,
txpool_read_handle,
blockchain_read_handle: ConsensusBlockchainReadHandle::new(
blockchain_read_handle,
@@ -151,8 +173,8 @@ impl Service<IncomingTxs> for IncomingTxHandler {
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.blockchain_read_handle.clone(),
self.txpool_write_handle.clone(),
self.txpool_read_handle.clone(),
self.txpool_manager.clone(),
self.dandelion_pool_manager.clone(),
)
.boxed()
@@ -170,8 +192,8 @@ async fn handle_incoming_txs(
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockchainContextService,
blockchain_read_handle: ConsensusBlockchainReadHandle,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
mut txpool_manager_handle: TxpoolManagerHandle,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
) -> Result<(), IncomingTxError> {
let _reorg_guard = REORG_LOCK.read().await;
@@ -209,28 +231,33 @@ async fn handle_incoming_txs(
return Err(IncomingTxError::RelayRule(e));
}
if !do_not_relay {
handle_valid_tx(
tx,
state.clone(),
&mut txpool_write_handle,
&mut dandelion_pool_manager,
)
.await;
tracing::debug!(
tx = hex::encode(tx.tx_hash),
"passing tx to tx-pool manager"
);
// TODO: take into account `do_not_relay` in the tx-pool manager.
if txpool_manager_handle
.tx_tx
.send((tx, state.clone()))
.await
.is_err()
{
tracing::warn!("The txpool manager has been stopped, dropping incoming txs");
return Ok(());
}
}
// Re-relay any txs we got in the block that were already in our stem pool.
if !do_not_relay {
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
state.clone(),
&mut txpool_read_handle,
&mut dandelion_pool_manager,
)
.await;
}
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
state.clone(),
&mut txpool_read_handle,
&mut dandelion_pool_manager,
)
.await;
}
Ok(())
@@ -267,6 +294,7 @@ async fn prepare_incoming_txs(
// If a duplicate is in here the incoming tx batch contained the same tx twice.
if !tx_blob_hashes.insert(tx_blob_hash) {
tracing::debug!("peer sent duplicate tx in batch, ignoring batch.");
return Some(Err(IncomingTxError::DuplicateTransaction));
}
@@ -321,58 +349,6 @@ async fn prepare_incoming_txs(
.await
}
/// Handle a verified tx.
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
tx: TransactionVerificationData,
state: TxState<CrossNetworkInternalPeerId>,
txpool_write_handle: &mut TxpoolWriteHandle,
dandelion_pool_manager: &mut DandelionPoolService<
DandelionTx,
TxId,
CrossNetworkInternalPeerId,
>,
) {
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx: Box::new(tx),
state_stem: state.is_stem_stage(),
})
.await
.expect("TODO")
else {
unreachable!()
};
// TODO: track double spends to quickly ignore them from their blob hash.
if let Some(tx_hash) = double_spend {
return;
}
// TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(None)
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
tx_hash: &TxId,

View File

@@ -0,0 +1,497 @@
use std::{
cmp::min,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use bytes::Bytes;
use futures::StreamExt;
use indexmap::IndexMap;
use rand::Rng;
use tokio::sync::{mpsc, oneshot};
use tokio_util::{time::delay_queue, time::DelayQueue};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
traits::DiffuseRequest,
TxState,
};
use cuprate_helper::time::current_unix_timestamp;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
TxpoolReadHandle, TxpoolWriteHandle,
};
use cuprate_types::TransactionVerificationData;
use crate::{
config::TxpoolConfig,
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::{CrossNetworkInternalPeerId, NetworkInterfaces},
txpool::{
dandelion::DiffuseService,
incoming_tx::{DandelionTx, TxId},
},
};
const INCOMING_TX_QUEUE_SIZE: usize = 100;
/// Starts the transaction pool manager service.
///
/// # Panics
///
/// This function may panic if any inner service has an unrecoverable error.
pub async fn start_txpool_manager(
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
promote_tx_channel: mpsc::Receiver<[u8; 32]>,
diffuse_service: DiffuseService<ClearNet>,
dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
config: TxpoolConfig,
) -> TxpoolManagerHandle {
let TxpoolReadResponse::Backlog(backlog) = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::Backlog)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
tracing::info!(txs_in_pool = backlog.len(), "starting txpool manager");
let mut stem_txs = Vec::new();
let mut tx_timeouts = DelayQueue::with_capacity(backlog.len());
let current_txs = backlog
.into_iter()
.map(|tx| {
let timeout_key = if tx.private {
stem_txs.push(tx.id);
None
} else {
let next_timeout = calculate_next_timeout(tx.received_at, config.maximum_age_secs);
Some(tx_timeouts.insert(tx.id, Duration::from_secs(next_timeout)))
};
(
tx.id,
TxInfo {
weight: tx.weight,
fee: tx.fee,
received_at: tx.received_at,
private: tx.private,
timeout_key,
},
)
})
.collect();
let mut manager = TxpoolManager {
current_txs,
tx_timeouts,
txpool_write_handle,
txpool_read_handle,
dandelion_pool_manager,
promote_tx_channel,
diffuse_service,
config,
};
tracing::info!(stem_txs = stem_txs.len(), "promoting stem txs");
for tx in stem_txs {
manager.promote_tx(tx).await;
}
let (tx_tx, tx_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE);
let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1);
tokio::spawn(manager.run(tx_rx, spent_kis_rx));
TxpoolManagerHandle {
tx_tx,
spent_kis_tx,
}
}
/// A handle to the tx-pool manager.
#[derive(Clone)]
pub struct TxpoolManagerHandle {
/// The incoming tx channel.
pub tx_tx: mpsc::Sender<(
TransactionVerificationData,
TxState<CrossNetworkInternalPeerId>,
)>,
/// The spent key images in a new block tx.
spent_kis_tx: mpsc::Sender<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
}
impl TxpoolManagerHandle {
/// Create a mock [`TxpoolManagerHandle`] that does nothing.
///
/// Useful for testing.
#[expect(clippy::let_underscore_must_use)]
pub fn mock() -> Self {
let (spent_kis_tx, mut spent_kis_rx) = mpsc::channel(1);
let (tx_tx, mut tx_rx) = mpsc::channel(100);
tokio::spawn(async move {
loop {
let Some(rec): Option<(_, oneshot::Sender<()>)> = spent_kis_rx.recv().await else {
return;
};
let _ = rec.1.send(());
}
});
tokio::spawn(async move {
loop {
if tx_rx.recv().await.is_none() {
return;
}
}
});
Self {
tx_tx,
spent_kis_tx,
}
}
/// Tell the tx-pool about spent key images in an incoming block.
pub async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
drop(self.spent_kis_tx.send((spent_key_images, tx)).await);
rx.await
.map_err(|_| anyhow::anyhow!("txpool manager stopped"))
}
}
/// Information on a transaction in the tx-pool.
struct TxInfo {
/// The weight of the transaction.
weight: usize,
/// The fee the transaction paid.
fee: u64,
/// The UNIX timestamp when the tx was received.
received_at: u64,
/// Whether the tx is in the private pool.
private: bool,
/// The [`delay_queue::Key`] for the timeout queue in the manager.
///
/// This will be [`None`] if the tx is private as timeouts for them are handled in the dandelion pool.
timeout_key: Option<delay_queue::Key>,
}
struct TxpoolManager {
current_txs: IndexMap<[u8; 32], TxInfo>,
/// A [`DelayQueue`] for waiting on tx timeouts.
///
/// Timeouts can be for re-relaying or removal from the pool.
tx_timeouts: DelayQueue<[u8; 32]>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
/// The channel the dandelion manager will use to communicate that a tx should be promoted to the
/// public pool.
promote_tx_channel: mpsc::Receiver<[u8; 32]>,
/// The [`DiffuseService`] to diffuse txs to the p2p network.
///
/// Used for re-relays.
diffuse_service: DiffuseService<ClearNet>,
config: TxpoolConfig,
}
impl TxpoolManager {
/// Removes a transaction from the tx-pool manager, and optionally the database too.
///
/// # Panics
///
/// This function will panic if the tx is not in the tx-pool manager.
#[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
async fn remove_tx_from_pool(&mut self, tx: [u8; 32], remove_from_db: bool) {
tracing::debug!("removing tx from pool");
let tx_info = self.current_txs.swap_remove(&tx).unwrap();
tx_info
.timeout_key
.and_then(|key| self.tx_timeouts.try_remove(&key));
if remove_from_db {
self.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::RemoveTransaction(tx))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
}
/// Re-relay a tx to the network.
///
/// # Panics
///
/// This function will panic if the tx is not in the tx-pool.
#[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
async fn rerelay_tx(&mut self, tx: [u8; 32]) {
tracing::debug!("re-relaying tx to network");
let TxpoolReadResponse::TxBlob {
tx_blob,
state_stem: _,
} = self
.txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxBlob(tx))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
self.diffuse_service
.call(DiffuseRequest(DandelionTx(Bytes::from(tx_blob))))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Handles a transaction timeout, be either rebroadcasting or dropping the tx from the pool.
/// If a rebroadcast happens, this function will handle adding another timeout to the queue.
#[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
async fn handle_tx_timeout(&mut self, tx: [u8; 32]) {
let Some(tx_info) = self.current_txs.get(&tx) else {
tracing::warn!("tx timed out, but tx not in pool");
return;
};
let time_in_pool = current_unix_timestamp() - tx_info.received_at;
// Check if the tx has timed out, with a small buffer to prevent rebroadcasting if the time is
// slightly off.
if time_in_pool + 10 > self.config.maximum_age_secs {
tracing::warn!("tx has been in pool too long, removing from pool");
self.remove_tx_from_pool(tx, true).await;
return;
}
let received_at = tx_info.received_at;
tracing::debug!(time_in_pool, "tx timed out, resending to network");
self.rerelay_tx(tx).await;
let tx_info = self.current_txs.get_mut(&tx).unwrap();
let next_timeout = calculate_next_timeout(received_at, self.config.maximum_age_secs);
tracing::trace!(in_secs = next_timeout, "setting next tx timeout");
tx_info.timeout_key = Some(
self.tx_timeouts
.insert(tx, Duration::from_secs(next_timeout)),
);
}
/// Adds a tx to the tx-pool manager.
#[instrument(level = "trace", skip_all, fields(tx_id = hex::encode(tx)))]
fn track_tx(&mut self, tx: [u8; 32], weight: usize, fee: u64, private: bool) {
let now = current_unix_timestamp();
let timeout_key = if private {
// The dandelion pool handles stem tx embargo.
None
} else {
let timeout = calculate_next_timeout(now, self.config.maximum_age_secs);
tracing::trace!(in_secs = timeout, "setting next tx timeout");
Some(self.tx_timeouts.insert(tx, Duration::from_secs(timeout)))
};
self.current_txs.insert(
tx,
TxInfo {
weight,
fee,
received_at: now,
private,
timeout_key,
},
);
}
/// Handles an incoming tx, adding it to the pool and routing it.
#[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx.tx_hash), state))]
async fn handle_incoming_tx(
&mut self,
tx: TransactionVerificationData,
state: TxState<CrossNetworkInternalPeerId>,
) {
tracing::debug!("handling new tx");
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
let (tx_hash, tx_weight, tx_fee) = (tx.tx_hash, tx.tx_weight, tx.fee);
let TxpoolWriteResponse::AddTransaction(double_spend) = self
.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx: Box::new(tx),
state_stem: state.is_stem_stage(),
})
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
if let Some(tx_hash) = double_spend {
tracing::debug!(
double_spent = hex::encode(tx_hash),
"transaction is a double spend, ignoring"
);
return;
}
self.track_tx(tx_hash, tx_weight, tx_fee, state.is_stem_stage());
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(None)
.build()
.unwrap();
self.dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Promote a tx to the public pool.
#[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))]
async fn promote_tx(&mut self, tx: [u8; 32]) {
let Some(tx_info) = self.current_txs.get_mut(&tx) else {
tracing::debug!("not promoting tx, tx not in pool");
return;
};
if !tx_info.private {
tracing::trace!("not promoting tx, tx is already public");
return;
}
tracing::debug!("promoting tx");
// It's now in the public pool, pretend we just saw it.
tx_info.received_at = current_unix_timestamp();
let next_timeout =
calculate_next_timeout(tx_info.received_at, self.config.maximum_age_secs);
tracing::trace!(in_secs = next_timeout, "setting next tx timeout");
tx_info.timeout_key = Some(
self.tx_timeouts
.insert(tx, Duration::from_secs(next_timeout)),
);
self.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::Promote(tx))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Handles removing all transactions that have been included/double spent in an incoming block.
#[instrument(level = "debug", skip_all)]
async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) {
tracing::debug!("handling new block");
let TxpoolWriteResponse::NewBlock(removed_txs) = self
.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
for tx in removed_txs {
self.remove_tx_from_pool(tx, false).await;
}
}
#[expect(clippy::let_underscore_must_use)]
async fn run(
mut self,
mut tx_rx: mpsc::Receiver<(
TransactionVerificationData,
TxState<CrossNetworkInternalPeerId>,
)>,
mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>,
) {
loop {
tokio::select! {
Some(tx) = self.tx_timeouts.next() => {
self.handle_tx_timeout(tx.into_inner()).await;
}
Some((tx, state)) = tx_rx.recv() => {
self.handle_incoming_tx(tx, state).await;
}
Some(tx) = self.promote_tx_channel.recv() => {
self.promote_tx(tx).await;
}
Some((spent_kis, tx)) = block_rx.recv() => {
self.new_block(spent_kis).await;
let _ = tx.send(());
}
}
}
}
}
/// Calculates the amount of time to wait before resending a tx to the network.
fn calculate_next_timeout(received_at: u64, max_time_in_pool: u64) -> u64 {
/// The base time between re-relays to the p2p network.
const TX_RERELAY_TIME: u64 = 300;
/*
This is a simple exponential backoff.
The first timeout is TX_RERELAY_TIME seconds, the second is 2 * TX_RERELAY_TIME seconds, then 4, 8, 16, etc.
*/
let now = current_unix_timestamp();
let time_in_pool = now - received_at;
let time_till_max_timeout = max_time_in_pool.saturating_sub(time_in_pool);
let timeouts = time_in_pool / TX_RERELAY_TIME;
min((timeouts + 1) * TX_RERELAY_TIME, time_till_max_timeout)
}

View File

@@ -11,7 +11,7 @@ txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"]
[dependencies]
tower = { workspace = true, features = ["util"] }
tracing = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
futures = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["rt", "sync", "macros"], optional = true}

View File

@@ -14,6 +14,7 @@ use tokio::{
};
use tokio_util::time::{delay_queue, DelayQueue};
use tower::{Service, ServiceExt};
use tracing::Instrument;
use crate::{
pool::IncomingTx,
@@ -223,43 +224,47 @@ where
Ok(tx.map(|tx| tx.0))
}
#[expect(clippy::type_complexity)]
/// Starts the [`DandelionPoolManager`].
pub(crate) async fn run(
mut self,
mut rx: mpsc::Receiver<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
mut rx: mpsc::Receiver<(
(IncomingTx<Tx, TxId, PeerId>, tracing::Span),
oneshot::Sender<()>,
)>,
) {
tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);
loop {
tracing::trace!("Waiting for next event.");
tokio::select! {
// biased to handle current txs before routing new ones.
biased;
Some(fired) = self.embargo_timers.next() => {
tracing::debug!("Embargo timer fired, did not see stem tx in time.");
let span = tracing::debug_span!("embargo_timer_fired");
tracing::debug!(parent: &span,"Embargo timer fired, did not see stem tx in time.");
let tx_id = fired.into_inner();
if let Err(e) = self.promote_and_fluff_tx(tx_id).await {
tracing::error!("Error handling fired embargo timer: {e}");
if let Err(e) = self.promote_and_fluff_tx(tx_id).instrument(span.clone()).await {
tracing::error!(parent: &span, "Error handling fired embargo timer: {e}");
return;
}
}
Some(Ok((tx_id, res))) = self.routing_set.join_next() => {
tracing::trace!("Received d++ routing result.");
let span = tracing::debug_span!("dandelion_routing_result");
let res = match res {
Ok(State::Fluff) => {
tracing::debug!("Transaction was fluffed upgrading it to the public pool.");
self.promote_tx(tx_id).await
tracing::debug!(parent: &span, "Transaction was fluffed upgrading it to the public pool.");
self.promote_tx(tx_id).instrument(span.clone()).await
}
Err(tx_state) => {
tracing::debug!("Error routing transaction, trying again.");
tracing::debug!(parent: &span, "Error routing transaction, trying again.");
match self.get_tx_from_pool(tx_id.clone()).await {
match self.get_tx_from_pool(tx_id.clone()).instrument(span.clone()).await {
Ok(Some(tx)) => match tx_state {
TxState::Fluff => self.fluff_tx(tx, tx_id).await,
TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await,
TxState::Local => self.stem_tx(tx, tx_id, None).await,
TxState::Fluff => self.fluff_tx(tx, tx_id).instrument(span.clone()).await,
TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).instrument(span.clone()).await,
TxState::Local => self.stem_tx(tx, tx_id, None).instrument(span.clone()).await,
}
Err(e) => Err(e),
_ => continue,
@@ -269,22 +274,24 @@ where
};
if let Err(e) = res {
tracing::error!("Error handling transaction routing return: {e}");
tracing::error!(parent: &span, "Error handling transaction routing return: {e}");
return;
}
}
req = rx.recv() => {
tracing::debug!("Received new tx to route.");
let Some((IncomingTx { tx, tx_id, routing_state }, res_tx)) = req else {
let Some(((IncomingTx { tx, tx_id, routing_state }, span), res_tx)) = req else {
return;
};
if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).await {
let span = tracing::debug_span!(parent: &span, "dandelion_pool_manager");
tracing::debug!(parent: &span, "Received new tx to route.");
if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).instrument(span.clone()).await {
#[expect(clippy::let_underscore_must_use, reason = "dropped receivers can be ignored")]
let _ = res_tx.send(());
tracing::error!("Error handling transaction in dandelion pool: {e}");
tracing::error!(parent: &span, "Error handling transaction in dandelion pool: {e}");
return;
}

View File

@@ -34,7 +34,6 @@ use tokio::{
};
use tokio_util::{sync::PollSender, time::DelayQueue};
use tower::Service;
use tracing::Instrument;
use crate::{
pool::manager::DandelionPoolShutDown,
@@ -93,9 +92,7 @@ where
_tx: PhantomData,
};
let span = tracing::debug_span!("dandelion_pool");
tokio::spawn(pool.run(rx).instrument(span));
tokio::spawn(pool.run(rx));
DandelionPoolService {
tx: PollSender::new(tx),
@@ -107,8 +104,12 @@ where
/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`]
#[derive(Clone)]
pub struct DandelionPoolService<Tx, TxId, PeerId> {
#[expect(clippy::type_complexity)]
/// The channel to [`DandelionPoolManager`].
tx: PollSender<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
tx: PollSender<(
(IncomingTx<Tx, TxId, PeerId>, tracing::Span),
oneshot::Sender<()>,
)>,
}
impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
@@ -132,7 +133,7 @@ where
let res = self
.tx
.send_item((req, tx))
.send_item(((req, tracing::Span::current()), tx))
.map_err(|_| DandelionPoolShutDown);
async move {

View File

@@ -21,7 +21,7 @@ serde = ["dep:serde", "cuprate-database/serde", "cuprate-database-service/
cuprate-database = { workspace = true, features = ["heed"] }
cuprate-database-service = { workspace = true }
cuprate-types = { workspace = true, features = ["rpc"] }
cuprate-helper = { workspace = true, default-features = false, features = ["constants"] }
cuprate-helper = { workspace = true, default-features = false, features = ["constants", "time"] }
monero-serai = { workspace = true, features = ["std"] }
bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] }
@@ -29,6 +29,7 @@ bitflags = { workspace = true, features = ["std", "serde", "byte
thiserror = { workspace = true }
hex = { workspace = true, features = ["std"] }
blake3 = { workspace = true, features = ["std"] }
tracing = { workspace = true }
tower = { workspace = true }
rayon = { workspace = true }

View File

@@ -82,7 +82,7 @@ use cuprate_txpool::{
.build();
// Initialize the database environment.
let env = cuprate_txpool::open(config)?;
let env = cuprate_txpool::open(&config)?;
// Open up a transaction + tables for writing.
let env_inner = env.env_inner();

View File

@@ -1,9 +1,23 @@
//! General free functions (related to the tx-pool database).
//---------------------------------------------------------------------------------------------------- Import
use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
use std::borrow::Cow;
use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
use cuprate_database::{
ConcreteEnv, DatabaseRo, Env, EnvInner, InitError, RuntimeError, StorableStr, TxRw,
};
use cuprate_database::{DatabaseRw, TxRo};
use crate::{
config::Config,
tables::{Metadata, OpenTables},
types::TransactionBlobHash,
};
/// The current version of the database format.
pub const DATABASE_VERSION: StorableStr = StorableStr(Cow::Borrowed("0.1"));
/// The key used to store the database version in the [`Metadata`] table.
pub const VERSION_KEY: StorableStr = StorableStr(Cow::Borrowed("version"));
//---------------------------------------------------------------------------------------------------- Free functions
/// Open the txpool database using the passed [`Config`].
@@ -22,9 +36,9 @@ use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
/// - A table could not be created/opened
#[cold]
#[inline(never)] // only called once
pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
pub fn open(config: &Config) -> Result<ConcreteEnv, InitError> {
// Attempt to open the database environment.
let env = <ConcreteEnv as Env>::open(config.db_config)?;
let env = <ConcreteEnv as Env>::open(config.db_config.clone())?;
/// Convert runtime errors to init errors.
///
@@ -36,20 +50,28 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
fn runtime_to_init_error(runtime: RuntimeError) -> InitError {
match runtime {
RuntimeError::Io(io_error) => io_error.into(),
RuntimeError::KeyNotFound => InitError::InvalidVersion,
// These errors shouldn't be happening here.
RuntimeError::KeyExists
| RuntimeError::KeyNotFound
| RuntimeError::ResizeNeeded
| RuntimeError::TableNotFound => unreachable!(),
RuntimeError::KeyExists | RuntimeError::ResizeNeeded | RuntimeError::TableNotFound => {
unreachable!()
}
}
}
let fresh_db;
// INVARIANT: We must ensure that all tables are created,
// `cuprate_database` has no way of knowing _which_ tables
// we want since it is agnostic, so we are responsible for this.
{
let env_inner = env.env_inner();
// Store if this DB has been used before by checking if the metadata table exists.
let tx_ro = env_inner.tx_ro().map_err(runtime_to_init_error)?;
fresh_db = env_inner.open_db_ro::<Metadata>(&tx_ro).is_err();
TxRo::commit(tx_ro).map_err(runtime_to_init_error)?;
let tx_rw = env_inner.tx_rw().map_err(runtime_to_init_error)?;
// Create all tables.
@@ -58,6 +80,43 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
TxRw::commit(tx_rw).map_err(runtime_to_init_error)?;
}
{
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw().map_err(runtime_to_init_error)?;
let mut metadata = env_inner
.open_db_rw::<Metadata>(&tx_rw)
.map_err(runtime_to_init_error)?;
if fresh_db {
// If the database is new, add the version.
metadata
.put(&VERSION_KEY, &DATABASE_VERSION)
.map_err(runtime_to_init_error)?;
}
let print_version_err = || {
tracing::error!(
"The database follows an old format, please delete the database at: {}",
config.db_config.db_directory().display()
);
};
let version = metadata
.get(&VERSION_KEY)
.inspect_err(|_| print_version_err())
.map_err(runtime_to_init_error)?;
if version != DATABASE_VERSION {
// TODO: database migration when stable? This is the tx-pool so is not critical.
print_version_err();
return Err(InitError::InvalidVersion);
}
drop(metadata);
TxRw::commit(tx_rw).map_err(runtime_to_init_error)?;
}
Ok(env)
}

View File

@@ -16,7 +16,7 @@ mod tx;
pub mod types;
pub use config::Config;
pub use free::{open, transaction_blob_hash};
pub use free::{open, transaction_blob_hash, DATABASE_VERSION, VERSION_KEY};
pub use tx::TxEntry;
//re-exports

View File

@@ -55,7 +55,7 @@
//! .build();
//!
//! // Initialize the database environment.
//! let env = cuprate_txpool::open(config)?;
//! let env = cuprate_txpool::open(&config)?;
//!
//! // Open up a transaction + tables for writing.
//! let env_inner = env.env_inner();

View File

@@ -5,6 +5,7 @@ use bytemuck::TransparentWrapper;
use monero_serai::transaction::{NotPruned, Transaction};
use cuprate_database::{DatabaseRw, DbResult, StorableVec};
use cuprate_helper::time::current_unix_timestamp;
use cuprate_types::TransactionVerificationData;
use crate::{
@@ -42,6 +43,7 @@ pub fn add_transaction(
&TransactionInfo {
fee: tx.fee,
weight: tx.tx_weight,
received_at: current_unix_timestamp(),
flags,
_padding: [0; 7],
},

View File

@@ -87,7 +87,7 @@
//! .build();
//!
//! // Initialize the database thread-pool.
//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(config)?;
//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(&config)?;
//!
//! // Prepare a request to write block.
//! let tx = TX_V1_SIG2.clone();

View File

@@ -24,7 +24,7 @@ use crate::{
/// # Errors
/// This will forward the error if [`crate::open`] failed.
pub fn init(
config: Config,
config: &Config,
) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc<ConcreteEnv>), InitError> {
let reader_threads = config.reader_threads;
@@ -51,7 +51,7 @@ pub fn init(
/// # Errors
/// This will forward the error if [`crate::open`] failed.
pub fn init_with_pool(
config: Config,
config: &Config,
pool: Arc<ThreadPool>,
) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc<ConcreteEnv>), InitError> {
// Initialize the database itself.

View File

@@ -191,9 +191,13 @@ pub enum TxpoolWriteResponse {
/// Response to:
/// - [`TxpoolWriteRequest::RemoveTransaction`]
/// - [`TxpoolWriteRequest::Promote`]
/// - [`TxpoolWriteRequest::NewBlock`]
Ok,
/// Response to [`TxpoolWriteRequest::NewBlock`].
///
/// The inner values are the transactions removed from the pool.
NewBlock(Vec<TransactionHash>),
/// Response to [`TxpoolWriteRequest::AddTransaction`].
///
/// If the inner value is [`Some`] the tx was not added to the pool as it double spends a tx with the given hash.

View File

@@ -13,7 +13,9 @@ use std::{
use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
use cuprate_database::{
ConcreteEnv, DatabaseIter, DatabaseRo, DbResult, Env, EnvInner, RuntimeError,
};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use crate::{
@@ -24,6 +26,7 @@ use crate::{
},
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
types::{TransactionBlobHash, TransactionHash},
TxEntry,
};
// TODO: update the docs here
@@ -229,7 +232,27 @@ fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseRe
/// [`TxpoolReadRequest::Backlog`].
#[inline]
fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
Ok(TxpoolReadResponse::Backlog(todo!()))
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
let backlog = tx_infos_table
.iter()?
.map(|info| {
let (id, info) = info?;
Ok(TxEntry {
id,
weight: info.weight,
fee: info.fee,
private: info.flags.private(),
received_at: info.received_at,
})
})
.collect::<Result<_, RuntimeError>>()?;
Ok(TxpoolReadResponse::Backlog(backlog))
}
/// [`TxpoolReadRequest::Size`].

View File

@@ -4,6 +4,7 @@ use cuprate_database::{
ConcreteEnv, DatabaseRo, DatabaseRw, DbResult, Env, EnvInner, RuntimeError, TxRw,
};
use cuprate_database_service::DatabaseWriteHandle;
use cuprate_helper::time::current_unix_timestamp;
use cuprate_types::TransactionVerificationData;
use crate::{
@@ -115,6 +116,7 @@ fn promote(env: &ConcreteEnv, tx_hash: &TransactionHash) -> DbResult<TxpoolWrite
let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
tx_infos.update(tx_hash, |mut info| {
info.received_at = current_unix_timestamp();
info.flags.remove(TxStateFlags::STATE_STEM);
Some(info)
})
@@ -137,8 +139,10 @@ fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<Txpoo
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let mut txs_removed = Vec::new();
// FIXME: use try blocks once stable.
let result = || {
let mut result = || {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
// Remove all txs which spend key images that were spent in the new block.
@@ -146,8 +150,10 @@ fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<Txpoo
match tables_mut
.spent_key_images()
.get(key_image)
.and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
{
.and_then(|tx_hash| {
txs_removed.push(tx_hash);
ops::remove_transaction(&tx_hash, &mut tables_mut)
}) {
Ok(()) | Err(RuntimeError::KeyNotFound) => (),
Err(e) => return Err(e),
}
@@ -162,5 +168,5 @@ fn new_block(env: &ConcreteEnv, spent_key_images: &[KeyImage]) -> DbResult<Txpoo
}
TxRw::commit(tx_rw)?;
Ok(TxpoolWriteResponse::Ok)
Ok(TxpoolWriteResponse::NewBlock(txs_removed))
}

View File

@@ -14,7 +14,7 @@
//! # Traits
//! This module also contains a set of traits for
//! accessing _all_ tables defined here at once.
use cuprate_database::{define_tables, StorableVec};
use cuprate_database::{define_tables, StorableStr, StorableVec};
use crate::types::{
KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo,
@@ -48,4 +48,8 @@ define_tables! {
/// Transaction blob hashes that are in the pool.
4 => KnownBlobHashes,
TransactionBlobHash => TransactionHash,
/// Current database version.
5 => Metadata,
StorableStr => StorableStr,
}

View File

@@ -8,9 +8,11 @@ pub struct TxEntry {
/// The transaction's ID (hash).
pub id: [u8; 32],
/// The transaction's weight.
pub weight: u64,
pub weight: usize,
/// The transaction's fee.
pub fee: u64,
/// How long the transaction has been in the pool.
pub time_in_pool: std::time::Duration,
/// If the tx is in the private pool.
pub private: bool,
/// The UNIX timestamp when the transaction was received.
pub received_at: u64,
}

View File

@@ -31,6 +31,12 @@ bitflags::bitflags! {
}
}
impl TxStateFlags {
pub const fn private(&self) -> bool {
self.contains(Self::STATE_STEM)
}
}
/// Information on a tx-pool transaction.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]
#[repr(C)]
@@ -39,6 +45,8 @@ pub struct TransactionInfo {
pub fee: u64,
/// The transaction's weight.
pub weight: usize,
/// The UNIX timestamp of when this tx was received.
pub received_at: u64,
/// [`TxStateFlags`] of this transaction.
pub flags: TxStateFlags,
#[expect(clippy::pub_underscore_fields)]