This commit is contained in:
Artem Vorotnikov
2021-01-13 07:04:57 +03:00
parent 4a943d5ac4
commit 2cd0bf7a52
15 changed files with 934 additions and 281 deletions

View File

@@ -12,7 +12,7 @@ arrayref = "0.3"
async-stream = "0.3"
async-trait = "0.1"
bytes = "1"
ethereum = { git = "https://github.com/rust-blockchain/ethereum", branch = "vorot93/dev" }
ethereum = { git = "https://github.com/rust-blockchain/ethereum" }
ethereum-types = "0.10"
futures = "0.3"
hex = "0.4"
@@ -32,3 +32,7 @@ tracing = "0.1"
[build-dependencies]
tonic-build = "0.3"
[dev-dependencies]
bytes-literal = { git = "https://github.com/vorot93/bytes-literal" }
tokio = { version = "1", features = ["full"] }

View File

@@ -1,12 +1,27 @@
use crate::{dbutils::*, Cursor};
use crate::{common, dbutils::*, Cursor};
use anyhow::bail;
use arrayref::array_ref;
use bytes::{Buf, Bytes};
use ethereum_types::H256;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use maplit::hashmap;
use std::{collections::HashMap, lazy::SyncLazy, mem::size_of};
use std::{collections::HashMap, lazy::SyncLazy};
mod account;
mod account_utils;
mod storage;
mod storage_utils;
pub use self::{account::*, account_utils::*, storage::*, storage_utils::*};
/// Cursor-backed access to a changeset bucket.
///
/// The streams yield `(u64, Bytes, Bytes)` triples; the implementations in
/// this module build them with `from_db_format`, which returns
/// `(block number, key, value)`.
#[async_trait]
pub trait Walker2: Send {
/// Streams entries in ascending order, starting at block `from`.
/// The shared implementation stops at the first entry whose block number exceeds `to`.
fn walk(&mut self, from: u64, to: u64) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>>;
/// Streams entries in descending order.
/// The shared implementation stops at the first entry whose block number is below `from`.
fn walk_reverse(
&mut self,
from: u64,
to: u64,
) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>>;
/// Looks up the value recorded for key `k` at `block_number`; `Ok(None)` when nothing matches.
async fn find(&mut self, block_number: u64, k: &[u8]) -> anyhow::Result<Option<Bytes>>;
}
pub static MAPPER: SyncLazy<HashMap<Bucket, BucketMap>> = SyncLazy::new(|| {
vec![
@@ -14,17 +29,28 @@ pub static MAPPER: SyncLazy<HashMap<Bucket, BucketMap>> = SyncLazy::new(|| {
Bucket::AccountChangeSet,
BucketMap {
index_bucket: Bucket::AccountsHistory,
walker_adapter: Box::new(|cursor| todo!()),
key_size: H256::len_bytes(),
walker_adapter: Box::new(|c| todo!()),
key_size: common::HASH_LENGTH,
template: "acc-ind-",
new: ChangeSet::new_account,
encode: account::encode_accounts,
decode: Box::new(from_db_format(H256::len_bytes())),
decode: Box::new(from_db_format(common::HASH_LENGTH)),
},
),
(Bucket::StorageChangeSet, todo!()),
(Bucket::PlainAccountChangeSet, todo!()),
(Bucket::PlainStorageChangeSet, todo!()),
// (Bucket::StorageChangeSet, todo!()),
(
Bucket::PlainAccountChangeSet,
BucketMap {
index_bucket: Bucket::AccountsHistory,
walker_adapter: Box::new(|c| todo!()),
key_size: common::ADDRESS_LENGTH,
template: "acc-ind-",
new: ChangeSet::new_account_plain,
encode: account::encode_accounts_plain,
decode: Box::new(from_db_format(common::ADDRESS_LENGTH)),
},
),
// (Bucket::PlainStorageChangeSet, todo!()),
]
.into_iter()
.collect()
@@ -38,48 +64,84 @@ pub struct BucketMap {
pub key_size: usize,
pub template: &'static str,
pub new: fn() -> ChangeSet,
pub encode: fn(
u64,
s: &ChangeSet,
f: Box<dyn Fn(Bytes, Bytes) -> anyhow::Result<()> + Send + Sync>,
) -> anyhow::Result<()>,
pub encode: fn(u64, s: ChangeSet) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + Send>,
pub decode: Box<dyn Fn(Bytes, Bytes) -> (u64, Bytes, Bytes) + Send + Sync>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
/// One key/value entry stored in a `ChangeSet`.
pub struct Change {
// Key of the changed entry.
pub key: Bytes,
// Value recorded for that key.
pub value: Bytes,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
/// A list of `Change` entries together with the key length they are expected to have.
pub struct ChangeSet {
// Entries in insertion order (`insert` appends).
pub changes: Vec<Change>,
// Expected key length; when it is 0, `key_size` falls back to the length of the first stored key.
key_len: usize,
}
impl ChangeSet {
    /// Number of stored changes.
    pub fn len(&self) -> usize {
        self.changes.len()
    }

    /// `true` when no change has been stored yet.
    pub fn is_empty(&self) -> bool {
        self.changes.is_empty()
    }

    /// Key length this set works with: the configured length when it is
    /// non-zero, otherwise the length of the first stored key (0 if none).
    pub fn key_size(&self) -> usize {
        match self.key_len {
            0 => self
                .changes
                .first()
                .map_or(0, |change| change.key.len()),
            configured => configured,
        }
    }

    /// Accepts `key` when the set is still unconstrained (empty, no configured
    /// length) or when the key is non-empty and has exactly the expected length.
    fn check_key_size(&self, key: &[u8]) -> anyhow::Result<()> {
        let expected = self.key_size();
        let unconstrained = self.is_empty() && expected == 0;
        let matches_expected = !key.is_empty() && key.len() == expected;

        if !(unconstrained || matches_expected) {
            bail!(
                "wrong key size in AccountChangeSet: expected {}, actual {}",
                expected,
                key.len()
            )
        }
        Ok(())
    }

    /// Appends a change after validating the key length.
    ///
    /// # Errors
    /// Fails when `key` does not have the length expected by this set.
    pub fn insert(&mut self, key: Bytes, value: Bytes) -> anyhow::Result<()> {
        self.check_key_size(&key)?;
        let change = Change { key, value };
        self.changes.push(change);
        Ok(())
    }
}
pub fn from_db_format(addr_size: usize) -> impl Fn(Bytes, Bytes) -> (u64, Bytes, Bytes) {
const BLOCK_N_SIZE: usize = 8;
let st_sz = addr_size + size_of::<Incarnation>() + H256::len_bytes();
let st_sz = addr_size + common::INCARNATION_LENGTH + common::HASH_LENGTH;
move |mut db_key, mut db_value| {
let block_n = u64::from_be_bytes(*array_ref!(db_key, 0, size_of::<u64>()));
move |db_key, mut db_value| {
let block_n = u64::from_be_bytes(*array_ref!(db_key, 0, common::BLOCK_NUMBER_LENGTH));
let k: Bytes;
let mut k: Bytes;
let v: Bytes;
if db_key.len() == 8 {
k = db_value.clone();
v = db_value.split_off(addr_size);
k = db_value;
v = k.split_off(addr_size);
} else {
let mut k_tmp = vec![0_u8; st_sz];
db_key.advance(BLOCK_N_SIZE); // remove block_n bytes
let mut k_tmp = vec![0; st_sz];
let db_key = &db_key[BLOCK_N_SIZE..]; // remove block_n bytes
k_tmp[..db_key.len()].copy_from_slice(&db_key[..]);
k_tmp[db_key.len()..].copy_from_slice(&db_value[..H256::len_bytes()]);
k_tmp[db_key.len()..].copy_from_slice(&db_value[..common::HASH_LENGTH]);
k = k_tmp.into();
v = db_value.split_off(H256::len_bytes());
v = db_value.split_off(common::HASH_LENGTH);
}
(block_n, k, v)

View File

@@ -1,18 +1,147 @@
use self::account_utils::{encode_accounts_2, find_in_account_changeset};
pub use super::*;
use crate::CursorDupSort;
use async_trait::async_trait;
pub type EncodedStream = Box<dyn Iterator<Item = (Bytes, Bytes)> + Send>;
pub type Encoder = fn(u64, ChangeSet) -> EncodedStream;
pub type Decoder = fn(Bytes, Bytes) -> (u64, Bytes, Bytes);
/* Hashed changesets (key is a hash of common.Address) */
impl ChangeSet {
pub fn new_account() -> Self {
Self {
changes: vec![],
key_len: H256::len_bytes(),
key_len: common::HASH_LENGTH,
}
}
}
pub fn encode_accounts(
block_n: u64,
s: &ChangeSet,
f: Box<dyn Fn(Bytes, Bytes) -> anyhow::Result<()> + Send + Sync>,
) -> anyhow::Result<()> {
todo!()
pub fn encode_accounts(block_n: u64, s: ChangeSet) -> EncodedStream {
Box::new(encode_accounts_2(block_n, s))
}
/// Wraps a `CursorDupSort` and implements `Walker2` with `common::HASH_LENGTH` keys.
pub struct AccountChangeSet<C: CursorDupSort> {
// Cursor every operation is executed on.
pub c: C,
}
// `Walker2` for hashed account changesets: each method forwards to a shared
// helper, passing `common::HASH_LENGTH` as the key length.
#[async_trait]
impl<C: CursorDupSort> Walker2 for AccountChangeSet<C> {
// Forward walk; delegates to `storage_utils::walk`.
fn walk(&mut self, from: u64, to: u64) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
super::storage_utils::walk(&mut self.c, from, to, common::HASH_LENGTH)
}
// Reverse walk; delegates to `storage_utils::walk_reverse`.
fn walk_reverse(
&mut self,
from: u64,
to: u64,
) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
super::storage_utils::walk_reverse(&mut self.c, from, to, common::HASH_LENGTH)
}
// Point lookup; delegates to `find_in_account_changeset`.
async fn find(&mut self, block_number: u64, k: &[u8]) -> anyhow::Result<Option<Bytes>> {
find_in_account_changeset(&mut self.c, block_number, k, common::HASH_LENGTH).await
}
}
/* Plain changesets (key is a common.Address) */
impl ChangeSet {
/// Creates an empty change set whose keys must be `common::ADDRESS_LENGTH` bytes long.
pub fn new_account_plain() -> Self {
Self {
changes: vec![],
key_len: common::ADDRESS_LENGTH,
}
}
}
/// Encoder for plain account changesets; it is the same function as `encode_accounts`.
// Lower-case name is deliberate so the constant can be called like a function.
#[allow(non_upper_case_globals)]
pub const encode_accounts_plain: Encoder = encode_accounts;
/// Wraps a `CursorDupSort` and implements `Walker2` with `common::ADDRESS_LENGTH` keys.
pub struct AccountChangeSetPlain<C: CursorDupSort> {
// Cursor every operation is executed on.
pub c: C,
}
// `Walker2` for plain account changesets: each method forwards to a shared
// helper, passing `common::ADDRESS_LENGTH` as the key length.
#[async_trait]
impl<C: CursorDupSort> Walker2 for AccountChangeSetPlain<C> {
// Forward walk; delegates to `storage_utils::walk`.
fn walk(&mut self, from: u64, to: u64) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
super::storage_utils::walk(&mut self.c, from, to, common::ADDRESS_LENGTH)
}
// Reverse walk; delegates to `storage_utils::walk_reverse`.
fn walk_reverse(
&mut self,
from: u64,
to: u64,
) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
super::storage_utils::walk_reverse(&mut self.c, from, to, common::ADDRESS_LENGTH)
}
// Point lookup; delegates to `find_in_account_changeset`.
async fn find(&mut self, block_number: u64, k: &[u8]) -> anyhow::Result<Option<Bytes>> {
find_in_account_changeset(&mut self.c, block_number, k, common::ADDRESS_LENGTH).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dbutils;
    use bytes_literal::bytes;
    use ethereum_types::{Address, H256};
    use sha3::{Digest, Keccak256};

    /// Round-trips a hashed account change set through the registered encoder/decoder.
    #[test]
    fn encoding_account_hashed() {
        let m = &MAPPER[&dbutils::Bucket::AccountChangeSet];
        run_test_account_encoding(true, m.new, m.encode, &m.decode);
    }

    /// Round-trips a plain account change set through the registered encoder/decoder.
    #[test]
    fn encoding_account_plain() {
        let m = &MAPPER[&dbutils::Bucket::PlainAccountChangeSet];
        run_test_account_encoding(false, m.new, m.encode, &m.decode)
    }

    /// Builds a three-entry change set (keyed by address or by address hash),
    /// encodes it, decodes every emitted pair and checks that the result
    /// matches the input.
    ///
    /// Nothing here awaits, so this is a plain synchronous helper.
    fn run_test_account_encoding<
        Decoder: Fn(Bytes, Bytes) -> (u64, Bytes, Bytes) + Send + Sync,
    >(
        is_hashed: bool,
        new: fn() -> ChangeSet,
        enc: Encoder,
        dec: Decoder,
    ) {
        let mut ch = (new)();

        for (i, val) in vec![
            bytes!["f7f6db1eb17c6d582078e0ffdd0c"],
            bytes!["b1e9b5c16355eede662031dd621d08faf4ea"],
            bytes!["862cf52b74f1cea41ddd8ffa4b3e7c7790"],
        ]
        .into_iter()
        .enumerate()
        {
            let address = format!("0xBe828AD8B538D1D691891F6c725dEdc5989abBc{}", i)
                .parse::<Address>()
                .unwrap();
            if is_hashed {
                let addr_hash = common::hash_data(address.as_bytes());
                ch.insert(addr_hash.as_bytes().to_vec().into(), val)
                    .unwrap();
            } else {
                ch.insert(address.as_bytes().to_vec().into(), val).unwrap();
            }
        }

        let mut ch2 = (new)();

        for (k, v) in (enc)(1, ch.clone()) {
            let (_, k, v) = (dec)(k, v);
            ch2.insert(k, v).unwrap();
        }

        // The encoder sorts its own copy of the changes, so the decoded set
        // comes back in key order. Sort the expected set as well; otherwise
        // the comparison depends on the (arbitrary) order of the hashed keys.
        ch.changes.sort_unstable();

        assert_eq!(ch, ch2);
    }
}

View File

@@ -0,0 +1,45 @@
use super::{from_db_format, ChangeSet};
use crate::{dbutils, CursorDupSort};
use bytes::{Bytes, BytesMut};
/// Looks up `key` in the account changeset recorded for `block_number`.
///
/// The cursor is positioned with `seek_both_range(encoded block number, key)`.
/// The entry found there is decoded with `from_db_format(key_len)` and its
/// value is returned only when the decoded key starts with `key`.
///
/// Returns `Ok(None)` when the cursor yields an empty key or the decoded key
/// does not match.
pub async fn find_in_account_changeset<C: CursorDupSort>(
    c: &mut C,
    block_number: u64,
    key: &[u8],
    key_len: usize,
) -> anyhow::Result<Option<Bytes>> {
    let block_key = dbutils::encode_block_number(block_number);
    let (db_key, db_value) = c.seek_both_range(&block_key, key).await?;
    if db_key.is_empty() {
        return Ok(None);
    }

    let decode = from_db_format(key_len);
    let (_, found_key, found_value) = decode(db_key, db_value);

    if found_key.starts_with(key) {
        Ok(Some(found_value))
    } else {
        Ok(None)
    }
}
/// Encodes an account change set into database `(key, value)` pairs.
///
/// Changes are sorted first. Every emitted pair uses the encoded block number
/// as its key and `change.key ++ change.value` as its value.
pub fn encode_accounts_2(
    block_number: u64,
    mut s: ChangeSet,
) -> impl Iterator<Item = (Bytes, Bytes)> {
    s.changes.sort_unstable();

    // Every pair shares the same key, so build it once and clone the handle.
    let block_key = Bytes::copy_from_slice(&dbutils::encode_block_number(block_number));

    s.changes.into_iter().map(move |change| {
        let mut payload = BytesMut::with_capacity(change.key.len() + change.value.len());
        payload.extend_from_slice(&change.key);
        payload.extend_from_slice(&change.value);
        (block_key.clone(), payload.freeze())
    })
}

218
src/changeset/storage.rs Normal file
View File

@@ -0,0 +1,218 @@
use super::*;
use crate::{common, CursorDupSort};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use std::pin::Pin;
/* Hashed changesets (key is a hash of common.Address) */
impl ChangeSet {
/// Creates an empty change set whose keys must be
/// `HASH_LENGTH + HASH_LENGTH + INCARNATION_LENGTH` bytes long.
pub fn new_storage() -> Self {
Self {
changes: vec![],
key_len: common::HASH_LENGTH + common::HASH_LENGTH + common::INCARNATION_LENGTH,
}
}
}
/// Encodes a hashed storage change set by boxing the iterator produced by
/// `encode_storage_2` with a key prefix length of `common::HASH_LENGTH`.
// NOTE(review): the return type is `Pin<Box<..>>`, while the `Encoder` alias and
// `BucketMap::encode` use a plain `Box<dyn Iterator ..>` — confirm that `Pin` is intended.
pub fn encode_storage(
block_n: u64,
s: ChangeSet,
) -> Pin<Box<dyn Iterator<Item = (Bytes, Bytes)> + Send>> {
Box::pin(encode_storage_2(block_n, s, common::HASH_LENGTH))
}
/// Wraps a `CursorDupSort` and implements `Walker2` with a `common::HASH_LENGTH` key prefix.
pub struct StorageChangeSet<C: CursorDupSort> {
// Cursor every operation is executed on.
pub c: C,
}
// `Walker2` for hashed storage changesets; the key prefix is `common::HASH_LENGTH` bytes.
#[async_trait]
impl<C: CursorDupSort> Walker2 for StorageChangeSet<C> {
    /// Forward walk; delegates to `storage_utils::walk`.
    fn walk<'w>(
        &'w mut self,
        from: u64,
        to: u64,
    ) -> BoxStream<'w, anyhow::Result<(u64, Bytes, Bytes)>> {
        super::storage_utils::walk(&mut self.c, from, to, common::HASH_LENGTH)
    }

    /// Reverse walk; delegates to `storage_utils::walk_reverse`.
    fn walk_reverse<'w>(
        &'w mut self,
        from: u64,
        to: u64,
    ) -> BoxStream<'w, anyhow::Result<(u64, Bytes, Bytes)>> {
        super::storage_utils::walk_reverse(&mut self.c, from, to, common::HASH_LENGTH)
    }

    /// Looks up a storage value without an incarnation.
    ///
    /// `k` is split into its first `common::HASH_LENGTH` bytes and the rest;
    /// both parts are handed to the incarnation-less search.
    ///
    /// # Errors
    /// Fails when `k` is shorter than `common::HASH_LENGTH` (instead of
    /// panicking on the slice), or when the cursor fails.
    async fn find(&mut self, block_number: u64, k: &[u8]) -> anyhow::Result<Option<Bytes>> {
        anyhow::ensure!(
            k.len() >= common::HASH_LENGTH,
            "storage changeset key too short: expected at least {} bytes, got {}",
            common::HASH_LENGTH,
            k.len()
        );
        let (addr_hash, key_hash) = k.split_at(common::HASH_LENGTH);
        find_without_incarnation_in_storage_changeset_2(
            &mut self.c,
            block_number,
            common::HASH_LENGTH,
            addr_hash,
            key_hash,
        )
        .await
    }
}
impl<C: CursorDupSort> StorageChangeSet<C> {
/// Looks up `k` via `find_in_storage_changeset_2`, which reads an incarnation
/// out of `k` after the `common::HASH_LENGTH`-byte prefix.
pub async fn find_with_incarnation(
&mut self,
block_number: u64,
k: &[u8],
) -> anyhow::Result<Option<Bytes>> {
find_in_storage_changeset_2(&mut self.c, block_number, common::HASH_LENGTH, k).await
}
/// Looks up a storage value from separately supplied address-hash and key-hash bytes,
/// without an incarnation.
pub async fn find_without_incarnation(
&mut self,
block_number: u64,
addr_hash_to_find: &[u8],
key_hash_to_find: &[u8],
) -> anyhow::Result<Option<Bytes>> {
find_without_incarnation_in_storage_changeset_2(
&mut self.c,
block_number,
common::HASH_LENGTH,
addr_hash_to_find,
key_hash_to_find,
)
.await
}
}
/* Plain changesets (key is a common.Address) */
impl ChangeSet {
/// Creates an empty change set whose keys must be
/// `ADDRESS_LENGTH + HASH_LENGTH + INCARNATION_LENGTH` bytes long.
pub fn new_storage_plain() -> Self {
Self {
changes: vec![],
key_len: common::ADDRESS_LENGTH + common::HASH_LENGTH + common::INCARNATION_LENGTH,
}
}
}
/// Encodes a plain storage change set by boxing the iterator produced by
/// `encode_storage_2` with a key prefix length of `common::ADDRESS_LENGTH`.
// NOTE(review): the return type is `Pin<Box<..>>`, while the `Encoder` alias and
// `BucketMap::encode` use a plain `Box<dyn Iterator ..>` — confirm that `Pin` is intended.
pub fn encode_storage_plain(
block_n: u64,
s: ChangeSet,
) -> Pin<Box<dyn Iterator<Item = (Bytes, Bytes)> + Send>> {
Box::pin(encode_storage_2(block_n, s, common::ADDRESS_LENGTH))
}
/// Wraps a `CursorDupSort` and implements `Walker2` with a `common::ADDRESS_LENGTH` key prefix.
pub struct StorageChangeSetPlain<C: CursorDupSort> {
// Cursor every operation is executed on.
pub c: C,
}
// `Walker2` for plain storage changesets; the key prefix is `common::ADDRESS_LENGTH` bytes.
#[async_trait]
impl<C: CursorDupSort> Walker2 for StorageChangeSetPlain<C> {
    /// Forward walk; delegates to `storage_utils::walk`.
    fn walk<'w>(
        &'w mut self,
        from: u64,
        to: u64,
    ) -> BoxStream<'w, anyhow::Result<(u64, Bytes, Bytes)>> {
        super::storage_utils::walk(&mut self.c, from, to, common::ADDRESS_LENGTH)
    }

    /// Reverse walk; delegates to `storage_utils::walk_reverse`.
    fn walk_reverse<'w>(
        &'w mut self,
        from: u64,
        to: u64,
    ) -> BoxStream<'w, anyhow::Result<(u64, Bytes, Bytes)>> {
        super::storage_utils::walk_reverse(&mut self.c, from, to, common::ADDRESS_LENGTH)
    }

    /// Looks up a storage value without an incarnation.
    ///
    /// `k` is split into its first `common::ADDRESS_LENGTH` bytes and the
    /// rest; both parts are handed to the incarnation-less search.
    ///
    /// # Errors
    /// Fails when `k` is shorter than `common::ADDRESS_LENGTH` (instead of
    /// panicking on the slice), or when the cursor fails.
    async fn find(&mut self, block_number: u64, k: &[u8]) -> anyhow::Result<Option<Bytes>> {
        anyhow::ensure!(
            k.len() >= common::ADDRESS_LENGTH,
            "plain storage changeset key too short: expected at least {} bytes, got {}",
            common::ADDRESS_LENGTH,
            k.len()
        );
        let (address, storage_key) = k.split_at(common::ADDRESS_LENGTH);
        find_without_incarnation_in_storage_changeset_2(
            &mut self.c,
            block_number,
            common::ADDRESS_LENGTH,
            address,
            storage_key,
        )
        .await
    }
}
impl<C: CursorDupSort> StorageChangeSetPlain<C> {
/// Looks up `k` via `find_in_storage_changeset_2`, which reads an incarnation
/// out of `k` after the `common::ADDRESS_LENGTH`-byte prefix.
pub async fn find_with_incarnation(
&mut self,
block_number: u64,
k: &[u8],
) -> anyhow::Result<Option<Bytes>> {
find_in_storage_changeset_2(&mut self.c, block_number, common::ADDRESS_LENGTH, k).await
}
/// Looks up a storage value from separately supplied prefix and key bytes, without an incarnation.
// NOTE(review): the parameter is called `addr_hash_to_find`, but the prefix length used
// here is `ADDRESS_LENGTH` — presumably a plain address is expected; confirm with callers.
pub async fn find_without_incarnation(
&mut self,
block_number: u64,
addr_hash_to_find: &[u8],
key_hash_to_find: &[u8],
) -> anyhow::Result<Option<Bytes>> {
find_without_incarnation_in_storage_changeset_2(
&mut self.c,
block_number,
common::ADDRESS_LENGTH,
addr_hash_to_find,
key_hash_to_find,
)
.await
}
}
// Helpers for storage changeset encoding round-trip checks.
// NOTE(review): this module contains no `#[test]` function, and the closure `f`
// built in `do_test_encoding_storage_new` is never called, so nothing here
// executes yet.
#[cfg(test)]
mod tests {
use super::*;
// Builds the key for test address number `i` and storage slot `j`: the address is parsed
// from a fixed hex string ending in `i`, the slot key is `hash_data("key{j}")`, and both
// are combined with `inc` by `generator`.
fn get_test_data_at_index<
KeyGen: Fn(common::Address, common::Incarnation, common::Hash) -> Bytes,
>(
i: usize,
j: usize,
inc: common::Incarnation,
generator: &KeyGen,
) -> Bytes {
let address = format!("0xBe828AD8B538D1D691891F6c725dEdc5989abBc{}", i)
.parse()
.unwrap();
let key = common::hash_data(format!("key{}", j).as_bytes());
(generator)(address, inc, key)
}
// Defines (but does not run) an encode/decode round-trip for a change set with
// `num_of_elements` addresses and `num_of_keys` slots per address.
fn do_test_encoding_storage_new<
KeyGen: Fn(common::Address, common::Incarnation, common::Hash) -> Bytes,
IncarnationGen: Fn() -> common::Incarnation,
ValueGen: Fn(usize) -> Bytes,
NewFunc: Fn() -> ChangeSet,
>(
key_gen: &KeyGen,
incarnation_generator: &IncarnationGen,
value_generator: &ValueGen,
new_func: &NewFunc,
encode_func: &Encoder,
decode_func: &Decoder,
) {
let f = move |num_of_elements, num_of_keys| {
let mut ch = (new_func)();
for i in 0..num_of_elements {
// One incarnation per address, shared by all of its slots.
let inc = (incarnation_generator)();
for j in 0..num_of_keys {
let key = get_test_data_at_index(i, j, inc, key_gen);
let val = (value_generator)(j);
ch.insert(key, val).unwrap();
}
}
let mut ch2 = (new_func)();
for (k, v) in (encode_func)(0, ch.clone()) {
let (_, k, v) = (decode_func)(k, v);
ch2.insert(k, v).unwrap();
}
assert_eq!(ch, ch2)
};
}
}

View File

@@ -0,0 +1,172 @@
use super::*;
use crate::{common, dbutils, CursorDupSort};
use async_stream::try_stream;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use std::io::Write;
/// Streams changeset entries in descending order.
///
/// The cursor is first moved to the encoded block number `to + 1` and then
/// stepped backwards with `prev()`. Each entry is decoded with
/// `from_db_format(key_prefix_len)` and yielded as
/// `(block number, key, value)`. The stream ends when the cursor returns an
/// empty key or an entry's block number is below `from`.
pub fn walk_reverse<C: CursorDupSort>(
    c: &mut C,
    from: u64,
    to: u64,
    key_prefix_len: usize,
) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
    Box::pin(try_stream! {
        let from_db_format = from_db_format(key_prefix_len);

        // `to + 1` would overflow for `to == u64::MAX` (a panic in debug builds,
        // a wrap to block 0 in release builds, which would make the walk start
        // before the first entry). Saturate instead.
        let upper_bound = to.saturating_add(1);
        let _ = c.seek(&dbutils::encode_block_number(upper_bound)).await?;

        loop {
            let (k, v) = c.prev().await?;
            if k.is_empty() {
                break;
            }

            let (block_num, k, v) = (from_db_format)(k, v);
            if block_num < from {
                break;
            }

            yield (block_num, k, v);
        }
    })
}
/// Streams changeset entries in ascending order.
///
/// The cursor is moved to the encoded block number `from` and stepped forward
/// with `next()`. Each entry is decoded with `from_db_format(key_prefix_len)`
/// and yielded as `(block number, key, value)`. The stream ends when the
/// cursor returns an empty key or an entry's block number exceeds `to`.
pub fn walk<C: CursorDupSort>(
    c: &mut C,
    from: u64,
    to: u64,
    key_prefix_len: usize,
) -> BoxStream<'_, anyhow::Result<(u64, Bytes, Bytes)>> {
    Box::pin(try_stream! {
        let decode = from_db_format(key_prefix_len);
        let mut current = c.seek(&dbutils::encode_block_number(from)).await?;

        loop {
            let (db_key, db_value) = current;
            if db_key.is_empty() {
                break;
            }

            let (block_num, key, value) = decode(db_key, db_value);
            if block_num > to {
                break;
            }

            yield (block_num, key, value);

            current = c.next().await?;
        }
    })
}
/// Looks up a storage value from a composite key that carries an incarnation.
///
/// `k` is read as `prefix (key_prefix_len bytes) ++ incarnation
/// (INCARNATION_LENGTH bytes, big-endian) ++ storage key (HASH_LENGTH bytes)`;
/// the three parts are passed on to `do_search_2`.
///
/// # Errors
/// Fails when `k` is too short to contain all three parts (instead of
/// panicking on the slice), or when the cursor fails.
pub async fn find_in_storage_changeset_2<C: CursorDupSort>(
    c: &mut C,
    block_number: u64,
    key_prefix_len: usize,
    k: &[u8],
) -> anyhow::Result<Option<Bytes>> {
    let incarnation_end = key_prefix_len + common::INCARNATION_LENGTH;
    let key_end = incarnation_end + common::HASH_LENGTH;
    anyhow::ensure!(
        k.len() >= key_end,
        "storage changeset key too short: expected at least {} bytes, got {}",
        key_end,
        k.len()
    );

    let incarnation = u64::from_be_bytes(*array_ref!(
        &k[key_prefix_len..],
        0,
        common::INCARNATION_LENGTH
    ));

    do_search_2(
        c,
        block_number,
        key_prefix_len,
        &k[..key_prefix_len],
        &k[incarnation_end..key_end],
        incarnation,
    )
    .await
}
/// Looks up a storage value when no incarnation is known.
///
/// Forwards to `do_search_2` with an incarnation of `0`, which selects its
/// scanning branch.
pub async fn find_without_incarnation_in_storage_changeset_2<C: CursorDupSort>(
c: &mut C,
block_number: u64,
key_prefix_len: usize,
addr_bytes_to_find: &[u8],
key_bytes_to_find: &[u8],
) -> anyhow::Result<Option<Bytes>> {
do_search_2(
c,
block_number,
key_prefix_len,
addr_bytes_to_find,
key_bytes_to_find,
// 0 = "no incarnation given"
0,
)
.await
}
/// Searches a storage changeset bucket for one storage key.
///
/// * `incarnation == 0`: seeks to `block_number ++ addr_bytes_to_find` and
///   scans forward. The scan stops at the first decoded key that no longer
///   starts with `addr_bytes_to_find`, and returns the value of the first
///   entry whose storage-key part (after prefix and incarnation) equals
///   `key_bytes_to_find`.
/// * otherwise: seeks directly with
///   `seek_both_range(block_number ++ addr ++ incarnation, key_bytes_to_find)`
///   and accepts the entry only when its raw value starts with
///   `key_bytes_to_find`.
///
/// Returns `Ok(None)` when nothing matches.
pub async fn do_search_2<C: CursorDupSort>(
    c: &mut C,
    block_number: u64,
    key_prefix_len: usize,
    addr_bytes_to_find: &[u8],
    key_bytes_to_find: &[u8],
    incarnation: u64,
) -> anyhow::Result<Option<Bytes>> {
    let from_db_format = from_db_format(key_prefix_len);

    if incarnation == 0 {
        let mut seek = vec![0; 8 + key_prefix_len];
        // Was an `io::Write::write` whose `Result` was silently dropped; a plain
        // copy matches how the other branch fills the block-number part.
        seek[..8].copy_from_slice(&block_number.to_be_bytes());
        seek[8..].as_mut().write(addr_bytes_to_find).unwrap();

        let (mut k, mut v) = c.seek(&*seek).await?;
        loop {
            if k.is_empty() {
                break;
            }

            let (_, k1, v1) = (from_db_format)(k, v);
            if !k1.starts_with(addr_bytes_to_find) {
                break;
            }

            // Decoded key layout: prefix ++ incarnation ++ storage key.
            let st_hash = &k1[key_prefix_len + common::INCARNATION_LENGTH..];
            if st_hash == key_bytes_to_find {
                return Ok(Some(v1));
            }

            (k, v) = c.next().await?
        }

        return Ok(None);
    }

    let mut seek = vec![0; 8 + key_prefix_len + common::INCARNATION_LENGTH];
    seek[..8].copy_from_slice(&block_number.to_be_bytes());
    seek[8..].as_mut().write(addr_bytes_to_find).unwrap();
    seek[8 + key_prefix_len..].copy_from_slice(&incarnation.to_be_bytes());

    let (k, v) = c.seek_both_range(&seek, key_bytes_to_find).await?;
    if k.is_empty() {
        return Ok(None);
    }
    if !v.starts_with(key_bytes_to_find) {
        return Ok(None);
    }

    let (_, _, v) = (from_db_format)(k, v);
    Ok(Some(v))
}
/// Encodes a storage change set into database `(key, value)` pairs.
///
/// Changes are sorted first. Each change key is split after
/// `key_prefix_len + INCARNATION_LENGTH` bytes: the head is appended to the
/// big-endian block number to form the database key, and the tail is put in
/// front of the change value to form the database value.
pub fn encode_storage_2(
    block_n: u64,
    mut s: ChangeSet,
    key_prefix_len: usize,
) -> impl Iterator<Item = (Bytes, Bytes)> {
    s.changes.sort_unstable();

    let split_at = key_prefix_len + common::INCARNATION_LENGTH;

    s.changes.into_iter().map(move |change| {
        let key_head = &change.key[..split_at];
        let key_tail = &change.key[split_at..];

        let mut db_key = BytesMut::with_capacity(common::BLOCK_NUMBER_LENGTH + split_at);
        db_key.put_u64(block_n);
        db_key.put_slice(key_head);

        let mut db_value = BytesMut::with_capacity(common::HASH_LENGTH + change.value.len());
        db_value.put_slice(key_tail);
        db_value.put_slice(&change.value);

        (db_key.freeze(), db_value.freeze())
    })
}

16
src/common.rs Normal file
View File

@@ -0,0 +1,16 @@
use ethereum_types::H256;
use sha3::{Digest, Keccak256};
use std::mem::size_of;
pub use ethereum_types::Address;
/// Hash type used throughout the crate (alias for `H256`).
pub type Hash = H256;
/// Incarnation counter, stored as a `u64`.
pub type Incarnation = u64;
/// Byte length of a `Hash`.
pub const HASH_LENGTH: usize = Hash::len_bytes();
/// Byte length of an `Address`.
pub const ADDRESS_LENGTH: usize = Address::len_bytes();
/// Byte length of an encoded block number (a `u64`).
pub const BLOCK_NUMBER_LENGTH: usize = size_of::<u64>();
/// Byte length of an encoded `Incarnation` (a `u64`).
pub const INCARNATION_LENGTH: usize = size_of::<u64>();
/// Returns the Keccak-256 digest of `data` as a `Hash`.
pub fn hash_data(data: &[u8]) -> Hash {
    let digest = Keccak256::digest(data);
    Hash::from_slice(digest.as_slice())
}

View File

@@ -2,222 +2,10 @@ use ethereum_types::H256;
use maplit::hashmap;
use std::{collections::HashMap, fmt::Display, mem::size_of};
pub type BlockNumber = u64;
pub type Incarnation = u64;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Bucket {
PlainState,
PlainContractCode,
PlainAccountChangeSet,
PlainStorageChangeSet,
CurrentState,
AccountsHistory,
StorageHistory,
Code,
ContractCode,
IncarnationMap,
AccountChangeSet,
StorageChangeSet,
IntermediateTrieHash,
DatabaseInfo,
SnapshotInfo,
}
impl AsRef<str> for Bucket {
fn as_ref(&self) -> &str {
match self {
Self::PlainState => "PLAIN-CST2",
Self::PlainContractCode => "PLAIN-contractCode",
Self::PlainAccountChangeSet => "PLAIN-ACS",
Self::PlainStorageChangeSet => "PLAIN-SCS",
Self::CurrentState => "CST2",
Self::AccountsHistory => "hAT",
Self::StorageHistory => "hST",
Self::Code => "CODE",
Self::ContractCode => "contractCode",
Self::IncarnationMap => "incarnationMap",
Self::AccountChangeSet => "ACS",
Self::StorageChangeSet => "SCS",
Self::IntermediateTrieHash => "iTh2",
Self::DatabaseInfo => "DBINFO",
Self::SnapshotInfo => "SNINFO",
}
}
}
impl Display for Bucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_ref())
}
}
pub type BucketFlags = u8;
pub type DBI = u8;
pub type CustomComparator = &'static str;
const NUMBER_HASH_COMPOSITE_LEN: usize = size_of::<u64>() + H256::len_bytes();
pub type NumberHashCompositeKey = [u8; NUMBER_HASH_COMPOSITE_LEN];
pub type HeaderHashKey = [u8; size_of::<u64>() + 1];
pub type HeaderTDKey = [u8; NUMBER_HASH_COMPOSITE_LEN + 1];
#[derive(Clone, Copy, Debug)]
pub enum SyncStage {
Headers,
BlockHashes,
Bodies,
Senders,
Execution,
IntermediateHashes,
HashState,
AccountHistoryIndex,
StorageHistoryIndex,
LogIndex,
CallTraces,
TxLookup,
TxPool,
Finish,
}
impl AsRef<[u8]> for SyncStage {
fn as_ref(&self) -> &[u8] {
match self {
Self::Headers => "Headers",
Self::BlockHashes => "BlockHashes",
Self::Bodies => "Bodies",
Self::Senders => "Senders",
Self::Execution => "Execution",
Self::IntermediateHashes => "IntermediateHashes",
Self::HashState => "HashState",
Self::AccountHistoryIndex => "AccountHistoryIndex",
Self::StorageHistoryIndex => "StorageHistoryIndex",
Self::LogIndex => "LogIndex",
Self::CallTraces => "CallTraces",
Self::TxLookup => "TxLookup",
Self::TxPool => "TxPool",
Self::Finish => "Finish",
}
.as_bytes()
}
}
pub enum BucketFlag {
Default = 0x00,
ReverseKey = 0x02,
DupSort = 0x04,
IntegerKey = 0x08,
DupFixed = 0x10,
IntegerDup = 0x20,
ReverseDup = 0x40,
}
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
pub const HEADER_PREFIX: &str = "h"; // block_num_u64 + hash -> header
pub const HEADER_TD_SUFFIX: &str = "t"; // block_num_u64 + hash + headerTDSuffix -> td
pub const HEADER_HASH_SUFFIX: &str = "n"; // block_num_u64 + headerHashSuffix -> hash
pub const HEADER_NUMBER_PREFIX: &str = "H"; // headerNumberPrefix + hash -> num (uint64 big endian)
pub const BLOCK_BODY_PREFIX: &str = "b"; // block_num_u64 + hash -> block body
pub const ETH_TX: &str = "eth_tx"; // tbl_sequence_u64 -> rlp(tx)
pub const BLOCK_RECEIPTS_PREFIX: &str = "r"; // block_num_u64 + hash -> block receipts
pub const LOG: &str = "log"; // block_num_u64 + hash -> block receipts
pub const CONFIG_PREFIX: &str = "ethereum-config-";
pub const SYNC_STAGE_PROGRESS: &str = "SSP2";
#[derive(Clone, Copy, Default)]
pub struct BucketConfigItem {
pub flags: BucketFlags,
// AutoDupSortKeysConversion - enables some keys transformation - to change db layout without changing app code.
// Use it wisely - it helps to do experiments with DB format faster, but better reduce amount of Magic in app.
// If good DB format found, push app code to accept this format and then disable this property.
pub auto_dup_sort_keys_conversion: bool,
pub is_deprecated: bool,
pub dbi: DBI,
// DupFromLen - if user provide key of this length, then next transformation applied:
// v = append(k[DupToLen:], v...)
// k = k[:DupToLen]
// And opposite at retrieval
// Works only if AutoDupSortKeysConversion enabled
pub dup_from_len: u8,
pub dup_to_len: u8,
pub dup_fixed_size: u8,
pub custom_comparator: CustomComparator,
pub custom_dup_comparator: CustomComparator,
}
pub fn buckets_configs() -> HashMap<&'static str, BucketConfigItem> {
hashmap! {
"CurrentStateBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
auto_dup_sort_keys_conversion: true,
dup_from_len: 72,
dup_to_len: 40,
..Default::default()
},
"PlainAccountChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"PlainStorageChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"AccountChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"StorageChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"PlainStateBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
auto_dup_sort_keys_conversion: true,
dup_from_len: 60,
dup_to_len: 28,
..Default::default()
},
"IntermediateTrieHashBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
custom_dup_comparator: "dup_cmp_suffix32",
..Default::default()
},
}
}
pub fn number_hash_composite_key(number: u64, hash: H256) -> NumberHashCompositeKey {
let mut v: NumberHashCompositeKey = [0; NUMBER_HASH_COMPOSITE_LEN];
const SEPARATOR: usize = size_of::<u64>();
v[..SEPARATOR].copy_from_slice(&number.to_be_bytes());
v[SEPARATOR..].copy_from_slice(&hash.to_fixed_bytes());
v
}
pub fn header_hash_key(block: u64) -> HeaderHashKey {
let mut v: HeaderHashKey = Default::default();
const SEPARATOR: usize = size_of::<u64>();
v[..SEPARATOR].copy_from_slice(&block.to_be_bytes());
v[SEPARATOR..].copy_from_slice(HEADER_HASH_SUFFIX.as_bytes());
v
}
pub fn header_td_key(number: u64, hash: H256) -> HeaderTDKey {
let mut v: HeaderTDKey = [0; NUMBER_HASH_COMPOSITE_LEN + 1];
v[..NUMBER_HASH_COMPOSITE_LEN].copy_from_slice(&number_hash_composite_key(number, hash));
v[NUMBER_HASH_COMPOSITE_LEN..].copy_from_slice(HEADER_TD_SUFFIX.as_bytes());
v
}
mod bucket;
mod composite_keys;
mod helper;
pub use self::{bucket::*, composite_keys::*, helper::*};
pub fn bytes_mask(fixed_bits: u64) -> (u64, u8) {
let fixed_bytes = (fixed_bits + 7) / 8;
@@ -228,11 +16,3 @@ pub fn bytes_mask(fixed_bits: u64) -> (u64, u8) {
}
(fixed_bytes, mask)
}
pub fn change_set_by_index_bucket(storage: bool) -> (Bucket, usize) {
if storage {
(Bucket::PlainStorageChangeSet, 60)
} else {
(Bucket::PlainAccountChangeSet, 20)
}
}

181
src/dbutils/bucket.rs Normal file
View File

@@ -0,0 +1,181 @@
use super::*;
use ethereum_types::H256;
use maplit::hashmap;
use std::{collections::HashMap, fmt::Display, mem::size_of};
/// Database buckets known to this crate.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Bucket {
    PlainState,
    PlainContractCode,
    PlainAccountChangeSet,
    PlainStorageChangeSet,
    CurrentState,
    AccountsHistory,
    StorageHistory,
    Code,
    ContractCode,
    IncarnationMap,
    AccountChangeSet,
    StorageChangeSet,
    IntermediateTrieHash,
    DatabaseInfo,
    SnapshotInfo,
}

/// Maps each bucket to its string name.
impl AsRef<str> for Bucket {
    fn as_ref(&self) -> &str {
        match *self {
            Bucket::PlainState => "PLAIN-CST2",
            Bucket::PlainContractCode => "PLAIN-contractCode",
            Bucket::PlainAccountChangeSet => "PLAIN-ACS",
            Bucket::PlainStorageChangeSet => "PLAIN-SCS",
            Bucket::CurrentState => "CST2",
            Bucket::AccountsHistory => "hAT",
            Bucket::StorageHistory => "hST",
            Bucket::Code => "CODE",
            Bucket::ContractCode => "contractCode",
            Bucket::IncarnationMap => "incarnationMap",
            Bucket::AccountChangeSet => "ACS",
            Bucket::StorageChangeSet => "SCS",
            Bucket::IntermediateTrieHash => "iTh2",
            Bucket::DatabaseInfo => "DBINFO",
            Bucket::SnapshotInfo => "SNINFO",
        }
    }
}

/// Displays a bucket as the name returned by `as_ref`.
impl Display for Bucket {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name: &str = self.as_ref();
        f.write_str(name)
    }
}
/// Flag value of a bucket; `buckets_configs` fills it from `BucketFlag` values cast to `u8`.
pub type BucketFlags = u8;
/// Type of the `dbi` field of `BucketConfigItem`.
pub type DBI = u8;
/// Name of a custom comparator, given as a static string.
pub type CustomComparator = &'static str;
/// Named sync stages.
#[derive(Clone, Copy, Debug)]
pub enum SyncStage {
    Headers,
    BlockHashes,
    Bodies,
    Senders,
    Execution,
    IntermediateHashes,
    HashState,
    AccountHistoryIndex,
    StorageHistoryIndex,
    LogIndex,
    CallTraces,
    TxLookup,
    TxPool,
    Finish,
}

/// Exposes each stage as the bytes of its name.
impl AsRef<[u8]> for SyncStage {
    fn as_ref(&self) -> &[u8] {
        let name: &'static str = match *self {
            SyncStage::Headers => "Headers",
            SyncStage::BlockHashes => "BlockHashes",
            SyncStage::Bodies => "Bodies",
            SyncStage::Senders => "Senders",
            SyncStage::Execution => "Execution",
            SyncStage::IntermediateHashes => "IntermediateHashes",
            SyncStage::HashState => "HashState",
            SyncStage::AccountHistoryIndex => "AccountHistoryIndex",
            SyncStage::StorageHistoryIndex => "StorageHistoryIndex",
            SyncStage::LogIndex => "LogIndex",
            SyncStage::CallTraces => "CallTraces",
            SyncStage::TxLookup => "TxLookup",
            SyncStage::TxPool => "TxPool",
            SyncStage::Finish => "Finish",
        };
        name.as_bytes()
    }
}
/// Bucket flag bits. `buckets_configs` casts variants to `u8` to fill `BucketConfigItem::flags`.
pub enum BucketFlag {
Default = 0x00,
ReverseKey = 0x02,
DupSort = 0x04,
IntegerKey = 0x08,
DupFixed = 0x10,
IntegerDup = 0x20,
ReverseDup = 0x40,
}
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
pub const HEADER_PREFIX: &str = "h"; // block_num_u64 + hash -> header
pub const HEADER_TD_SUFFIX: &str = "t"; // block_num_u64 + hash + headerTDSuffix -> td
pub const HEADER_HASH_SUFFIX: &str = "n"; // block_num_u64 + headerHashSuffix -> hash
pub const HEADER_NUMBER_PREFIX: &str = "H"; // headerNumberPrefix + hash -> num (uint64 big endian)
pub const BLOCK_BODY_PREFIX: &str = "b"; // block_num_u64 + hash -> block body
pub const ETH_TX: &str = "eth_tx"; // tbl_sequence_u64 -> rlp(tx)
pub const BLOCK_RECEIPTS_PREFIX: &str = "r"; // block_num_u64 + hash -> block receipts
// NOTE(review): the description below is identical to BLOCK_RECEIPTS_PREFIX's — confirm it is accurate for LOG.
pub const LOG: &str = "log"; // block_num_u64 + hash -> block receipts
// Prefix for configuration entries.
pub const CONFIG_PREFIX: &str = "ethereum-config-";
// Name under which sync stage progress is kept — presumably a bucket name; TODO confirm.
pub const SYNC_STAGE_PROGRESS: &str = "SSP2";
/// Per-bucket configuration; `buckets_configs` fills selected fields and leaves the rest at `Default`.
#[derive(Clone, Copy, Default)]
pub struct BucketConfigItem {
// Flag value for the bucket (see `BucketFlag`).
pub flags: BucketFlags,
// AutoDupSortKeysConversion - enables some keys transformation - to change db layout without changing app code.
// Use it wisely - it helps to do experiments with DB format faster, but better reduce amount of Magic in app.
// If good DB format found, push app code to accept this format and then disable this property.
pub auto_dup_sort_keys_conversion: bool,
pub is_deprecated: bool,
pub dbi: DBI,
// DupFromLen - if user provide key of this length, then next transformation applied:
// v = append(k[DupToLen:], v...)
// k = k[:DupToLen]
// And opposite at retrieval
// Works only if AutoDupSortKeysConversion enabled
pub dup_from_len: u8,
// Length the key is cut to by the transformation described above.
pub dup_to_len: u8,
pub dup_fixed_size: u8,
// Comparator names; an empty string is the `Default` value.
pub custom_comparator: CustomComparator,
pub custom_dup_comparator: CustomComparator,
}
/// Builds the table of non-default bucket configurations, keyed by name.
///
/// Every listed bucket has the `DupSort` flag; some additionally enable the
/// automatic key conversion or a custom dup comparator.
// NOTE(review): the keys are names such as "CurrentStateBucket", whereas
// `Bucket::as_ref` yields names such as "CST2" — confirm which form callers look up.
pub fn buckets_configs() -> HashMap<&'static str, BucketConfigItem> {
hashmap! {
"CurrentStateBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
auto_dup_sort_keys_conversion: true,
dup_from_len: 72,
dup_to_len: 40,
..Default::default()
},
"PlainAccountChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"PlainStorageChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"AccountChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"StorageChangeSetBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
..Default::default()
},
"PlainStateBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
auto_dup_sort_keys_conversion: true,
dup_from_len: 60,
dup_to_len: 28,
..Default::default()
},
"IntermediateTrieHashBucket" => BucketConfigItem {
flags: BucketFlag::DupSort as u8,
custom_dup_comparator: "dup_cmp_suffix32",
..Default::default()
},
}
}

View File

@@ -0,0 +1,40 @@
use super::*;
use crate::common;
use ethereum_types::H256;
use std::mem::size_of;
// Length of a block number followed by a hash.
const NUMBER_HASH_COMPOSITE_LEN: usize = common::BLOCK_NUMBER_LENGTH + common::HASH_LENGTH;
/// Key made of a block number followed by a hash.
pub type NumberHashCompositeKey = [u8; NUMBER_HASH_COMPOSITE_LEN];
/// Key made of a block number followed by one extra byte (see `header_hash_key`).
pub type HeaderHashKey = [u8; size_of::<u64>() + 1];
/// A `NumberHashCompositeKey` followed by one extra byte (see `header_td_key`).
pub type HeaderTDKey = [u8; NUMBER_HASH_COMPOSITE_LEN + 1];
/// Encodes a block number as 8 big-endian bytes.
// Lower-case name is deliberate so the constant can be called like a function.
#[allow(non_upper_case_globals)]
pub const encode_block_number: fn(u64) -> [u8; 8] = u64::to_be_bytes;
/// Builds `encoded block number ++ hash bytes`.
pub fn number_hash_composite_key(number: u64, hash: H256) -> NumberHashCompositeKey {
    let mut key: NumberHashCompositeKey = [0; NUMBER_HASH_COMPOSITE_LEN];
    let (number_part, hash_part) = key.split_at_mut(common::BLOCK_NUMBER_LENGTH);
    number_part.copy_from_slice(&encode_block_number(number));
    hash_part.copy_from_slice(hash.as_bytes());
    key
}
pub fn header_hash_key(block: u64) -> HeaderHashKey {
let mut v: HeaderHashKey = Default::default();
v[..common::BLOCK_NUMBER_LENGTH].copy_from_slice(&encode_block_number(block));
v[common::BLOCK_NUMBER_LENGTH..].copy_from_slice(HEADER_HASH_SUFFIX.as_bytes());
v
}
/// Builds `number_hash_composite_key(number, hash) ++ HEADER_TD_SUFFIX`.
pub fn header_td_key(number: u64, hash: H256) -> HeaderTDKey {
    let mut key: HeaderTDKey = [0; NUMBER_HASH_COMPOSITE_LEN + 1];
    let (composite_part, suffix_part) = key.split_at_mut(NUMBER_HASH_COMPOSITE_LEN);
    composite_part.copy_from_slice(&number_hash_composite_key(number, hash));
    suffix_part.copy_from_slice(HEADER_TD_SUFFIX.as_bytes());
    key
}

9
src/dbutils/helper.rs Normal file
View File

@@ -0,0 +1,9 @@
use super::*;
pub fn change_set_by_index_bucket(storage: bool) -> (Bucket, usize) {
if storage {
(Bucket::PlainStorageChangeSet, 60)
} else {
(Bucket::PlainAccountChangeSet, 20)
}
}

View File

@@ -1,4 +1,4 @@
use crate::{dbutils::*, models::*, Cursor, Transaction};
use crate::{common, dbutils::*, models::*, Cursor, Transaction};
use anyhow::{bail, Context};
use arrayref::array_ref;
use async_stream::try_stream;
@@ -30,11 +30,9 @@ pub trait TransactionExt: Transaction {
let b = self.get_one(HEADER_PREFIX, &key).await?;
const L: usize = H256::len_bytes();
match b.len() {
0 => Ok(None),
L => Ok(Some(H256::from_slice(&*b))),
common::HASH_LENGTH => Ok(Some(H256::from_slice(&*b))),
other => bail!("invalid length: {}", other),
}
}
@@ -60,11 +58,9 @@ pub trait TransactionExt: Transaction {
.get_one(HEADER_NUMBER_PREFIX, &hash.to_fixed_bytes())
.await?;
const L: usize = size_of::<u64>();
match b.len() {
0 => Ok(None),
L => Ok(Some(u64::from_be_bytes(*array_ref![b, 0, 8]))),
common::BLOCK_NUMBER_LENGTH => Ok(Some(u64::from_be_bytes(*array_ref![b, 0, 8]))),
other => bail!("invalid length: {}", other),
}
}
@@ -99,13 +95,11 @@ pub trait TransactionExt: Transaction {
return Ok(None);
}
let block_num_byte_len = mem::size_of::<u64>();
Ok(Some(u64::from_be_bytes(*array_ref![
b.get(0..block_num_byte_len)
b.get(0..common::BLOCK_NUMBER_LENGTH)
.context("failed to read block number from bytes")?,
0,
mem::size_of::<u64>()
common::BLOCK_NUMBER_LENGTH
])))
}
@@ -224,7 +218,7 @@ pub trait TransactionExt: Transaction {
// dat = make([]byte, len(v))
// copy(dat, v)
// return dat, nil
todo!()
bail!("TODO")
}
// pub async fn find_by_history(&self, storage: bool, key: &[u8], timestamp: u64) -> anyhow::Result<Option<Bytes>> {
@@ -345,6 +339,6 @@ impl<'tx, Tx: ?Sized> StateReader<'tx, Tx> {
pub async fn read_account_data(&mut self, address: Address) -> anyhow::Result<Option<Account>> {
self.account_reads.insert(address);
todo!()
bail!("TODO")
}
}

View File

@@ -1,13 +1,15 @@
#![feature(generic_associated_types, once_cell)]
#![allow(incomplete_features)]
#![feature(destructuring_assignment, generic_associated_types, once_cell)]
#![allow(incomplete_features, clippy::unused_io_amount)]
mod changeset;
mod common;
mod dbutils;
mod ext;
mod models;
mod remote;
mod traits;
pub use changeset::ChangeSet;
pub use dbutils::SyncStage;
pub use ext::TransactionExt;
pub use remote::{kv_client::KvClient as RemoteKvClient, RemoteCursor, RemoteTransaction};

View File

@@ -4,9 +4,10 @@ use ethereum_types::{H256, U256};
use hex_literal::hex;
use rlp_derive::RlpDecodable;
use serde::Deserialize;
use sha3::{Digest, Keccak256};
use std::collections::BTreeSet;
use crate::common;
#[allow(non_snake_case)]
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -81,13 +82,13 @@ impl Default for Account {
nonce: 0,
balance: U256::zero(),
root: hex!("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421").into(),
code_hash: H256::from_slice(&Keccak256::new().finalize()),
code_hash: common::hash_data(&[]),
incarnation: 0,
}
}
}
fn bytesToUint64(buf: &[u8]) -> u64 {
fn bytes_to_u64(buf: &[u8]) -> u64 {
let mut x = 0;
for (i, b) in buf.iter().enumerate() {
x <<= 8 + *b as u64;
@@ -120,7 +121,7 @@ impl Account {
);
}
a.nonce = bytesToUint64(&enc[pos + 1..pos + decodeLength + 1]);
a.nonce = bytes_to_u64(&enc[pos + 1..pos + decodeLength + 1]);
pos += decodeLength + 1;
}
@@ -150,7 +151,7 @@ impl Account {
)
}
a.incarnation = bytesToUint64(&enc[pos + 1..pos + decodeLength + 1]);
a.incarnation = bytes_to_u64(&enc[pos + 1..pos + decodeLength + 1]);
pos += decodeLength + 1;
}

View File

@@ -204,7 +204,7 @@ impl<'tx> traits::CursorDupSort for RemoteCursor<'tx> {
self.op(Op::NextNoDup, None, None).await
}
async fn last_dup(&mut self, key: &[u8]) -> anyhow::Result<Bytes> {
Ok(self.op(Op::LastDup, None, None).await?.1)
Ok(self.op(Op::LastDup, Some(key), None).await?.1)
}
}