feat: add AccountHashingStage and StorageHashingStage (#756)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
rakita
2023-01-18 19:22:39 +01:00
committed by GitHub
parent a8b7a089b6
commit 899d011bfd
7 changed files with 996 additions and 10 deletions

View File

@@ -23,7 +23,8 @@ use reth_primitives::{BlockNumber, ChainSpec, NodeRecord, H256};
use reth_stages::{
metrics::HeaderMetrics,
stages::{
bodies::BodyStage, execution::ExecutionStage, headers::HeaderStage,
bodies::BodyStage, execution::ExecutionStage, hashing_account::AccountHashingStage,
hashing_storage::StorageHashingStage, headers::HeaderStage, merkle::MerkleStage,
sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
},
PipelineEvent, StageId,
@@ -181,7 +182,13 @@ impl Command {
.push(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: config.stages.execution.commit_threshold,
});
})
// This Merkle stage is used only on unwind
.push(MerkleStage { is_execute: false })
.push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
.push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
// This Merkle stage is used only on execute
.push(MerkleStage { is_execute: true });
if let Some(tip) = self.tip {
debug!(target: "reth::cli", %tip, "Tip manually set");

View File

@@ -1,7 +1,7 @@
use rand::{distributions::uniform::SampleRange, thread_rng, Rng};
use reth_primitives::{
proofs, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, TxLegacy, H256, U256,
proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
};
use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey};
@@ -164,6 +164,24 @@ pub fn random_block_range(
blocks
}
/// Generate a random Externally Owned Account (EOA, an account without contract code).
pub fn random_eoa_account() -> (Address, Account) {
let nonce: u64 = rand::random();
let balance = U256::from(rand::random::<u32>());
let addr = H160::from(rand::random::<u64>());
(addr, Account { nonce, balance, bytecode_hash: None })
}
/// Generate a range of random EOA accounts, one per element of `acc_range`.
pub fn random_eoa_account_range(acc_range: &mut std::ops::Range<u64>) -> Vec<(Address, Account)> {
let mut accounts = Vec::with_capacity(acc_range.end.saturating_sub(acc_range.start) as usize);
for _ in acc_range {
accounts.push(random_eoa_account())
}
accounts
}
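For illustration, a hedged usage sketch of these generators (addresses and values are random per call; `Account` is the reth_primitives type imported above):

```rust
let accounts = random_eoa_account_range(&mut (0u64..10));
assert_eq!(accounts.len(), 10);
for (_addr, acc) in &accounts {
    assert!(acc.bytecode_hash.is_none()); // EOAs carry no contract code
}
```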
#[cfg(test)]
mod test {
use std::str::FromStr;

View File

@@ -0,0 +1,398 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Account, Address, H160};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
};
use tracing::*;
const ACCOUNT_HASHING: StageId = StageId("AccountHashingStage");
/// Account hashing stage hashes plain accounts.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
#[derive(Debug)]
pub struct AccountHashingStage {
/// The threshold for switching from incremental hashing
/// of changes to whole account hashing. Number of transitions.
pub clean_threshold: u64,
/// The number of inserted items after which control
/// will be returned to the pipeline for a commit.
pub commit_threshold: u64,
}
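A hedged sketch (not part of the commit) of how the two thresholds interact in `execute` below: `clean_threshold` selects the strategy, and `commit_threshold` sizes the batches in the clean path. `rebuilds_from_scratch` is an illustrative helper, not an API:

```rust
// Assumption: execute() compares transition counts, as in the body below.
// More transitions than `clean_threshold` means the hashed table is wiped
// and rebuilt from the plain state in batches of `commit_threshold` entries.
fn rebuilds_from_scratch(from_transition: u64, to_transition: u64, clean_threshold: u64) -> bool {
    to_transition - from_transition > clean_threshold
}
```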
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for AccountHashingStage {
/// Return the id of the stage
fn id(&self) -> StageId {
ACCOUNT_HASHING
}
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
// read the account changesets, merge them into one changeset and calculate the account hashes.
let from_transition = tx.get_block_transition(stage_progress)?;
let to_transition = tx.get_block_transition(previous_stage_progress)?;
// if there are more blocks than the threshold it is faster to go over the plain state and hash all
// accounts, otherwise take the changesets, aggregate the sets and apply the hashing to the
// HashedAccount table
if to_transition - from_transition > self.clean_threshold {
// clear the table, load all accounts and hash them
tx.clear::<tables::HashedAccount>()?;
tx.commit()?;
let mut first_key = H160::zero();
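// Walk PlainAccountState in batches of `commit_threshold` entries,
// committing after each batch and resuming from the key where the
// previous batch stopped.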
loop {
let next_key = {
let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
let hashed_batch = accounts
.walk(first_key)?
.take(self.commit_threshold as usize)
.map(|res| res.map(|(address, account)| (keccak256(address), account)))
.collect::<Result<BTreeMap<_, _>, _>>()?;
// next key of iterator
let next_key = accounts.next()?;
let mut hashes = tx.cursor_write::<tables::HashedAccount>()?;
// iterate and append presorted hashed accounts
hashed_batch.into_iter().try_for_each(|(k, v)| hashes.append(k, v))?;
next_key
};
tx.commit()?;
if let Some((next_key, _)) = next_key {
first_key = next_key;
continue
}
break
}
} else {
// Aggregate all transition changesets and make a list of accounts that have been
// changed.
tx.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// fold all accounts into one set of changed accounts
.fold(BTreeSet::new(), |mut accounts: BTreeSet<Address>, (_, account_before)| {
accounts.insert(account_before.address);
accounts
})
.into_iter()
// iterate over the plain state and get the newest value.
// The assumption we are okay to make is that the plain state represents
// the `previous_stage_progress` state.
.map(|address| tx.get::<tables::PlainAccountState>(address).map(|a| (address, a)))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.try_for_each(|(address, account)| {
let hashed_address = keccak256(address);
if let Some(account) = account {
tx.put::<tables::HashedAccount>(hashed_address, account)
} else {
tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
}
})?;
}
info!(target: "sync::stages::hashing_account", "Stage finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// There is no threshold on account unwind, we will always take the changesets and
// apply the past values to the HashedAccount table.
let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
// Aggregate all transition changesets and make a list of accounts that have been changed.
tx.cursor_read::<tables::AccountChangeSet>()?
.walk(from_transition_rev)?
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default())
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all accounts to get the old balances/nonces and the accounts that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
accounts.insert(account_before.address, account_before.info);
accounts
},
)
.into_iter()
// hash the addresses and collect them inside a sorted BTreeMap.
// This way we do keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply the values to the HashedAccount table (if the account is None, remove it).
.try_for_each(|(hashed_address, account)| {
if let Some(account) = account {
tx.put::<tables::HashedAccount>(hashed_address, account)
} else {
tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
}
})?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::{Account, SealedBlock, H256, U256};
use reth_provider::insert_canonical_block;
use test_utils::*;
stage_test_suite_ext!(AccountHashingTestRunner);
#[tokio::test]
async fn execute_below_clean_threshold() {
let (previous_stage, stage_progress) = (20, 10);
// Set up the runner
let mut runner = AccountHashingTestRunner::default();
runner.set_clean_threshold(1);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == previous_stage);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
mod test_utils {
use super::*;
use crate::{
stages::hashing_account::AccountHashingStage,
test_utils::{StageTestRunner, TestTransaction},
ExecInput, ExecOutput, UnwindInput,
};
use reth_db::{
cursor::DbCursorRO,
models::AccountBeforeTx,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::test_utils::generators::random_eoa_account_range;
pub(crate) struct AccountHashingTestRunner {
pub(crate) tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
}
impl AccountHashingTestRunner {
pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
self.clean_threshold = threshold;
}
#[allow(dead_code)]
pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
self.commit_threshold = threshold;
}
pub(crate) fn insert_blocks(
&self,
blocks: Vec<SealedBlock>,
) -> Result<(), TestRunnerError> {
for block in blocks.iter() {
self.tx.commit(|tx| {
insert_canonical_block(tx, block, true).unwrap();
Ok(())
})?;
}
Ok(())
}
pub(crate) fn insert_accounts(
&self,
accounts: &[(Address, Account)],
) -> Result<(), TestRunnerError> {
for (addr, acc) in accounts.iter() {
self.tx.commit(|tx| {
tx.put::<tables::PlainAccountState>(*addr, *acc)?;
Ok(())
})?;
}
Ok(())
}
/// Iterates over the PlainAccountState table and checks that the accounts match the ones
/// in the HashedAccount table
pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
self.tx.query(|tx| {
let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccount>()?;
while let Some((address, account)) = acc_cursor.next()? {
let hashed_addr = keccak256(address);
if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
assert_eq!(acc, account)
}
}
Ok(())
})?;
Ok(())
}
/// Same as check_hashed_accounts, but checks against the old account state,
/// namely the same account with nonce - 1 and balance - 1.
pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
self.tx.query(|tx| {
let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccount>()?;
while let Some((address, account)) = acc_cursor.next()? {
let Account { nonce, balance, .. } = account;
let old_acc = Account {
nonce: nonce - 1,
balance: balance - U256::from(1),
bytecode_hash: None,
};
let hashed_addr = keccak256(address);
if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
assert_eq!(acc, old_acc)
}
}
Ok(())
})?;
Ok(())
}
}
impl Default for AccountHashingTestRunner {
fn default() -> Self {
Self {
tx: TestTransaction::default(),
commit_threshold: 1000,
clean_threshold: 1000,
}
}
}
impl StageTestRunner for AccountHashingTestRunner {
type S = AccountHashingStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
}
}
}
#[async_trait::async_trait]
impl ExecuteStageTestRunner for AccountHashingTestRunner {
type Seed = Vec<(Address, Account)>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(0..end, H256::zero(), 0..3);
self.insert_blocks(blocks)?;
let n_accounts = 2;
let accounts = random_eoa_account_range(&mut (0..n_accounts));
self.insert_accounts(&accounts)?;
// seed account changeset
self.tx
.commit(|tx| {
let (_, last_transition) =
tx.cursor_read::<tables::BlockTransitionIndex>()?.last()?.unwrap();
let first_transition =
last_transition.checked_sub(n_accounts).unwrap_or_default();
for (t, (addr, acc)) in (first_transition..last_transition).zip(&accounts) {
let Account { nonce, balance, .. } = acc;
let prev_acc = Account {
nonce: nonce - 1,
balance: balance - U256::from(1),
bytecode_hash: None,
};
let acc_before_tx =
AccountBeforeTx { address: *addr, info: Some(prev_acc) };
tx.put::<tables::AccountChangeSet>(t, acc_before_tx)?;
}
Ok(())
})
.unwrap();
Ok(accounts)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
if start_block > end_block {
return Ok(())
}
}
self.check_hashed_accounts()
}
}
impl UnwindStageTestRunner for AccountHashingTestRunner {
fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
self.check_old_hashed_accounts()
}
}
}
}

View File

@@ -0,0 +1,475 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
database::Database,
models::TransitionIdAddress,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256};
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;
const STORAGE_HASHING: StageId = StageId("StorageHashingStage");
/// Storage hashing stage hashes the plain storage.
/// This is preparation before generating intermediate hashes and calculating the Merkle tree root.
#[derive(Debug)]
pub struct StorageHashingStage {
/// The threshold for switching from incremental hashing
/// of changes to whole storage hashing. Number of transitions.
pub clean_threshold: u64,
/// The number of inserted items after which control
/// will be returned to the pipeline for a commit
pub commit_threshold: u64,
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for StorageHashingStage {
/// Return the id of the stage
fn id(&self) -> StageId {
STORAGE_HASHING
}
/// Execute the stage.
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();
// read the storage changesets, merge them into one changeset and calculate the storage hashes.
let from_transition = tx.get_block_transition(stage_progress)? + 1;
let to_transition = tx.get_block_transition(previous_stage_progress)? + 1;
// if there are more blocks than the threshold it is faster to go over the plain state and hash all
// storage slots, otherwise take the changesets, aggregate the sets and apply the hashing to the
// HashedStorage table
if to_transition - from_transition > self.clean_threshold {
tx.clear::<tables::HashedStorage>()?;
tx.commit()?;
let mut first_key = H160::zero();
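// Walk PlainStorageState in batches of `commit_threshold` entries,
// committing after each batch and resuming from the address where the
// previous batch stopped.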
loop {
let next_key = {
let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let hashed_batch = storage
.walk(first_key)?
.take(self.commit_threshold as usize)
.map(|res| {
res.map(|(address, mut slot)| {
// both account address and storage slot key are hashed for merkle
// tree.
slot.key = keccak256(slot.key);
(keccak256(address), slot)
})
})
.collect::<Result<BTreeMap<_, _>, _>>()?;
// next key of iterator
let next_key = storage.next()?;
let mut hashes = tx.cursor_write::<tables::HashedStorage>()?;
// iterate and append presorted hashed slots
hashed_batch.into_iter().try_for_each(|(k, v)| hashes.append(k, v))?;
next_key
};
tx.commit()?;
if let Some((next_key, _)) = next_key {
first_key = next_key;
continue
}
break
}
} else {
let mut plain_storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
// Aggregate all transition changesets and make a list of storages that have been
// changed.
tx.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition, H160::zero()).into())?
.take_while(|res| {
res.as_ref().map(|(k, _)| k.0 .0 < to_transition).unwrap_or_default()
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// fold all storages and save their old state so we can remove them from HashedStorage;
// this is needed as it is a dup table.
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, BTreeMap<H256, U256>>,
(TransitionIdAddress((_, address)), storage_entry)| {
accounts
.entry(address)
.or_default()
.insert(storage_entry.key, storage_entry.value);
accounts
},
)
.into_iter()
// iterate over the plain state and get the newest storage value.
// The assumption we are okay with is that the plain state represents
// the `previous_stage_progress` state.
.map(|(address, storage)| {
storage
.into_iter()
.map(|(key, val)| {
plain_storage
.seek_by_key_subkey(address, key)
.map(|ret| (key, (val, ret.map(|e| e.value))))
})
.collect::<Result<BTreeMap<_, _>, _>>()
.map(|storage| (address, storage))
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// Hash the address and key and apply them to the HashedStorage table (if the new
// value is None, just remove the entry).
.try_for_each(|(address, storage)| {
let hashed_address = keccak256(address);
storage.into_iter().try_for_each(
|(key, (old_val, new_val))| -> Result<(), StageError> {
let key = keccak256(key);
tx.delete::<tables::HashedStorage>(
hashed_address,
Some(StorageEntry { key, value: old_val }),
)?;
if let Some(value) = new_val {
let val = StorageEntry { key, value };
tx.put::<tables::HashedStorage>(hashed_address, val)?
}
Ok(())
},
)
})?;
}
info!(target: "sync::stages::hashing_storage", "Stage finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let from_transition_rev = tx.get_block_transition(input.unwind_to)? + 1;
let to_transition_rev = tx.get_block_transition(input.stage_progress)? + 1;
let mut hashed_storage = tx.cursor_dup_write::<tables::HashedStorage>()?;
// Aggregate all transition changesets and make a list of the storage entries that have been changed.
tx.cursor_read::<tables::StorageChangeSet>()?
.walk((from_transition_rev, H160::zero()).into())?
.take_while(|res| {
res.as_ref()
.map(|(TransitionIdAddress((k, _)), _)| *k < to_transition_rev)
.unwrap_or_default()
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all storage changes to get the old values and the entries that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, H256), U256>,
(TransitionIdAddress((_, address)), storage_entry)| {
accounts.insert((address, storage_entry.key), storage_entry.value);
accounts
},
)
.into_iter()
// hash the addresses and keys and collect them inside a sorted BTreeMap.
// This way we do keccak only once per address and key.
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply the values to the HashedStorage table (if the value is zero, remove it).
.try_for_each(|((address, key), value)| -> Result<(), StageError> {
hashed_storage.seek_by_key_subkey(address, key)?;
hashed_storage.delete_current()?;
if value != U256::ZERO {
hashed_storage.append_dup(address, StorageEntry { key, value })?;
}
Ok(())
})?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_db::{
cursor::DbCursorRW,
mdbx::{tx::Tx, WriteMap, RW},
models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
};
use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::{
SealedBlock, StorageEntry, Transaction, TransactionKind, TxLegacy, H256, U256,
};
stage_test_suite_ext!(StorageHashingTestRunner);
/// Execute with a low clean threshold so that the whole storage is hashed
#[tokio::test]
async fn execute_clean() {
let (previous_stage, stage_progress) = (500, 100);
// Set up the runner
let mut runner = StorageHashingTestRunner::default();
// set low threshold so we hash the whole storage
runner.set_clean_threshold(1);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
// Assert the successful result
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
struct StorageHashingTestRunner {
tx: TestTransaction,
commit_threshold: u64,
clean_threshold: u64,
}
impl Default for StorageHashingTestRunner {
fn default() -> Self {
Self { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000 }
}
}
impl StageTestRunner for StorageHashingTestRunner {
type S = StorageHashingStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
}
}
}
#[async_trait::async_trait]
impl ExecuteStageTestRunner for StorageHashingTestRunner {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(stage_progress..end, H256::zero(), 0..3);
self.tx.insert_headers(blocks.iter().map(|block| &block.header))?;
let (mut transition_id, mut tx_id) = (0, 0);
for progress in blocks.iter() {
// Insert the body data for this block
self.tx.commit(|tx| {
let key: BlockNumHash = (progress.number, progress.hash()).into();
let body = StoredBlockBody {
start_tx_id: tx_id,
tx_count: progress.body.len() as u64,
};
progress.body.iter().try_for_each(|transaction| {
tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(tx_id, transaction.clone())?;
tx.put::<tables::TxTransitionIndex>(tx_id, transition_id)?;
tx_id += 1;
transition_id += 1;
let (to, value) = match transaction.transaction {
Transaction::Legacy(TxLegacy {
to: TransactionKind::Call(to),
value,
..
}) => (to, value),
_ => unreachable!(),
};
let new_entry =
StorageEntry { key: keccak256("transfers"), value: U256::from(value) };
self.insert_storage_entry(
tx,
(transition_id, to).into(),
new_entry,
progress.header.number == stage_progress,
)
})?;
// Randomize rewards
let has_reward: bool = rand::random();
if has_reward {
transition_id += 1;
self.insert_storage_entry(
tx,
(transition_id, Address::random()).into(),
StorageEntry {
key: keccak256("mining"),
value: U256::from(rand::random::<u32>()),
},
progress.header.number == stage_progress,
)?;
}
tx.put::<tables::BlockTransitionIndex>(key.number(), transition_id)?;
tx.put::<tables::BlockBodies>(key, body)
})?;
}
Ok(blocks)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
if start_block > end_block {
return Ok(())
}
}
self.check_hashed_storage()
}
}
impl UnwindStageTestRunner for StorageHashingTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.unwind_storage(input)?;
self.check_hashed_storage()
}
}
impl StorageHashingTestRunner {
fn set_clean_threshold(&mut self, threshold: u64) {
self.clean_threshold = threshold;
}
fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
self.tx
.query(|tx| {
let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut hashed_storage_cursor =
tx.cursor_dup_read::<tables::HashedStorage>()?;
let mut expected = 0;
while let Some((address, entry)) = storage_cursor.next()? {
let key = keccak256(entry.key);
let got =
hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
assert_eq!(
got,
Some(StorageEntry { key, ..entry }),
"{expected}: {address:?}"
);
expected += 1;
}
let count = tx
.cursor_dup_read::<tables::HashedStorage>()?
.walk([0; 32].into())?
.count();
assert_eq!(count, expected);
Ok(())
})
.map_err(|e| e.into())
}
fn insert_storage_entry(
&self,
tx: &Tx<'_, RW, WriteMap>,
tid_address: TransitionIdAddress,
entry: StorageEntry,
hash: bool,
) -> Result<(), reth_db::Error> {
let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let prev_entry = storage_cursor
.seek_by_key_subkey(tid_address.address(), entry.key)?
.map(|e| {
storage_cursor.delete_current().expect("failed to delete entry");
e
})
.unwrap_or(StorageEntry { key: entry.key, value: U256::from(0) });
if hash {
tx.cursor_dup_write::<tables::HashedStorage>()?.append_dup(
keccak256(tid_address.address()),
StorageEntry { key: keccak256(entry.key), value: entry.value },
)?;
}
storage_cursor.append_dup(tid_address.address(), entry)?;
tx.cursor_dup_write::<tables::StorageChangeSet>()?
.append_dup(tid_address, prev_entry)?;
Ok(())
}
fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
tracing::debug!("unwinding storage...");
let target_transition = self
.tx
.inner()
.get_block_transition(input.unwind_to)
.map_err(|e| TestRunnerError::Internal(Box::new(e)))?;
self.tx.commit(|tx| {
let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSet>()?;
let mut row = changeset_cursor.last()?;
while let Some((tid_address, entry)) = row {
if tid_address.transition_id() <= target_transition {
break
}
storage_cursor.seek_by_key_subkey(tid_address.address(), entry.key)?;
storage_cursor.delete_current()?;
if entry.value != U256::ZERO {
storage_cursor.append_dup(tid_address.address(), entry)?;
}
row = changeset_cursor.prev()?;
}
Ok(())
})?;
Ok(())
}
}
}

View File

@@ -0,0 +1,64 @@
use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::database::Database;
use std::fmt::Debug;
use tracing::*;
const MERKLE_EXECUTION: StageId = StageId("MerkleExecuteStage");
const MERKLE_UNWIND: StageId = StageId("MerkleUnwindStage");
/// Merkle stage uses the input from the [AccountHashingStage] and [StorageHashingStage] stages
/// and calculates the intermediate hashes and the state root.
/// This stage depends on the account and storage hashing stages. It is executed after them during
/// execution, and before them during unwinding.
#[derive(Debug)]
pub struct MerkleStage {
/// If true, this instance performs `execute` but skips unwind; if false, it skips execution
/// but performs unwind.
pub is_execute: bool,
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for MerkleStage {
/// Return the id of the stage
fn id(&self) -> StageId {
if self.is_execute {
MERKLE_EXECUTION
} else {
MERKLE_UNWIND
}
}
/// Execute the stage.
async fn execute(
&mut self,
_tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if !self.is_execute {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
// Iterate over the changesets (similar to the hashing stages) and take the new values
info!(target: "sync::stages::merkle::exec", "Stage finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
/// Unwind the stage.
async fn unwind(
&mut self,
_tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
if self.is_execute {
info!(target: "sync::stages::merkle::exec", "Stage is always skipped");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
info!(target: "sync::stages::merkle::unwind", "Stage finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
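As a usage note, the node command diff at the top of this commit wires two instances of this stage around the hashing stages. A hedged sketch of that ordering, where `pipeline` is shorthand for the builder used there:

```rust
pipeline
    // this instance is used only on unwind (is_execute: false)
    .push(MerkleStage { is_execute: false })
    .push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
    .push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
    // this instance is used only on execute
    .push(MerkleStage { is_execute: true });
```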

View File

@@ -2,8 +2,14 @@
pub mod bodies;
/// The execution stage that generates state diff.
pub mod execution;
/// Account hashing stage.
pub mod hashing_account;
/// Storage hashing stage.
pub mod hashing_storage;
/// The headers stage.
pub mod headers;
/// Intermediate hashes and merkle root creation.
pub mod merkle;
/// The sender recovery stage.
pub mod sender_recovery;
/// The total difficulty stage

View File

@@ -33,7 +33,7 @@ pub enum TableType {
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 23] = [
pub const TABLES: [(TableType, &str); 25] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
@@ -54,6 +54,8 @@ pub const TABLES: [(TableType, &str); 23] = [
(TableType::Table, StorageHistory::const_name()),
(TableType::DupSort, AccountChangeSet::const_name()),
(TableType::DupSort, StorageChangeSet::const_name()),
(TableType::Table, HashedAccount::const_name()),
(TableType::DupSort, HashedStorage::const_name()),
(TableType::Table, TxSenders::const_name()),
(TableType::Table, Config::const_name()),
(TableType::Table, SyncStage::const_name()),
@@ -163,11 +165,6 @@ table!(
( Logs ) TxNumber | Receipt
);
table!(
/// Stores the current state of an [`Account`].
( PlainAccountState ) Address | Account
);
table!(
/// Stores all smart contract bytecodes.
/// There will be multiple accounts that have same bytecode
@@ -190,6 +187,11 @@ table!(
( TxTransitionIndex ) TxNumber | TransitionId
);
table!(
/// Stores the current state of an [`Account`].
( PlainAccountState ) Address | Account
);
dupsort!(
/// Stores the current value of a storage key.
( PlainStorageState ) Address | [H256] StorageEntry
@@ -260,6 +262,22 @@ dupsort!(
( StorageChangeSet ) TransitionIdAddress | [H256] StorageEntry
);
table!(
/// Stores the current state of an [`Account`] indexed with `keccak256(Address)`.
/// This table is in preparation for merkleization and calculation of the state root.
/// We store the whole account data, as it is needed for a partial update when
/// part of the storage is changed. The benefit for merkleization is that hashed addresses are sorted.
( HashedAccount ) H256 | Account
);
dupsort!(
/// Stores the current storage values indexed with `keccak256(Address)` and
/// the hash of the storage key `keccak256(key)`.
/// This table is in preparation for merkleization and calculation of the state root.
/// The benefit for merkleization is that hashed addresses/keys are sorted.
( HashedStorage ) H256 | [H256] StorageEntry
);
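For illustration, a hedged sketch of how the keys for these two tables are derived, using the same `keccak256` helper the hashing stages call; `hashed_keys` is an illustrative helper, not an API:

```rust
use reth_primitives::{keccak256, Address, H256};

// HashedAccount is keyed by keccak256(address); HashedStorage uses the same
// key with a keccak256(storage_key) subkey, so cursor iteration yields
// entries in the sorted order merkleization expects.
fn hashed_keys(address: Address, storage_key: H256) -> (H256, H256) {
    (keccak256(address), keccak256(storage_key))
}
```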
table!(
/// Stores the transaction sender for each transaction.
/// It is needed to speed up execution stage and allows fetching signer without doing