From 2ef8a6fd65c53d09770592d60bfae151304f9a50 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sun, 6 Jun 2021 19:35:35 +0300 Subject: [PATCH] create generic struct RocksColumn to handle different columns in rocksdb --- src/bin/gatewayd.rs | 10 +++-- src/blockchain/mod.rs | 2 +- src/blockchain/rocks.rs | 80 +++++++++++++++++++++++++++++++------ src/blockchain/slabstore.rs | 32 +++++---------- src/service/gateway.rs | 15 ++----- src/service/mod.rs | 1 + 6 files changed, 88 insertions(+), 52 deletions(-) diff --git a/src/bin/gatewayd.rs b/src/bin/gatewayd.rs index 1588f7427..f7df5a768 100644 --- a/src/bin/gatewayd.rs +++ b/src/bin/gatewayd.rs @@ -1,13 +1,14 @@ use std::net::SocketAddr; use std::sync::Arc; +use drk::service::{GatewayService, ProgramOptions}; +use drk::Result; +use drk::blockchain::{Rocks, RocksColumn, rocks::columns}; + extern crate clap; use async_executor::Executor; use easy_parallel::Parallel; -use drk::service::{GatewayService, ProgramOptions}; -use drk::{blockchain::Rocks, Result}; - fn setup_addr(address: Option, default: SocketAddr) -> SocketAddr { match address { Some(addr) => addr, @@ -21,8 +22,9 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( let database_path = options.database_path.as_path(); let rocks = Rocks::new(database_path)?; + let rocks_slabstore_column = RocksColumn::::new(rocks); - let gateway = GatewayService::new(accept_addr, pub_addr, rocks)?; + let gateway = GatewayService::new(accept_addr, pub_addr, rocks_slabstore_column)?; gateway.start(executor.clone()).await?; Ok(()) diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 717ef54e2..d86dc76ea 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -2,7 +2,7 @@ pub mod slab; pub mod slabstore; pub mod rocks; -pub use rocks::Rocks; +pub use rocks::{Rocks, RocksColumn}; pub use slab::Slab; pub use slabstore::SlabStore; diff --git a/src/blockchain/rocks.rs b/src/blockchain/rocks.rs index d1bb7501d..4c295e8b5 100644 --- a/src/blockchain/rocks.rs +++ b/src/blockchain/rocks.rs @@ -1,5 +1,8 @@ +use std::marker::PhantomData; use std::path::Path; +use async_std::sync::Arc; +use crate::serial::{deserialize, serialize, Decodable, Encodable}; use crate::{Error, Result}; use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options, DB}; @@ -16,7 +19,7 @@ pub trait Column { pub mod columns { pub struct Slabs; pub struct Nullifiers; - pub struct MerkleTree; + pub struct MerkleRoots; } impl Column for columns::Slabs { @@ -27,8 +30,8 @@ impl Column for columns::Nullifiers { const NAME: &'static str = "nullifiers"; } -impl Column for columns::MerkleTree { - const NAME: &'static str = "merkletree"; +impl Column for columns::MerkleRoots { + const NAME: &'static str = "merkleroots"; } pub struct Rocks { @@ -36,7 +39,7 @@ pub struct Rocks { } impl Rocks { - pub fn new(path: &Path) -> Result { + pub fn new(path: &Path) -> Result> { // column family options let cf_opts = Options::default(); @@ -47,11 +50,11 @@ impl Rocks { let slab_cf = ColumnFamilyDescriptor::new(columns::Slabs::NAME, cf_opts.clone()); // nullifiers column family let nullifiers_cf = ColumnFamilyDescriptor::new(columns::Nullifiers::NAME, cf_opts.clone()); - // merkletree column family - let merkletree_cf = ColumnFamilyDescriptor::new(columns::MerkleTree::NAME, cf_opts); + // merkleroots column family + let merkleroots_cf = ColumnFamilyDescriptor::new(columns::MerkleRoots::NAME, cf_opts); // column families - let cfs = vec![default_cf, slab_cf, nullifiers_cf, merkletree_cf]; + let cfs = vec![default_cf, slab_cf, nullifiers_cf, merkleroots_cf]; // database options let mut opt = Options::default(); @@ -61,7 +64,7 @@ impl Rocks { // open database with following options and cf let db = DB::open_cf_descriptors(&opt, path, cfs)?; - Ok(Self { db }) + Ok(Arc::new(Self { db })) } pub fn cf_handle(&self) -> Result<&ColumnFamily> @@ -85,10 +88,7 @@ impl Rocks { pub fn key_exist_cf(&self, cf: &ColumnFamily, key: Vec) -> Result { let val = self.db.get_cf(cf, key)?; - if let None = val { - return Ok(false); - }; - Ok(true) + Ok(val.is_some()) } pub fn iterator(&self, cf: &ColumnFamily, iterator_mode: IteratorMode) -> rocksdb::DBIterator { @@ -104,3 +104,59 @@ impl Rocks { Ok(()) } } + +pub struct RocksColumn { + rocks: Arc, + column: PhantomData, +} + +impl RocksColumn { + pub fn new(rocks: Arc) -> RocksColumn { + RocksColumn { + rocks, + column: PhantomData, + } + } + fn cf_handle(&self) -> Result<&ColumnFamily> { + self.rocks.cf_handle::() + } + + pub fn put(&self, key: impl Encodable, value: impl Encodable) -> Result<()> { + let key = serialize(&key); + let value = serialize(&value); + let cf = self.cf_handle()?; + self.rocks.put_cf(cf, key, value)?; + Ok(()) + } + + pub fn get(&self, key: impl Encodable) -> Result>> { + let key = serialize(&key); + let cf = self.cf_handle()?; + let val = self.rocks.get_cf(cf, key)?; + Ok(val) + } + + pub fn get_value_deserialized(&self, key: Vec) -> Result> { + let value = self.get(key)?; + match value { + Some(v) => { + let v: D = deserialize(&v)?; + Ok(Some(v)) + } + None => Ok(None), + } + } + + pub fn key_exist(&self, key: impl Encodable) -> Result { + let key = serialize(&key); + let cf = self.cf_handle()?; + let val = self.rocks.key_exist_cf(cf, key)?; + Ok(val) + } + + pub fn iterator(&self, iterator_mode: IteratorMode) -> Result { + let cf = self.cf_handle()?; + let iter = self.rocks.iterator(cf, iterator_mode); + Ok(iter) + } +} diff --git a/src/blockchain/slabstore.rs b/src/blockchain/slabstore.rs index 26e11293d..6aa6943d0 100644 --- a/src/blockchain/slabstore.rs +++ b/src/blockchain/slabstore.rs @@ -4,34 +4,29 @@ use crate::serial::{deserialize, serialize}; use crate::Result; use super::slab::Slab; -use super::rocks::{columns, IteratorMode, Rocks}; +use super::rocks::{columns, IteratorMode, RocksColumn}; pub struct SlabStore { - rocks: Rocks, + rocks: RocksColumn, } impl SlabStore { - pub fn new(rocks: Rocks) -> Result> { + pub fn new(rocks: RocksColumn) -> Result> { Ok(Arc::new(SlabStore { rocks })) } pub fn get(&self, key: Vec) -> Result>> { - let cf = self.rocks.cf_handle::()?; - let value = self.rocks.get_cf(cf, key)?; + let value = self.rocks.get(key)?; Ok(value) } - pub fn put(&self, value: Vec) -> Result>> { + pub fn put(&self, value: Vec) -> Result> { let slab: Slab = deserialize(&value)?; let last_index = self.get_last_index()?; let key = last_index + 1; if slab.get_index() == key { - let key = serialize(&key); - - let cf = self.rocks.cf_handle::()?; - self.rocks.put_cf(cf, key.clone(), value)?; - + self.rocks.put(key.clone(), value)?; Ok(Some(key)) } else { Ok(None) @@ -39,19 +34,11 @@ impl SlabStore { } pub fn get_value_deserialized(&self, key: Vec) -> Result> { - let value = self.get(key)?; - match value { - Some(v) => { - let v: Slab = deserialize(&v)?; - Ok(Some(v)) - } - None => Ok(None), - } + self.rocks.get_value_deserialized::(key) } pub fn get_last_index(&self) -> Result { - let cf = self.rocks.cf_handle::()?; - let last_index = self.rocks.iterator(cf, IteratorMode::End).next(); + let last_index = self.rocks.iterator(IteratorMode::End)?.next(); match last_index { Some((index, _)) => Ok(deserialize(&index)?), None => Ok(0), @@ -59,8 +46,7 @@ impl SlabStore { } pub fn get_last_index_as_bytes(&self) -> Result> { - let cf = self.rocks.cf_handle::()?; - let last_index = self.rocks.iterator(cf, IteratorMode::End).next(); + let last_index = self.rocks.iterator(IteratorMode::End)?.next(); match last_index { Some((index, _)) => Ok(index.to_vec()), None => Ok(serialize::(&0)), diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 426faea93..81817b674 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -6,8 +6,7 @@ use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request, use crate::{serial::deserialize, serial::serialize, Error, Result, }; -use crate::blockchain::{Rocks, Slab, SlabStore}; - +use crate::blockchain::{Slab, SlabStore, RocksColumn, rocks::columns}; use async_executor::Executor; use log::*; @@ -37,7 +36,7 @@ impl GatewayService { pub fn new( addr: SocketAddr, pub_addr: SocketAddr, - rocks: Rocks, + rocks: RocksColumn, ) -> Result> { let slabstore = SlabStore::new(rocks)?; @@ -185,7 +184,7 @@ pub struct GatewayClient { } impl GatewayClient { - pub fn new(addr: SocketAddr, rocks: Rocks) -> Result { + pub fn new(addr: SocketAddr, rocks: RocksColumn) -> Result { let protocol = ReqProtocol::new(addr, String::from("GATEWAY CLIENT")); let slabstore = SlabStore::new(rocks)?; @@ -275,12 +274,4 @@ impl GatewayClient { subscriber.start().await?; Ok(subscriber) } - - pub async fn subscribe(mut subscriber: Subscriber, slabstore: Arc) -> Result<()> { - loop { - let slab: Vec; - slab = subscriber.fetch().await?; - slabstore.put(slab)?; - } - } } diff --git a/src/service/mod.rs b/src/service/mod.rs index d7aa71d4a..88558a7d2 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -4,3 +4,4 @@ pub mod reqrep; pub use gateway::{GatewayClient, GatewayService}; pub use options::{ClientProgramOptions, ProgramOptions}; +pub use reqrep::Subscriber;