consensus: moved pending txs from memory a distinct sled store

This commit is contained in:
aggstam
2023-03-07 23:14:09 +02:00
parent ce26ad15a7
commit e06b6fa1df
6 changed files with 171 additions and 37 deletions

View File

@@ -31,7 +31,7 @@ pub mod slot_checkpoint_store;
pub use slot_checkpoint_store::SlotCheckpointStore;
pub mod tx_store;
pub use tx_store::TxStore;
pub use tx_store::{PendingTxStore, TxStore};
pub mod contract_store;
pub use contract_store::{ContractStateStore, WasmStore};
@@ -51,6 +51,8 @@ pub struct Blockchain {
pub slot_checkpoints: SlotCheckpointStore,
/// Transactions sled tree
pub transactions: TxStore,
/// Pending transactions sled tree
pub pending_txs: PendingTxStore,
/// Contract states
pub contracts: ContractStateStore,
/// Wasm bincodes
@@ -65,6 +67,7 @@ impl Blockchain {
let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?;
let slot_checkpoints = SlotCheckpointStore::new(db)?;
let transactions = TxStore::new(db)?;
let pending_txs = PendingTxStore::new(db)?;
let contracts = ContractStateStore::new(db)?;
let wasm_bincode = WasmStore::new(db)?;
@@ -75,6 +78,7 @@ impl Blockchain {
order,
slot_checkpoints,
transactions,
pending_txs,
contracts,
wasm_bincode,
})

View File

@@ -21,6 +21,7 @@ use darkfi_serial::{deserialize, serialize};
use crate::{tx::Transaction, Error, Result};
const SLED_TX_TREE: &[u8] = b"_transactions";
const SLED_PENDING_TX_TREE: &[u8] = b"_pending_transactions";
/// The `TxStore` is a `sled` tree storing all the blockchain's
/// transactions where the key is the transaction hash, and the value is
@@ -105,3 +106,83 @@ impl TxStore {
Ok(txs)
}
}
/// The `PendingTxStore` is a `sled` tree storing all the node pending
/// transactions where the key is the transaction hash, and the value is
/// the serialized transaction.
#[derive(Clone)]
pub struct PendingTxStore(sled::Tree);
impl PendingTxStore {
/// Opens a new or existing `PendingTxStore` on the given sled database.
pub fn new(db: &sled::Db) -> Result<Self> {
let tree = db.open_tree(SLED_PENDING_TX_TREE)?;
Ok(Self(tree))
}
/// Insert a slice of [`Transaction`] into the pending tx store.
/// With sled, the operation is done as a batch.
/// The transactions are hashed with BLAKE3 and this hash is used as
/// the key, while the value is the serialized [`Transaction`] itself.
/// On success, the function returns the transaction hashes in the same
/// order as the input transactions.
pub fn insert(&self, transactions: &[Transaction]) -> Result<Vec<blake3::Hash>> {
let mut ret = Vec::with_capacity(transactions.len());
let mut batch = sled::Batch::default();
for tx in transactions {
let serialized = serialize(tx);
let tx_hash = blake3::hash(&serialized);
batch.insert(tx_hash.as_bytes(), serialized);
ret.push(tx_hash);
}
self.0.apply_batch(batch)?;
Ok(ret)
}
/// Check if the pending tx store contains a given transaction hash.
pub fn contains(&self, tx_hash: &blake3::Hash) -> Result<bool> {
Ok(self.0.contains_key(tx_hash.as_bytes())?)
}
/// Retrieve all transactions from the pending tx store in the form of a tuple
/// (`tx_hash`, `tx`).
/// Be careful as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<(blake3::Hash, Transaction)>> {
let mut txs = vec![];
for tx in self.0.iter() {
let (key, value) = tx.unwrap();
let hash_bytes: [u8; 32] = key.as_ref().try_into().unwrap();
let tx = deserialize(&value)?;
txs.push((hash_bytes.into(), tx));
}
Ok(txs)
}
/// Retrieve all transactions from the pending tx store.
/// Be careful as this will try to load everything in memory.
pub fn get_all_txs(&self) -> Result<Vec<Transaction>> {
let txs = self.get_all()?;
Ok(txs.iter().map(|x| x.1.clone()).rev().collect())
}
/// Remove a slice of [`Transaction`] from the pending tx store.
/// With sled, the operation is done as a batch.
/// The transactions are hashed with BLAKE3 and this hash is used as
/// the key to remove.
pub fn remove(&self, transactions: &[Transaction]) -> Result<()> {
let mut batch = sled::Batch::default();
for tx in transactions {
let serialized = serialize(tx);
let tx_hash = blake3::hash(&serialized);
batch.remove(tx_hash.as_bytes());
}
self.0.apply_batch(batch)?;
Ok(())
}
}

View File

@@ -99,7 +99,17 @@ impl ProtocolSyncConsensus {
for fork in &lock.consensus.forks {
forks.push(fork.clone().into());
}
let unconfirmed_txs = lock.unconfirmed_txs.clone();
let pending_txs = match lock.blockchain.pending_txs.get_all_txs() {
Ok(v) => v,
Err(e) => {
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_request()",
"Failed querying pending txs store: {}",
e
);
vec![]
}
};
let slot_checkpoints = lock.consensus.slot_checkpoints.clone();
let mut f_history = vec![];
for f in &lock.consensus.f_history {
@@ -116,7 +126,7 @@ impl ProtocolSyncConsensus {
bootstrap_slot,
current_slot,
forks,
unconfirmed_txs,
pending_txs,
slot_checkpoints,
f_history,
err_history,

View File

@@ -665,7 +665,7 @@ pub struct ConsensusResponse {
/// Hot/live data used by the consensus algorithm
pub forks: Vec<ForkInfo>,
/// Pending transactions
pub unconfirmed_txs: Vec<Transaction>,
pub pending_txs: Vec<Transaction>,
/// Hot/live slot checkpoints
pub slot_checkpoints: Vec<SlotCheckpoint>,
// TODO: When Float10 supports encoding/decoding this should be

View File

@@ -145,7 +145,7 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
}
lock.consensus.bootstrap_slot = response.bootstrap_slot;
lock.consensus.forks = forks;
lock.unconfirmed_txs = response.unconfirmed_txs.clone();
lock.append_pending_txs(&response.pending_txs).await;
lock.consensus.slot_checkpoints = response.slot_checkpoints.clone();
lock.consensus.previous_leaders = 1;
let mut f_history = vec![];

View File

@@ -72,8 +72,6 @@ pub struct ValidatorState {
pub consensus: ConsensusState,
/// Canonical (finalized) blockchain
pub blockchain: Blockchain,
/// Pending transactions
pub unconfirmed_txs: Vec<Transaction>,
/// A map of various subscribers exporting live info from the blockchain
/// TODO: Instead of JsonNotification, it can be an enum of internal objects,
/// and then we don't have to deal with json in this module but only
@@ -135,8 +133,6 @@ impl ValidatorState {
single_node,
)?;
let unconfirmed_txs = vec![];
// -----NATIVE WASM CONTRACTS-----
// This is the current place where native contracts are being deployed.
// When the `Blockchain` object is created, it doesn't care whether it
@@ -191,7 +187,6 @@ impl ValidatorState {
lead_verifying_key,
consensus,
blockchain,
unconfirmed_txs,
subscribers,
wallet,
synced: false,
@@ -202,7 +197,7 @@ impl ValidatorState {
}
/// The node retrieves a transaction, validates its state transition,
/// and appends it to the unconfirmed transactions list.
/// and appends it to the pending txs store.
pub async fn append_tx(&mut self, tx: Transaction) -> bool {
let tx_hash = blake3::hash(&serialize(&tx));
let tx_in_txstore = match self.blockchain.transactions.contains(&tx_hash) {
@@ -213,7 +208,15 @@ impl ValidatorState {
}
};
if self.unconfirmed_txs.contains(&tx) || tx_in_txstore {
let tx_in_pending_txs_store = match self.blockchain.pending_txs.contains(&tx_hash) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "append_tx(): Failed querying pending txs store: {}", e);
return false
}
};
if tx_in_txstore || tx_in_pending_txs_store {
info!(target: "consensus::validator", "append_tx(): We have already seen this tx.");
return false
}
@@ -224,13 +227,60 @@ impl ValidatorState {
return false
};
info!(target: "consensus::validator", "append_tx(): Appended tx to mempool");
self.unconfirmed_txs.push(tx);
if let Err(e) = self.blockchain.pending_txs.insert(&[tx]) {
error!(target: "consensus::validator", "append_tx(): Failed to insert transaction to pending txs store: {}", e);
return false
}
info!(target: "consensus::validator", "append_tx(): Appended tx to pending txs store");
true
}
/// The node retrieves transactions vector, validates their state transition,
/// and appends successfull ones to the pending txs store.
pub async fn append_pending_txs(&mut self, txs: &[Transaction]) {
let mut filtered_txs = vec![];
for tx in txs {
let tx_hash = blake3::hash(&serialize(tx));
let tx_in_txstore = match self.blockchain.transactions.contains(&tx_hash) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "append_pending_txs(): Failed querying txstore: {}", e);
continue
}
};
let tx_in_pending_txs_store = match self.blockchain.pending_txs.contains(&tx_hash) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::validator", "append_pending_txs(): Failed querying pending txs store: {}", e);
continue
}
};
if tx_in_txstore || tx_in_pending_txs_store {
info!(target: "consensus::validator", "append_pending_txs(): We have already seen this tx.");
continue
}
filtered_txs.push(tx.clone());
}
info!(target: "consensus::validator", "append_pending_txs(): Starting state transition validation");
// TODO: verify_transactions should return erroneous txs of the set to ignore them
if let Err(e) = self.verify_transactions(&filtered_txs[..], false).await {
error!(target: "consensus::validator", "append_pending_txs(): Failed to verify transaction: {}", e);
return
};
if let Err(e) = self.blockchain.pending_txs.insert(&filtered_txs) {
error!(target: "consensus::validator", "append_pending_txs(): Failed to insert transactions to pending txs store: {}", e);
return
}
info!(target: "consensus::validator", "append_pending_txs(): Appended tx to pending txs store");
}
/// Generate a block proposal for the current slot, containing all
/// unconfirmed transactions. Proposal extends the longest fork
/// pending transactions. Proposal extends the longest fork
/// chain the node is holding.
pub fn propose(
&mut self,
@@ -247,7 +297,7 @@ impl ValidatorState {
}
// Generate proposal
let unproposed_txs = self.unproposed_txs(fork_index);
let unproposed_txs = self.unproposed_txs(fork_index)?;
let mut tree = BridgeTree::<MerkleNode, MERKLE_DEPTH>::new(100);
// The following is pretty weird, so something better should be done.
for tx in &unproposed_txs {
@@ -304,17 +354,17 @@ impl ValidatorState {
Ok(Some((BlockProposal::new(header, unproposed_txs, lead_info), coin, derived_blind)))
}
/// Retrieve all unconfirmed transactions not proposed in previous blocks
/// Retrieve all pending transactions not proposed in previous blocks
/// of provided index chain.
pub fn unproposed_txs(&self, index: i64) -> Vec<Transaction> {
pub fn unproposed_txs(&self, index: i64) -> Result<Vec<Transaction>> {
let unproposed_txs = if index == -1 {
// If index is -1 (canonical blockchain) a new fork will be generated,
// therefore all unproposed transactions can be included in the proposal.
self.unconfirmed_txs.clone()
self.blockchain.pending_txs.get_all_txs()?
} else {
// We iterate over the fork chain proposals to find already proposed
// transactions and remove them from the local unproposed_txs vector.
let mut filtered_txs = self.unconfirmed_txs.clone();
let mut filtered_txs = self.blockchain.pending_txs.get_all_txs()?;
let chain = &self.consensus.forks[index as usize];
for state_checkpoint in &chain.sequence {
for tx in &state_checkpoint.proposal.block.txs {
@@ -329,10 +379,10 @@ impl ValidatorState {
// Check if transactions exceed configured cap
let cap = constants::TXS_CAP;
if unproposed_txs.len() > cap {
return unproposed_txs[0..cap].to_vec()
return Ok(unproposed_txs[0..cap].to_vec())
}
unproposed_txs
Ok(unproposed_txs)
}
/// Given a proposal, the node verify its sender (slot leader) and finds which blockchain
@@ -574,17 +624,6 @@ impl ValidatorState {
Ok(true)
}
/// Remove provided transactions vector from unconfirmed_txs if they exist.
pub fn remove_txs(&mut self, transactions: &Vec<Transaction>) -> Result<()> {
for tx in transactions {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
Ok(())
}
/// Node checks if any of the fork chains can be finalized.
/// Consensus finalization logic:
/// - If the node has observed the creation of a fork chain and no other forks exists at same or greater height,
@@ -669,8 +708,8 @@ impl ValidatorState {
return Err(e)
}
// Remove proposal transactions from memory pool
if let Err(e) = self.remove_txs(&proposal.txs) {
// Remove proposal transactions from pending txs store
if let Err(e) = self.blockchain.pending_txs.remove(&proposal.txs) {
error!(target: "consensus::validator", "Removing finalized block transactions failed: {}", e);
return Err(e)
}
@@ -778,8 +817,8 @@ impl ValidatorState {
info!(target: "consensus::validator", "consensus: Sending notification about finalized block");
blocks_subscriber.notify(notif).await;
info!(target: "consensus::validator", "receive_finalized_block(): Removing block transactions from unconfirmed_txs");
self.remove_txs(&block.txs)?;
info!(target: "consensus::validator", "receive_finalized_block(): Removing block transactions from pending txs store");
self.blockchain.pending_txs.remove(&block.txs)?;
Ok(true)
}