feat: HistoryStateProvider (#1057)

This commit is contained in:
rakita
2023-01-27 11:28:30 +01:00
committed by GitHub
parent 4e46d58b23
commit 71dc531e68
10 changed files with 272 additions and 50 deletions

View File

@@ -4,7 +4,7 @@ use eyre::eyre;
use reth_db::{
cursor::DbCursorRO,
database::Database,
mdbx::{test_utils::create_test_rw_db, WriteMap},
mdbx::test_utils::create_test_rw_db,
tables,
transaction::{DbTx, DbTxMut},
Error as DbError,
@@ -128,7 +128,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
let has_block_reward = chain_spec.paris_status().block_number().is_some();
// Create db and acquire transaction
let db = create_test_rw_db::<WriteMap>();
let db = create_test_rw_db();
let tx = db.tx_mut()?;
// insert genesis

View File

@@ -1,4 +1,4 @@
use reth_primitives::{BlockHash, BlockNumber};
use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, H256};
/// KV error type. They are using u32 to represent error code.
#[allow(missing_docs)]
@@ -12,7 +12,6 @@ pub enum Error {
BlockBody { block_number: BlockNumber, block_hash: BlockHash },
#[error("Block transition id does not exist for block #{block_number}")]
BlockTransition { block_number: BlockNumber },
#[error("Block number {block_number} from block hash #{block_hash} does not exist in canonical chain")]
BlockCanonical { block_number: BlockNumber, block_hash: BlockHash },
#[error("Block number {block_number} with hash #{received_hash:?} is not canonical block. Canonical block hash is #{expected_hash:?}")]
@@ -21,4 +20,8 @@ pub enum Error {
expected_hash: BlockHash,
received_hash: BlockHash,
},
#[error("Storage ChangeSet address: ({address:?} key: {storage_key:?}) for transition:#{transition_id} does not exist")]
StorageChangeset { transition_id: TransitionId, address: Address, storage_key: H256 },
#[error("Account {address:?} ChangeSet for transition #{transition_id} does not exist")]
AccountChangeset { transition_id: TransitionId, address: Address },
}

View File

@@ -3,7 +3,7 @@ use reth_codecs::Compact;
use serde::Serialize;
/// Account storage entry.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize)]
pub struct StorageEntry {
/// Storage key.
pub key: H256,

View File

@@ -73,7 +73,6 @@ pub fn init_genesis<DB: Database>(db: Arc<DB>, chain: ChainSpec) -> Result<H256,
mod tests {
use super::init_genesis;
use crate::utils::init::WriteMap;
use reth_db::mdbx::test_utils::create_test_rw_db;
use reth_primitives::{
GOERLI, GOERLI_GENESIS, MAINNET, MAINNET_GENESIS, SEPOLIA, SEPOLIA_GENESIS,
@@ -81,7 +80,7 @@ mod tests {
#[test]
fn success_init_genesis_mainnet() {
let db = create_test_rw_db::<WriteMap>();
let db = create_test_rw_db();
let genesis_hash = init_genesis(db.clone(), MAINNET.clone()).unwrap();
// actual, expected
@@ -90,7 +89,7 @@ mod tests {
#[test]
fn success_init_genesis_goerli() {
let db = create_test_rw_db::<WriteMap>();
let db = create_test_rw_db();
let genesis_hash = init_genesis(db.clone(), GOERLI.clone()).unwrap();
// actual, expected
@@ -99,7 +98,7 @@ mod tests {
#[test]
fn success_init_genesis_sepolia() {
let db = create_test_rw_db::<WriteMap>();
let db = create_test_rw_db();
let genesis_hash = init_genesis(db.clone(), SEPOLIA.clone()).unwrap();
// actual, expected

View File

@@ -230,6 +230,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let mut cursor_storage_changeset =
tx.cursor_write::<tables::StorageChangeSet>()?;
cursor_storage_changeset.seek_exact(storage_id)?;
if wipe_storage {
// iterate over storage and save them before entry is deleted.
@@ -240,7 +241,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
})
.try_for_each(|entry| {
let (_, old_value) = entry?;
cursor_storage_changeset.append(storage_id.clone(), old_value)
cursor_storage_changeset.append(storage_id, old_value)
})?;
// delete all entries
@@ -262,8 +263,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let old_entry = StorageEntry { key, value: old_value };
let new_entry = StorageEntry { key, value: new_value };
// insert into StorageChangeSet
cursor_storage_changeset
.append(storage_id.clone(), old_entry.clone())?;
cursor_storage_changeset.append(storage_id, old_entry)?;
// Always delete old value as duplicate table, put will not override it
tx.delete::<tables::PlainStorageState>(address, Some(old_entry))?;
@@ -361,7 +361,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// revert all changes to PlainStorage
for (key, storage) in storage_changeset_batch.into_iter().rev() {
let address = key.address();
tx.put::<tables::PlainStorageState>(address, storage.clone())?;
tx.put::<tables::PlainStorageState>(address, storage)?;
if storage.value == U256::ZERO {
// delete value that is zero
tx.delete::<tables::PlainStorageState>(address, Some(storage))?;

View File

@@ -61,7 +61,7 @@ pub trait DbDupCursorRO<'tx, T: DupSort> {
fn next_dup_val(&mut self) -> ValueOnlyResult<T>;
/// Seek by key and subkey
fn seek_by_key_subkey(&mut self, key: T::Key, value: T::SubKey) -> ValueOnlyResult<T>;
fn seek_by_key_subkey(&mut self, key: T::Key, subkey: T::SubKey) -> ValueOnlyResult<T>;
/// Returns an iterator starting at a key greater or equal than `start_key` of a DupSort
/// table.

View File

@@ -110,6 +110,8 @@ impl<E: EnvironmentKind> Deref for Env<E> {
/// Collection of database test utilities
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils {
use reth_libmdbx::WriteMap;
use super::{Env, EnvKind, EnvironmentKind, Path};
use std::sync::Arc;
@@ -121,7 +123,7 @@ pub mod test_utils {
pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory.";
/// Create rw database for testing
pub fn create_test_rw_db<E: EnvironmentKind>() -> Arc<Env<E>> {
pub fn create_test_rw_db() -> Arc<Env<WriteMap>> {
create_test_db(EnvKind::RW)
}
/// Create database for testing

View File

@@ -42,7 +42,7 @@ impl Compact for AccountBeforeTx {
/// [`TxNumber`] concatenated with [`Address`]. Used as a key for [`StorageChangeSet`]
///
/// Since it's used as a key, it isn't compressed when encoding it.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd)]
pub struct TransitionIdAddress(pub (TransitionId, Address));
impl TransitionIdAddress {

View File

@@ -34,7 +34,11 @@ heapless = "0.7.16"
# feature test-utils
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
], optional = true }
parking_lot = { version = "0.12", optional = true }
[dev-dependencies]
@@ -42,9 +46,13 @@ reth-db = { path = "../db", features = ["test-utils"] }
test-fuzz = "3.0.4"
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["sync"] }
arbitrary = { version = "1.1.7", features = ["derive"]}
arbitrary = { version = "1.1.7", features = ["derive"] }
hex-literal = "0.3"
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
] }
parking_lot = "0.12"
[features]

View File

@@ -1,6 +1,9 @@
use crate::{AccountProvider, BlockHashProvider, StateProvider};
use crate::{AccountProvider, BlockHashProvider, Error, StateProvider};
use reth_db::{
cursor::DbCursorRO, models::storage_sharded_key::StorageShardedKey, tables, transaction::DbTx,
cursor::{DbCursorRO, DbDupCursorRO},
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
tables,
transaction::DbTx,
};
use reth_interfaces::Result;
use reth_primitives::{
@@ -31,12 +34,35 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
Self { tx, transition, _phantom: PhantomData {} }
}
}
impl<'a, 'b, TX: DbTx<'a>> AccountProvider for HistoricalStateProviderRef<'a, 'b, TX> {
/// Get basic account information.
fn basic_account(&self, _address: Address) -> Result<Option<Account>> {
// TODO add when AccountHistory is defined
Ok(None)
fn basic_account(&self, address: Address) -> Result<Option<Account>> {
// history key to search IntegerList of transition id changesets.
let history_key = ShardedKey::new(address, self.transition);
let Some(changeset_transition_id) = self.tx.cursor_read::<tables::AccountHistory>()?
.seek(history_key)?
.filter(|(key,_)| key.key == address)
.map(|(_,list)| list.0.enable_rank().successor(self.transition as usize).map(|i| i as u64)) else {
return Ok(None)
};
// if changeset transition id is present we are getting value from changeset
if let Some(changeset_transition_id) = changeset_transition_id {
let account = self
.tx
.cursor_dup_read::<tables::AccountChangeSet>()?
.seek_by_key_subkey(changeset_transition_id, address)?
.ok_or(Error::AccountChangeset {
transition_id: changeset_transition_id,
address,
})?;
Ok(account.info)
} else {
// if changeset is not present that means that there was history shard but we need to
// use newest value from plain state
Ok(self.tx.get::<tables::PlainAccountState>(address)?)
}
}
}
@@ -49,33 +75,38 @@ impl<'a, 'b, TX: DbTx<'a>> BlockHashProvider for HistoricalStateProviderRef<'a,
impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b, TX> {
/// Get storage.
fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>> {
// TODO when StorageHistory is defined
let transition_id = StorageShardedKey::new(account, storage_key, self.transition);
let transaction_number =
self.tx.get::<tables::StorageHistory>(transition_id)?.map(|_integer_list|
// TODO select integer that is one less from transaction_number <- // TODO: (rkrasiuk) not sure this comment is still relevant
self.transition);
fn storage(&self, address: Address, storage_key: StorageKey) -> Result<Option<StorageValue>> {
// history key to search IntegerList of transition id changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.transition);
if transaction_number.is_none() {
return Ok(None)
let Some(changeset_transition_id) = self.tx.cursor_read::<tables::StorageHistory>()?
.seek(history_key)?
.filter(|(key,_)| key.address == address && key.sharded_key.key == storage_key)
.map(|(_,list)| list.0.enable_rank().successor(self.transition as usize).map(|i| i as u64)) else {
return Ok(None)
};
// if changeset transition id is present we are getting value from changeset
if let Some(changeset_transition_id) = changeset_transition_id {
let storage_entry = self
.tx
.cursor_dup_read::<tables::StorageChangeSet>()?
.seek_by_key_subkey((changeset_transition_id, address).into(), storage_key)?
.ok_or(Error::StorageChangeset {
transition_id: changeset_transition_id,
address,
storage_key,
})?;
Ok(Some(storage_entry.value))
} else {
// if changeset is not present that means that there was history shard but we need to
// use newest value from plain state
Ok(self
.tx
.cursor_dup_read::<tables::PlainStorageState>()?
.seek_by_key_subkey(address, storage_key)
.map(|r| r.map(|entry| entry.value))?)
}
let num = transaction_number.unwrap();
let mut cursor = self.tx.cursor_dup_read::<tables::StorageChangeSet>()?;
if let Some((_, entry)) = cursor.seek_exact((num, account).into())? {
if entry.key == storage_key {
return Ok(Some(entry.value))
}
// TODO(rakita) this will be reworked shortly in StorageHistory PR.
// if let Some((_, entry)) = cursor.seek(storage_key)? {
// if entry.key == storage_key {
// return Ok(Some(entry.value))
// }
// }
}
Ok(None)
}
/// Get account code by its hash
@@ -122,3 +153,182 @@ derive_from_ref!(
fn storage(&self, account: Address, storage_key: StorageKey) -> Result<Option<StorageValue>>,
fn bytecode_by_hash(&self, code_hash: H256) -> Result<Option<Bytes>>
);
#[cfg(test)]
mod tests {
use reth_db::{
database::Database,
mdbx::test_utils::create_test_rw_db,
models::{storage_sharded_key::StorageShardedKey, AccountBeforeTx, ShardedKey},
tables,
transaction::{DbTx, DbTxMut},
TransitionList,
};
use reth_primitives::{hex_literal::hex, Account, StorageEntry, H160, H256, U256};
use crate::{AccountProvider, HistoricalStateProviderRef, StateProvider};
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
const STORAGE: H256 =
H256(hex!("0000000000000000000000000000000000000000000000000000000000000001"));
#[test]
fn history_provider_get_account() {
let db = create_test_rw_db();
let tx = db.tx_mut().unwrap();
tx.put::<tables::AccountHistory>(
ShardedKey { key: ADDRESS, highest_transition_id: 7 },
TransitionList::new([3, 7]).unwrap(),
)
.unwrap();
tx.put::<tables::AccountHistory>(
ShardedKey { key: ADDRESS, highest_transition_id: u64::MAX },
TransitionList::new([10, 15]).unwrap(),
)
.unwrap();
let acc_plain = Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None };
let acc_at15 = Account { nonce: 15, balance: U256::ZERO, bytecode_hash: None };
let acc_at10 = Account { nonce: 10, balance: U256::ZERO, bytecode_hash: None };
let acc_at7 = Account { nonce: 7, balance: U256::ZERO, bytecode_hash: None };
let acc_at3 = Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None };
// setup
tx.put::<tables::AccountChangeSet>(
3,
AccountBeforeTx { address: ADDRESS, info: Some(acc_at3) },
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
7,
AccountBeforeTx { address: ADDRESS, info: Some(acc_at7) },
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
10,
AccountBeforeTx { address: ADDRESS, info: Some(acc_at10) },
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
15,
AccountBeforeTx { address: ADDRESS, info: Some(acc_at15) },
)
.unwrap();
// setup plain state
tx.put::<tables::PlainAccountState>(ADDRESS, acc_plain).unwrap();
tx.commit().unwrap();
let tx = db.tx().unwrap();
// run
assert_eq!(
HistoricalStateProviderRef::new(&tx, 1).basic_account(ADDRESS),
Ok(Some(acc_at3))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 3).basic_account(ADDRESS),
Ok(Some(acc_at3))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 4).basic_account(ADDRESS),
Ok(Some(acc_at7))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 7).basic_account(ADDRESS),
Ok(Some(acc_at7))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 9).basic_account(ADDRESS),
Ok(Some(acc_at10))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 10).basic_account(ADDRESS),
Ok(Some(acc_at10))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 11).basic_account(ADDRESS),
Ok(Some(acc_at15))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 16).basic_account(ADDRESS),
Ok(Some(acc_plain))
);
}
#[test]
fn history_provider_get_storage() {
let db = create_test_rw_db();
let tx = db.tx_mut().unwrap();
tx.put::<tables::StorageHistory>(
StorageShardedKey {
address: ADDRESS,
sharded_key: ShardedKey { key: STORAGE, highest_transition_id: 7 },
},
TransitionList::new([3, 7]).unwrap(),
)
.unwrap();
tx.put::<tables::StorageHistory>(
StorageShardedKey {
address: ADDRESS,
sharded_key: ShardedKey { key: STORAGE, highest_transition_id: u64::MAX },
},
TransitionList::new([10, 15]).unwrap(),
)
.unwrap();
let entry_plain = StorageEntry { key: STORAGE, value: U256::from(100) };
let entry_at15 = StorageEntry { key: STORAGE, value: U256::from(15) };
let entry_at10 = StorageEntry { key: STORAGE, value: U256::from(10) };
let entry_at7 = StorageEntry { key: STORAGE, value: U256::from(7) };
let entry_at3 = StorageEntry { key: STORAGE, value: U256::from(0) };
// setup
tx.put::<tables::StorageChangeSet>((3, ADDRESS).into(), entry_at3).unwrap();
tx.put::<tables::StorageChangeSet>((7, ADDRESS).into(), entry_at7).unwrap();
tx.put::<tables::StorageChangeSet>((10, ADDRESS).into(), entry_at10).unwrap();
tx.put::<tables::StorageChangeSet>((15, ADDRESS).into(), entry_at15).unwrap();
// setup plain state
tx.put::<tables::PlainStorageState>(ADDRESS, entry_plain).unwrap();
tx.commit().unwrap();
let tx = db.tx().unwrap();
// run
assert_eq!(
HistoricalStateProviderRef::new(&tx, 0).storage(ADDRESS, STORAGE),
Ok(Some(entry_at3.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE),
Ok(Some(entry_at3.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 4).storage(ADDRESS, STORAGE),
Ok(Some(entry_at7.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 7).storage(ADDRESS, STORAGE),
Ok(Some(entry_at7.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 9).storage(ADDRESS, STORAGE),
Ok(Some(entry_at10.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 10).storage(ADDRESS, STORAGE),
Ok(Some(entry_at10.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 11).storage(ADDRESS, STORAGE),
Ok(Some(entry_at15.value))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 16).storage(ADDRESS, STORAGE),
Ok(Some(entry_plain.value))
);
}
}