mirror of
https://github.com/akula-bft/akula.git
synced 2026-04-19 03:00:13 -04:00
various fixes
This commit is contained in:
@@ -15,7 +15,7 @@ bytes = { package = "lifetimed-bytes", git = "https://github.com/vorot93/lifetim
|
||||
ethereum = "0.7"
|
||||
ethereum-interfaces = { git = "https://github.com/rust-ethereum/interfaces", default-features = false, features = ["remotekv"] }
|
||||
ethereum-types = "0.11"
|
||||
futures = "0.3"
|
||||
futures-core = "0.3"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.3"
|
||||
maplit = "1"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{common, dbutils::*, models::*, tables, txdb, txutil, Transaction};
|
||||
use crate::{common, dbutils::*, models::*, tables, txdb, Transaction};
|
||||
use anyhow::{bail, Context};
|
||||
use arrayref::array_ref;
|
||||
use ethereum::Header;
|
||||
@@ -22,7 +22,7 @@ pub mod canonical_hash {
|
||||
hex::encode(&key)
|
||||
);
|
||||
|
||||
if let Some(b) = txutil::get_one::<_, tables::HeaderCanonical>(tx, &key).await? {
|
||||
if let Some(b) = tx.get_one::<tables::HeaderCanonical>(&key).await? {
|
||||
match b.len() {
|
||||
common::HASH_LENGTH => return Ok(Some(H256::from_slice(&*b))),
|
||||
other => bail!("invalid length: {}", other),
|
||||
@@ -42,8 +42,9 @@ pub mod header_number {
|
||||
) -> anyhow::Result<Option<u64>> {
|
||||
trace!("Reading block number for hash {:?}", hash);
|
||||
|
||||
if let Some(b) =
|
||||
txutil::get_one::<_, tables::HeaderNumber>(tx, &hash.to_fixed_bytes()).await?
|
||||
if let Some(b) = tx
|
||||
.get_one::<tables::HeaderNumber>(&hash.to_fixed_bytes())
|
||||
.await?
|
||||
{
|
||||
match b.len() {
|
||||
common::BLOCK_NUMBER_LENGTH => {
|
||||
@@ -67,8 +68,9 @@ pub mod header {
|
||||
) -> anyhow::Result<Option<Header>> {
|
||||
trace!("Reading header for block {}/{:?}", number, hash);
|
||||
|
||||
if let Some(b) =
|
||||
txutil::get_one::<_, tables::Headers>(tx, &header_key(number, hash)).await?
|
||||
if let Some(b) = tx
|
||||
.get_one::<tables::Headers>(&header_key(number, hash))
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(rlp::decode(&b)?));
|
||||
}
|
||||
@@ -128,8 +130,9 @@ pub mod storage_body {
|
||||
) -> anyhow::Result<Option<Bytes<'tx>>> {
|
||||
trace!("Reading storage body for block {}/{:?}", number, hash);
|
||||
|
||||
if let Some(b) =
|
||||
txutil::get_one::<_, tables::BlockBody>(tx, &header_key(number, hash)).await?
|
||||
if let Some(b) = tx
|
||||
.get_one::<tables::BlockBody>(&header_key(number, hash))
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(b));
|
||||
}
|
||||
@@ -168,8 +171,9 @@ pub mod td {
|
||||
) -> anyhow::Result<Option<U256>> {
|
||||
trace!("Reading totatl difficulty at block {}/{:?}", number, hash);
|
||||
|
||||
if let Some(b) =
|
||||
txutil::get_one::<_, tables::HeaderTD>(tx, &header_key(number, hash)).await?
|
||||
if let Some(b) = tx
|
||||
.get_one::<tables::HeaderTD>(&header_key(number, hash))
|
||||
.await?
|
||||
{
|
||||
trace!("Reading TD RLP: {}", hex::encode(&b));
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{models::*, tables, txutil, Transaction};
|
||||
use crate::{models::*, tables, Transaction};
|
||||
use anyhow::Context;
|
||||
use ethereum_types::H256;
|
||||
use tracing::*;
|
||||
@@ -15,7 +15,7 @@ pub async fn read_chain_config<'tx, Tx: Transaction<'tx>>(
|
||||
hex::encode(&key)
|
||||
);
|
||||
|
||||
if let Some(b) = txutil::get_one::<_, tables::Config>(tx, &key).await? {
|
||||
if let Some(b) = tx.get_one::<tables::Config>(&key).await? {
|
||||
trace!("Read chain config data: {}", hex::encode(&b));
|
||||
|
||||
return Ok(Some(serde_json::from_slice(&*b).context("invalid JSON")?));
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{changeset::*, dbutils::*, models::*, txutil, Cursor, Transaction};
|
||||
use crate::{changeset::*, dbutils::*, models::*, Cursor, Transaction};
|
||||
use anyhow::bail;
|
||||
use ethereum_types::Address;
|
||||
|
||||
|
||||
@@ -2,8 +2,7 @@ use super::*;
|
||||
use crate::common;
|
||||
use common::HASH_LENGTH;
|
||||
use ethereum_types::H256;
|
||||
use futures::io::copy;
|
||||
use std::{io::Write, mem::size_of};
|
||||
use std::io::Write;
|
||||
|
||||
pub const HEADER_KEY_LEN: usize = common::BLOCK_NUMBER_LENGTH + common::HASH_LENGTH;
|
||||
|
||||
|
||||
@@ -9,19 +9,11 @@ use mdbx::{
|
||||
Transaction as MdbxTransaction, TransactionKind, WriteFlags, WriteMap, RO, RW,
|
||||
};
|
||||
|
||||
fn filter_not_found<T>(res: Result<T, mdbx::Error>) -> anyhow::Result<Option<T>> {
|
||||
match res {
|
||||
Ok(v) => Ok(Some(v)),
|
||||
Err(MdbxError::NotFound) => Ok(None),
|
||||
Err(other) => Err(other.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn set<'txn, K: TransactionKind>(
|
||||
c: &mut MdbxCursor<'txn, K>,
|
||||
k: &[u8],
|
||||
) -> anyhow::Result<Option<(Bytes<'txn>, Bytes<'txn>)>> {
|
||||
filter_not_found(MdbxCursor::set_key(c, k))
|
||||
Ok(MdbxCursor::set_key(c, k)?)
|
||||
}
|
||||
|
||||
fn get_both_range<'txn, K: TransactionKind>(
|
||||
@@ -29,7 +21,7 @@ fn get_both_range<'txn, K: TransactionKind>(
|
||||
k: &[u8],
|
||||
v: &[u8],
|
||||
) -> anyhow::Result<Option<Bytes<'txn>>> {
|
||||
filter_not_found(MdbxCursor::get_both_range(c, k, v))
|
||||
Ok(MdbxCursor::get_both_range(c, k, v)?)
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
@@ -124,7 +116,7 @@ where
|
||||
B: Table,
|
||||
{
|
||||
async fn first(&mut self) -> anyhow::Result<Option<(Bytes<'txn>, Bytes<'txn>)>> {
|
||||
filter_not_found(MdbxCursor::first(self))
|
||||
Ok(MdbxCursor::first(self)?)
|
||||
}
|
||||
|
||||
async fn seek(&mut self, key: &[u8]) -> anyhow::Result<Option<(Bytes<'txn>, Bytes<'txn>)>> {
|
||||
@@ -172,11 +164,11 @@ where
|
||||
}
|
||||
|
||||
async fn next_dup(&mut self) -> anyhow::Result<Option<(Bytes<'txn>, Bytes<'txn>)>> {
|
||||
filter_not_found(MdbxCursor::next_dup(self))
|
||||
Ok(MdbxCursor::next_dup(self)?)
|
||||
}
|
||||
|
||||
async fn next_no_dup(&mut self) -> anyhow::Result<Option<(Bytes<'txn>, Bytes<'txn>)>> {
|
||||
filter_not_found(MdbxCursor::next_nodup(self))
|
||||
Ok(MdbxCursor::next_nodup(self)?)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,8 +3,8 @@ use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use dbutils::{DupSort, Table};
|
||||
use ethereum_types::Address;
|
||||
use futures::{future::LocalBoxFuture, stream::LocalBoxStream};
|
||||
use std::{cmp::Ordering, future::Future, pin::Pin};
|
||||
use futures_core::stream::LocalBoxStream;
|
||||
use std::{cmp::Ordering, pin::Pin};
|
||||
|
||||
pub type ComparatorFunc = Pin<Box<dyn Fn(&[u8], &[u8], &[u8], &[u8]) -> Ordering>>;
|
||||
|
||||
@@ -35,17 +35,9 @@ pub trait Transaction<'tx>: Sized {
|
||||
/// long keys into DupSort key/values.
|
||||
async fn cursor<B: Table>(&'tx self) -> anyhow::Result<Self::Cursor<B>>;
|
||||
async fn cursor_dup_sort<B: DupSort>(&'tx self) -> anyhow::Result<Self::CursorDupSort<B>>;
|
||||
}
|
||||
|
||||
/// Temporary as module due to current Rust type system limitations
|
||||
pub mod txutil {
|
||||
use super::*;
|
||||
|
||||
pub async fn get_one<'tx, Tx: Transaction<'tx>, B: Table>(
|
||||
tx: &'tx Tx,
|
||||
key: &[u8],
|
||||
) -> anyhow::Result<Option<Bytes<'tx>>> {
|
||||
let mut cursor = tx.cursor::<B>().await?;
|
||||
async fn get_one<B: Table>(&'tx self, key: &[u8]) -> anyhow::Result<Option<Bytes<'tx>>> {
|
||||
let mut cursor = self.cursor::<B>().await?;
|
||||
|
||||
Ok(cursor.seek_exact(key).await?.map(|(k, v)| v))
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ pub use kv::{
|
||||
mdbx::*,
|
||||
remote::{kv_client::KvClient as RemoteKvClient, RemoteCursor, RemoteTransaction},
|
||||
traits::{
|
||||
txutil, ComparatorFunc, Cursor, CursorDupSort, MutableCursor, MutableCursorDupSort,
|
||||
ComparatorFunc, Cursor, CursorDupSort, MutableCursor, MutableCursorDupSort,
|
||||
MutableTransaction, Transaction,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{common, dbutils::*, tables, txutil, Transaction};
|
||||
use crate::{common, dbutils::*, tables, Transaction};
|
||||
use anyhow::{bail, Context};
|
||||
use arrayref::array_ref;
|
||||
use ethereum::Header;
|
||||
@@ -11,7 +11,10 @@ pub async fn get_stage_progress<'tx, Tx: Transaction<'tx>>(
|
||||
) -> anyhow::Result<Option<u64>> {
|
||||
trace!("Reading stage {:?} progress", stage);
|
||||
|
||||
if let Some(b) = txutil::get_one::<_, tables::SyncStageProgress>(tx, stage.as_ref()).await? {
|
||||
if let Some(b) = tx
|
||||
.get_one::<tables::SyncStageProgress>(stage.as_ref())
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(u64::from_be_bytes(*array_ref![
|
||||
b.get(0..common::BLOCK_NUMBER_LENGTH)
|
||||
.context("failed to read block number from bytes")?,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{changeset::*, common, dbutils, dbutils::*, models::*, txutil, Cursor, Transaction};
|
||||
use crate::{changeset::*, common, dbutils, dbutils::*, models::*, Cursor, Transaction};
|
||||
use arrayref::array_ref;
|
||||
use bytes::Bytes;
|
||||
use common::{Hash, Incarnation, ADDRESS_LENGTH};
|
||||
@@ -15,7 +15,7 @@ pub async fn get_account_data_as_of<'tx, Tx: Transaction<'tx>>(
|
||||
return Ok(Some(v));
|
||||
}
|
||||
|
||||
txutil::get_one::<_, tables::PlainState>(tx, &key).await
|
||||
tx.get_one::<tables::PlainState>(&key).await
|
||||
}
|
||||
|
||||
pub async fn get_storage_as_of<'tx, Tx: Transaction<'tx>>(
|
||||
@@ -30,7 +30,7 @@ pub async fn get_storage_as_of<'tx, Tx: Transaction<'tx>>(
|
||||
return Ok(Some(v));
|
||||
}
|
||||
|
||||
txutil::get_one::<_, tables::PlainState>(tx, &key).await
|
||||
tx.get_one::<tables::PlainState>(&key).await
|
||||
}
|
||||
|
||||
pub async fn find_data_by_history<'tx, Tx: Transaction<'tx>>(
|
||||
@@ -66,11 +66,11 @@ pub async fn find_data_by_history<'tx, Tx: Transaction<'tx>>(
|
||||
//restore codehash
|
||||
if let Some(mut acc) = Account::decode_for_storage(&*data)? {
|
||||
if acc.incarnation > 0 && acc.is_empty_code_hash() {
|
||||
if let Some(code_hash) = txutil::get_one::<_, tables::PlainContractCode>(
|
||||
tx,
|
||||
&dbutils::plain_generate_storage_prefix(key, acc.incarnation),
|
||||
)
|
||||
.await?
|
||||
if let Some(code_hash) = tx
|
||||
.get_one::<tables::PlainContractCode>(
|
||||
&dbutils::plain_generate_storage_prefix(key, acc.incarnation),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
acc.code_hash = H256(*array_ref![&*code_hash, 0, 32]);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{changeset::*, common, dbutils::*, models::*, txutil, Cursor, Transaction};
|
||||
use crate::{changeset::*, common, dbutils::*, models::*, Cursor, Transaction};
|
||||
use anyhow::{bail, Context};
|
||||
use arrayref::array_ref;
|
||||
use async_stream::try_stream;
|
||||
@@ -6,7 +6,6 @@ use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use ethereum::Header;
|
||||
use ethereum_types::{Address, H256, U256};
|
||||
use futures::stream::LocalBoxStream;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use tokio::pin;
|
||||
use tokio_stream::Stream;
|
||||
|
||||
Reference in New Issue
Block a user