blockchain: Don't be strict on get_blocks_by_slot.

This commit is contained in:
parazyd
2022-04-21 18:15:26 +02:00
parent 797ce2a158
commit 28131b1819
5 changed files with 48 additions and 12 deletions

View File

@@ -1,3 +1,4 @@
use log::warn;
use sled::Batch;
use crate::{
@@ -119,7 +120,8 @@ impl BlockOrderStore {
/// Retrieve all hashes given slots.
pub fn get(&self, slots: &[u64], strict: bool) -> Result<Vec<Option<blake3::Hash>>> {
let mut ret = Vec::with_capacity(slots.len());
//let mut ret = Vec::with_capacity(slots.len());
let mut ret = vec![];
for i in slots {
if let Some(found) = self.0.get(i.to_be_bytes())? {
@@ -128,6 +130,7 @@ impl BlockOrderStore {
ret.push(Some(hash));
} else {
if strict {
warn!("BlockOrderStore::get() Slot {} not found", i);
return Err(Error::SlotNotFound(*i))
}
ret.push(None);

View File

@@ -1,7 +1,7 @@
use sled::Batch;
use crate::{
consensus2::StreamletMetadata,
consensus2::{Block, StreamletMetadata, Timestamp},
util::serial::{deserialize, serialize},
Error, Result,
};
@@ -11,9 +11,26 @@ const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata";
pub struct StreamletMetadataStore(sled::Tree);
impl StreamletMetadataStore {
pub fn new(db: &sled::Db) -> Result<Self> {
pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?;
Ok(Self(tree))
let store = Self(tree);
// In case the store is empty, add genesis metadata.
if store.0.is_empty() {
let genesis_block = Block::genesis_block(genesis_ts, genesis_data);
let genesis_hash = blake3::hash(&serialize(&genesis_block));
let metadata = StreamletMetadata {
votes: vec![],
notarized: true,
finalized: true,
participants: vec![],
};
store.insert(&[genesis_hash], &[metadata])?;
}
Ok(store)
}
/// Insert [`StreamletMetadata`] into the `MetadataStore`.

View File

@@ -1,5 +1,7 @@
use std::io;
use log::debug;
use crate::{
consensus2::{block::BlockInfo, util::Timestamp, Block, BlockProposal},
impl_vec,
@@ -42,7 +44,7 @@ impl Blockchain {
let blocks = BlockStore::new(db, genesis_ts, genesis_data)?;
let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?;
let transactions = TxStore::new(db)?;
let streamlet_metadata = StreamletMetadataStore::new(db)?;
let streamlet_metadata = StreamletMetadataStore::new(db, genesis_ts, genesis_data)?;
let nullifiers = NullifierStore::new(db)?;
let merkle_roots = RootStore::new(db)?;
@@ -93,11 +95,19 @@ impl Blockchain {
Ok(ret)
}
/// Retrieve blocks by given slots. Fails if any of them are not found.
/// Retrieve blocks by given slots.
pub fn get_blocks_by_slot(&self, slots: &[u64]) -> Result<Vec<BlockInfo>> {
let blockhashes = self.order.get(slots, true)?;
let blockhashes: Vec<blake3::Hash> = blockhashes.iter().map(|x| x.unwrap()).collect();
self.get_blocks_by_hash(&blockhashes)
debug!("get_blocks_by_slot(): {:?}", slots);
let blockhashes = self.order.get(slots, false)?;
let mut hashes = vec![];
for i in blockhashes {
if i.is_some() {
hashes.push(i.unwrap());
}
}
self.get_blocks_by_hash(&hashes)
}
/// Check if the given [`BlockInfo`] is in the database

View File

@@ -60,7 +60,9 @@ impl ProtocolSync {
// Extra validations can be added here
let key = order.sl;
let slot_range: Vec<u64> = (key..=(key + BATCH)).collect();
debug!("ProtocolSync::handle_receive_order(): Querying block range: {:?}", slot_range);
let blocks = self.state.read().await.blockchain.get_blocks_by_slot(&slot_range)?;
debug!("ProtocolSync::handle_receive_order(): Found {} blocks", blocks.len());
let response = BlockResponse { blocks };
self.channel.send(response).await?;
}

View File

@@ -9,8 +9,8 @@ use crate::{
state::ValidatorStatePtr,
},
net::{
ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager,
ProtocolJobsManagerPtr,
ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
ProtocolJobsManager, ProtocolJobsManagerPtr,
},
Result,
};
@@ -23,7 +23,11 @@ pub struct ProtocolSyncForks {
}
impl ProtocolSyncForks {
pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> Result<ProtocolBasePtr> {
pub async fn init(
channel: ChannelPtr,
state: ValidatorStatePtr,
_p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ForkOrder>().await;