create generic struct RocksColumn to handle different columns in rocksdb

This commit is contained in:
ghassmo
2021-06-06 19:35:35 +03:00
parent ea7db79afa
commit 2ef8a6fd65
6 changed files with 88 additions and 52 deletions

View File

@@ -1,13 +1,14 @@
use std::net::SocketAddr;
use std::sync::Arc;
use drk::service::{GatewayService, ProgramOptions};
use drk::Result;
use drk::blockchain::{Rocks, RocksColumn, rocks::columns};
extern crate clap;
use async_executor::Executor;
use easy_parallel::Parallel;
use drk::service::{GatewayService, ProgramOptions};
use drk::{blockchain::Rocks, Result};
fn setup_addr(address: Option<SocketAddr>, default: SocketAddr) -> SocketAddr {
match address {
Some(addr) => addr,
@@ -21,8 +22,9 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
let database_path = options.database_path.as_path();
let rocks = Rocks::new(database_path)?;
let rocks_slabstore_column = RocksColumn::<columns::Slabs>::new(rocks);
let gateway = GatewayService::new(accept_addr, pub_addr, rocks)?;
let gateway = GatewayService::new(accept_addr, pub_addr, rocks_slabstore_column)?;
gateway.start(executor.clone()).await?;
Ok(())

View File

@@ -2,7 +2,7 @@ pub mod slab;
pub mod slabstore;
pub mod rocks;
pub use rocks::Rocks;
pub use rocks::{Rocks, RocksColumn};
pub use slab::Slab;
pub use slabstore::SlabStore;

View File

@@ -1,5 +1,8 @@
use std::marker::PhantomData;
use std::path::Path;
use async_std::sync::Arc;
use crate::serial::{deserialize, serialize, Decodable, Encodable};
use crate::{Error, Result};
use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options, DB};
@@ -16,7 +19,7 @@ pub trait Column {
pub mod columns {
pub struct Slabs;
pub struct Nullifiers;
pub struct MerkleTree;
pub struct MerkleRoots;
}
impl Column for columns::Slabs {
@@ -27,8 +30,8 @@ impl Column for columns::Nullifiers {
const NAME: &'static str = "nullifiers";
}
impl Column for columns::MerkleTree {
const NAME: &'static str = "merkletree";
impl Column for columns::MerkleRoots {
const NAME: &'static str = "merkleroots";
}
pub struct Rocks {
@@ -36,7 +39,7 @@ pub struct Rocks {
}
impl Rocks {
pub fn new(path: &Path) -> Result<Self> {
pub fn new(path: &Path) -> Result<Arc<Self>> {
// column family options
let cf_opts = Options::default();
@@ -47,11 +50,11 @@ impl Rocks {
let slab_cf = ColumnFamilyDescriptor::new(columns::Slabs::NAME, cf_opts.clone());
// nullifiers column family
let nullifiers_cf = ColumnFamilyDescriptor::new(columns::Nullifiers::NAME, cf_opts.clone());
// merkletree column family
let merkletree_cf = ColumnFamilyDescriptor::new(columns::MerkleTree::NAME, cf_opts);
// merkleroots column family
let merkleroots_cf = ColumnFamilyDescriptor::new(columns::MerkleRoots::NAME, cf_opts);
// column families
let cfs = vec![default_cf, slab_cf, nullifiers_cf, merkletree_cf];
let cfs = vec![default_cf, slab_cf, nullifiers_cf, merkleroots_cf];
// database options
let mut opt = Options::default();
@@ -61,7 +64,7 @@ impl Rocks {
// open database with following options and cf
let db = DB::open_cf_descriptors(&opt, path, cfs)?;
Ok(Self { db })
Ok(Arc::new(Self { db }))
}
pub fn cf_handle<C>(&self) -> Result<&ColumnFamily>
@@ -85,10 +88,7 @@ impl Rocks {
pub fn key_exist_cf(&self, cf: &ColumnFamily, key: Vec<u8>) -> Result<bool> {
let val = self.db.get_cf(cf, key)?;
if let None = val {
return Ok(false);
};
Ok(true)
Ok(val.is_some())
}
pub fn iterator(&self, cf: &ColumnFamily, iterator_mode: IteratorMode) -> rocksdb::DBIterator {
@@ -104,3 +104,59 @@ impl Rocks {
Ok(())
}
}
pub struct RocksColumn<T: Column> {
rocks: Arc<Rocks>,
column: PhantomData<T>,
}
impl<T: Column> RocksColumn<T> {
pub fn new(rocks: Arc<Rocks>) -> RocksColumn<T> {
RocksColumn {
rocks,
column: PhantomData,
}
}
fn cf_handle(&self) -> Result<&ColumnFamily> {
self.rocks.cf_handle::<T>()
}
pub fn put(&self, key: impl Encodable, value: impl Encodable) -> Result<()> {
let key = serialize(&key);
let value = serialize(&value);
let cf = self.cf_handle()?;
self.rocks.put_cf(cf, key, value)?;
Ok(())
}
pub fn get(&self, key: impl Encodable) -> Result<Option<Vec<u8>>> {
let key = serialize(&key);
let cf = self.cf_handle()?;
let val = self.rocks.get_cf(cf, key)?;
Ok(val)
}
pub fn get_value_deserialized<D: Decodable>(&self, key: Vec<u8>) -> Result<Option<D>> {
let value = self.get(key)?;
match value {
Some(v) => {
let v: D = deserialize(&v)?;
Ok(Some(v))
}
None => Ok(None),
}
}
pub fn key_exist(&self, key: impl Encodable) -> Result<bool> {
let key = serialize(&key);
let cf = self.cf_handle()?;
let val = self.rocks.key_exist_cf(cf, key)?;
Ok(val)
}
pub fn iterator(&self, iterator_mode: IteratorMode) -> Result<rocksdb::DBIterator> {
let cf = self.cf_handle()?;
let iter = self.rocks.iterator(cf, iterator_mode);
Ok(iter)
}
}

View File

@@ -4,34 +4,29 @@ use crate::serial::{deserialize, serialize};
use crate::Result;
use super::slab::Slab;
use super::rocks::{columns, IteratorMode, Rocks};
use super::rocks::{columns, IteratorMode, RocksColumn};
pub struct SlabStore {
rocks: Rocks,
rocks: RocksColumn<columns::Slabs>,
}
impl SlabStore {
pub fn new(rocks: Rocks) -> Result<Arc<Self>> {
pub fn new(rocks: RocksColumn<columns::Slabs>) -> Result<Arc<Self>> {
Ok(Arc::new(SlabStore { rocks }))
}
pub fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
let cf = self.rocks.cf_handle::<columns::Slabs>()?;
let value = self.rocks.get_cf(cf, key)?;
let value = self.rocks.get(key)?;
Ok(value)
}
pub fn put(&self, value: Vec<u8>) -> Result<Option<Vec<u8>>> {
pub fn put(&self, value: Vec<u8>) -> Result<Option<u64>> {
let slab: Slab = deserialize(&value)?;
let last_index = self.get_last_index()?;
let key = last_index + 1;
if slab.get_index() == key {
let key = serialize(&key);
let cf = self.rocks.cf_handle::<columns::Slabs>()?;
self.rocks.put_cf(cf, key.clone(), value)?;
self.rocks.put(key.clone(), value)?;
Ok(Some(key))
} else {
Ok(None)
@@ -39,19 +34,11 @@ impl SlabStore {
}
pub fn get_value_deserialized(&self, key: Vec<u8>) -> Result<Option<Slab>> {
let value = self.get(key)?;
match value {
Some(v) => {
let v: Slab = deserialize(&v)?;
Ok(Some(v))
}
None => Ok(None),
}
self.rocks.get_value_deserialized::<Slab>(key)
}
pub fn get_last_index(&self) -> Result<u64> {
let cf = self.rocks.cf_handle::<columns::Slabs>()?;
let last_index = self.rocks.iterator(cf, IteratorMode::End).next();
let last_index = self.rocks.iterator(IteratorMode::End)?.next();
match last_index {
Some((index, _)) => Ok(deserialize(&index)?),
None => Ok(0),
@@ -59,8 +46,7 @@ impl SlabStore {
}
pub fn get_last_index_as_bytes(&self) -> Result<Vec<u8>> {
let cf = self.rocks.cf_handle::<columns::Slabs>()?;
let last_index = self.rocks.iterator(cf, IteratorMode::End).next();
let last_index = self.rocks.iterator(IteratorMode::End)?.next();
match last_index {
Some((index, _)) => Ok(index.to_vec()),
None => Ok(serialize::<u64>(&0)),

View File

@@ -6,8 +6,7 @@ use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request,
use crate::{serial::deserialize, serial::serialize, Error,
Result,
};
use crate::blockchain::{Rocks, Slab, SlabStore};
use crate::blockchain::{Slab, SlabStore, RocksColumn, rocks::columns};
use async_executor::Executor;
use log::*;
@@ -37,7 +36,7 @@ impl GatewayService {
pub fn new(
addr: SocketAddr,
pub_addr: SocketAddr,
rocks: Rocks,
rocks: RocksColumn<columns::Slabs>,
) -> Result<Arc<GatewayService>> {
let slabstore = SlabStore::new(rocks)?;
@@ -185,7 +184,7 @@ pub struct GatewayClient {
}
impl GatewayClient {
pub fn new(addr: SocketAddr, rocks: Rocks) -> Result<Self> {
pub fn new(addr: SocketAddr, rocks: RocksColumn<columns::Slabs>) -> Result<Self> {
let protocol = ReqProtocol::new(addr, String::from("GATEWAY CLIENT"));
let slabstore = SlabStore::new(rocks)?;
@@ -275,12 +274,4 @@ impl GatewayClient {
subscriber.start().await?;
Ok(subscriber)
}
pub async fn subscribe(mut subscriber: Subscriber, slabstore: Arc<SlabStore>) -> Result<()> {
loop {
let slab: Vec<u8>;
slab = subscriber.fetch().await?;
slabstore.put(slab)?;
}
}
}

View File

@@ -4,3 +4,4 @@ pub mod reqrep;
pub use gateway::{GatewayClient, GatewayService};
pub use options::{ClientProgramOptions, ProgramOptions};
pub use reqrep::Subscriber;