consensus: Implement BlockOrderStore to store block order by slot id.

This commit is contained in:
aggstam
2022-04-20 09:04:27 +02:00
committed by parazyd
parent 7fb58b3d2b
commit bf6d4881d2
3 changed files with 86 additions and 15 deletions

View File

@@ -7,6 +7,7 @@ use crate::{
};
const SLED_BLOCK_TREE: &[u8] = b"_blocks";
const SLED_BLOCK_ORDER_TREE: &[u8] = b"_block_order";
pub struct BlockStore(sled::Tree);
@@ -80,3 +81,66 @@ impl BlockStore {
Ok(blocks)
}
}
/// Store mapping block slots to block hashes, keyed by big-endian slot
/// number so sled's lexicographic key order matches numeric slot order.
pub struct BlockOrderStore(sled::Tree);

impl BlockOrderStore {
    /// Opens a new or existing `BlockOrderStore` on the given sled database.
    ///
    /// If the underlying tree is empty, the genesis block is derived from
    /// `genesis_ts` and `genesis_data` and its hash is inserted under the
    /// genesis slot.
    pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
        let tree = db.open_tree(SLED_BLOCK_ORDER_TREE)?;
        let store = Self(tree);

        // In case the store is empty, create the genesis block.
        if store.0.is_empty() {
            let block = Block::genesis_block(genesis_ts, genesis_data);
            let blockhash = blake3::hash(&serialize(&block));
            store.insert(&[block.sl], &[blockhash])?;
        }

        Ok(store)
    }

    /// Insert a slice of slots and blockhashes into the store.
    ///
    /// The block slot is used as the key (big-endian encoded), and the hash
    /// as the value. `slots` and `hashes` must have equal length; violating
    /// this is a caller bug and panics.
    pub fn insert(&self, slots: &[u64], hashes: &[blake3::Hash]) -> Result<()> {
        assert_eq!(slots.len(), hashes.len());
        let mut batch = Batch::default();

        // Stage all writes in a batch so they are applied atomically.
        for (sl, hash) in slots.iter().zip(hashes.iter()) {
            batch.insert(&sl.to_be_bytes(), hash.as_bytes());
        }

        self.0.apply_batch(batch)?;
        Ok(())
    }

    /// Retrieve the last slot/blockhash pair in the tree.
    ///
    /// Because keys are big-endian slot numbers, sled's byte-wise ordering
    /// makes the last entry the one with the highest slot. Returns `None`
    /// when the tree is empty.
    pub fn get_last(&self) -> Result<Option<(u64, blake3::Hash)>> {
        let found = match self.0.last()? {
            Some(f) => f,
            None => return Ok(None),
        };

        // Keys and values are only ever written by `insert`, so these
        // conversions can only fail on a corrupted database.
        let slot_bytes: [u8; 8] = found.0.as_ref().try_into().expect("corrupt slot key");
        let hash_bytes: [u8; 32] = found.1.as_ref().try_into().expect("corrupt block hash");
        let slot = u64::from_be_bytes(slot_bytes);
        let hash = blake3::Hash::from(hash_bytes);
        Ok(Some((slot, hash)))
    }

    /// Retrieve all slot/blockhash pairs, in ascending slot order.
    /// Be careful as this will try to load everything in memory.
    pub fn get_all(&self) -> Result<Vec<Option<(u64, blake3::Hash)>>> {
        let mut ret = vec![];

        for record in self.0.iter() {
            // Propagate iterator errors instead of panicking on them.
            let (k, v) = record?;
            let slot_bytes: [u8; 8] = k.as_ref().try_into().expect("corrupt slot key");
            let hash_bytes: [u8; 32] = v.as_ref().try_into().expect("corrupt block hash");
            let slot = u64::from_be_bytes(slot_bytes);
            let hash = blake3::Hash::from(hash_bytes);
            ret.push(Some((slot, hash)));
        }

        Ok(ret)
    }
}

View File

@@ -8,7 +8,7 @@ use crate::{
};
pub mod blockstore;
pub use blockstore::BlockStore;
pub use blockstore::{BlockOrderStore, BlockStore};
pub mod metadatastore;
pub use metadatastore::StreamletMetadataStore;
@@ -25,6 +25,8 @@ pub use txstore::TxStore;
pub struct Blockchain {
/// Blocks sled tree
pub blocks: BlockStore,
/// Block order sled tree
pub order: BlockOrderStore,
/// Transactions sled tree
pub transactions: TxStore,
/// Streamlet metadata sled tree
@@ -38,12 +40,13 @@ pub struct Blockchain {
impl Blockchain {
/// Instantiate a new `Blockchain` over the given sled database, opening
/// (or seeding with genesis, where applicable) each of its stores.
pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
    // Each store manages its own named tree inside the shared database.
    let blocks = BlockStore::new(db, genesis_ts, genesis_data)?;
    let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?;
    let transactions = TxStore::new(db)?;
    let streamlet_metadata = StreamletMetadataStore::new(db)?;
    let nullifiers = NullifierStore::new(db)?;
    let merkle_roots = RootStore::new(db)?;

    // NOTE(review): the source retained both the pre- and post-change
    // `Ok(Self { ... })` lines (diff residue); only the variant that
    // includes `order` is kept, matching the struct's fields.
    Ok(Self { blocks, order, transactions, streamlet_metadata, nullifiers, merkle_roots })
}
/// Batch insert [`BlockProposal`]s.
@@ -58,15 +61,23 @@ impl Blockchain {
// Store block
let block =
Block { st: prop.st, sl: prop.sl, txs: tx_hashes, metadata: prop.metadata.clone() };
let blockhash = self.blocks.insert(&[block])?;
let blockhash = self.blocks.insert(&[block.clone()])?;
ret.push(blockhash[0]);
// Store block order
self.order.insert(&[block.sl], &[blockhash[0]])?;
// Store streamlet metadata
self.streamlet_metadata.insert(&[blockhash[0]], &[prop.sm.clone()])?;
}
Ok(ret)
}
/// Retrieve the last (highest-slot) finalized block as a `(slot, hash)`
/// pair, delegating to the block order store. Returns `Ok(None)` only if
/// that store is empty.
pub fn last(&self) -> Result<Option<(u64, blake3::Hash)>> {
    self.order.get_last()
}
}
impl Encodable for blake3::Hash {

View File

@@ -34,10 +34,6 @@ pub struct ConsensusState {
pub genesis_ts: Timestamp,
/// Genesis block hash
pub genesis_block: blake3::Hash,
/// Last finalized block hash,
pub last_block: blake3::Hash,
/// Last finalized block slot,
pub last_sl: u64,
/// Fork chains containing block proposals
pub proposals: Vec<ProposalChain>,
/// Orphan votes pool, in case a vote reaches a node before the
@@ -57,8 +53,6 @@ impl ConsensusState {
Ok(Self {
genesis_ts,
genesis_block,
last_block: genesis_block,
last_sl: 0,
proposals: vec![],
orphan_votes: vec![],
participants: FxIndexMap::with_hasher(FxBuildHasher::default()),
@@ -225,7 +219,7 @@ impl ValidatorState {
longest_notarized_chain.proposals.last().unwrap().hash()
} else {
self.consensus.last_block
self.blockchain.last()?.unwrap().1
};
Ok(hash)
@@ -337,7 +331,8 @@ impl ValidatorState {
}
}
if proposal.st != self.consensus.last_block || proposal.sl <= self.consensus.last_sl {
let (last_sl, last_block) = self.blockchain.last()?.unwrap();
if proposal.st != last_block || proposal.sl <= last_sl {
debug!("find_extended_chain_index(): Proposal doesn't extend any known chain");
return Ok(-2)
}
@@ -518,13 +513,14 @@ impl ValidatorState {
return Err(e)
}
};
self.consensus.last_block = *blockhashes.last().unwrap();
self.consensus.last_sl = finalized.last().unwrap().sl;
let last_block = *blockhashes.last().unwrap();
let last_sl = finalized.last().unwrap().sl;
let mut dropped = vec![];
for chain in self.consensus.proposals.iter() {
let first = chain.proposals.first().unwrap();
if first.st != self.consensus.last_block || first.sl <= self.consensus.last_sl {
if first.st != last_block || first.sl <= last_sl {
dropped.push(chain.clone());
}
}
@@ -536,7 +532,7 @@ impl ValidatorState {
// Remove orphan votes
let mut orphans = vec![];
for vote in self.consensus.orphan_votes.iter() {
if vote.sl <= self.consensus.last_sl {
if vote.sl <= last_sl {
orphans.push(vote.clone());
}
}