From 28131b1819aeb8611304d3aa7fabef2bf9ad52a2 Mon Sep 17 00:00:00 2001 From: parazyd Date: Thu, 21 Apr 2022 18:15:26 +0200 Subject: [PATCH] blockchain: Don't be strict on get_blocks_by_slot. --- src/blockchain/blockstore.rs | 5 ++++- src/blockchain/metadatastore.rs | 23 ++++++++++++++++++--- src/blockchain/mod.rs | 20 +++++++++++++----- src/consensus2/proto/protocol_sync.rs | 2 ++ src/consensus2/proto/protocol_sync_forks.rs | 10 ++++++--- 5 files changed, 48 insertions(+), 12 deletions(-) diff --git a/src/blockchain/blockstore.rs b/src/blockchain/blockstore.rs index bd509c05f..ba399a89a 100644 --- a/src/blockchain/blockstore.rs +++ b/src/blockchain/blockstore.rs @@ -1,3 +1,4 @@ +use log::warn; use sled::Batch; use crate::{ @@ -119,7 +120,8 @@ impl BlockOrderStore { /// Retrieve all hashes given slots. pub fn get(&self, slots: &[u64], strict: bool) -> Result>> { - let mut ret = Vec::with_capacity(slots.len()); + //let mut ret = Vec::with_capacity(slots.len()); + let mut ret = vec![]; for i in slots { if let Some(found) = self.0.get(i.to_be_bytes())? { @@ -128,6 +130,7 @@ impl BlockOrderStore { ret.push(Some(hash)); } else { if strict { + warn!("BlockOrderStore::get() Slot {} not found", i); return Err(Error::SlotNotFound(*i)) } ret.push(None); diff --git a/src/blockchain/metadatastore.rs b/src/blockchain/metadatastore.rs index 9ffc93273..03a7bcde9 100644 --- a/src/blockchain/metadatastore.rs +++ b/src/blockchain/metadatastore.rs @@ -1,7 +1,7 @@ use sled::Batch; use crate::{ - consensus2::StreamletMetadata, + consensus2::{Block, StreamletMetadata, Timestamp}, util::serial::{deserialize, serialize}, Error, Result, }; @@ -11,9 +11,26 @@ const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata"; pub struct StreamletMetadataStore(sled::Tree); impl StreamletMetadataStore { - pub fn new(db: &sled::Db) -> Result { + pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result { let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?; - Ok(Self(tree)) + let store = Self(tree); + + // In case the store is empty, add genesis metadata. + if store.0.is_empty() { + let genesis_block = Block::genesis_block(genesis_ts, genesis_data); + let genesis_hash = blake3::hash(&serialize(&genesis_block)); + + let metadata = StreamletMetadata { + votes: vec![], + notarized: true, + finalized: true, + participants: vec![], + }; + + store.insert(&[genesis_hash], &[metadata])?; + } + + Ok(store) } /// Insert [`StreamletMetadata`] into the `MetadataStore`. diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index e83d6f19e..b781c653a 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -1,5 +1,7 @@ use std::io; +use log::debug; + use crate::{ consensus2::{block::BlockInfo, util::Timestamp, Block, BlockProposal}, impl_vec, @@ -42,7 +44,7 @@ impl Blockchain { let blocks = BlockStore::new(db, genesis_ts, genesis_data)?; let order = BlockOrderStore::new(db, genesis_ts, genesis_data)?; let transactions = TxStore::new(db)?; - let streamlet_metadata = StreamletMetadataStore::new(db)?; + let streamlet_metadata = StreamletMetadataStore::new(db, genesis_ts, genesis_data)?; let nullifiers = NullifierStore::new(db)?; let merkle_roots = RootStore::new(db)?; @@ -93,11 +95,19 @@ impl Blockchain { Ok(ret) } - /// Retrieve blocks by given slots. Fails if any of them are not found. + /// Retrieve blocks by given slots. pub fn get_blocks_by_slot(&self, slots: &[u64]) -> Result> { - let blockhashes = self.order.get(slots, true)?; - let blockhashes: Vec = blockhashes.iter().map(|x| x.unwrap()).collect(); - self.get_blocks_by_hash(&blockhashes) + debug!("get_blocks_by_slot(): {:?}", slots); + let blockhashes = self.order.get(slots, false)?; + + let mut hashes = vec![]; + for i in blockhashes { + if i.is_some() { + hashes.push(i.unwrap()); + } + } + + self.get_blocks_by_hash(&hashes) } /// Check if the given [`BlockInfo`] is in the database diff --git a/src/consensus2/proto/protocol_sync.rs b/src/consensus2/proto/protocol_sync.rs index 70a61cb86..aa4833248 100644 --- a/src/consensus2/proto/protocol_sync.rs +++ b/src/consensus2/proto/protocol_sync.rs @@ -60,7 +60,9 @@ impl ProtocolSync { // Extra validations can be added here let key = order.sl; let slot_range: Vec = (key..=(key + BATCH)).collect(); + debug!("ProtocolSync::handle_receive_order(): Querying block range: {:?}", slot_range); let blocks = self.state.read().await.blockchain.get_blocks_by_slot(&slot_range)?; + debug!("ProtocolSync::handle_receive_order(): Found {} blocks", blocks.len()); let response = BlockResponse { blocks }; self.channel.send(response).await?; } diff --git a/src/consensus2/proto/protocol_sync_forks.rs b/src/consensus2/proto/protocol_sync_forks.rs index b8cb8d94c..161bff41b 100644 --- a/src/consensus2/proto/protocol_sync_forks.rs +++ b/src/consensus2/proto/protocol_sync_forks.rs @@ -9,8 +9,8 @@ use crate::{ state::ValidatorStatePtr, }, net::{ - ChannelPtr, MessageSubscription, ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, - ProtocolJobsManagerPtr, + ChannelPtr, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, }, Result, }; @@ -23,7 +23,11 @@ pub struct ProtocolSyncForks { } impl ProtocolSyncForks { - pub async fn init(channel: ChannelPtr, state: ValidatorStatePtr) -> Result { + pub async fn init( + channel: ChannelPtr, + state: ValidatorStatePtr, + _p2p: P2pPtr, + ) -> Result { let msg_subsystem = channel.get_message_subsystem(); msg_subsystem.add_dispatch::().await;