Fix transition id in storage hashing stage (#1192)

This commit is contained in:
rakita
2023-02-06 16:43:38 +01:00
committed by GitHub
parent 6151c2772a
commit 23f1a902b0
4 changed files with 83 additions and 57 deletions

View File

@@ -328,7 +328,11 @@ mod tests {
output,
Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
assert_matches!(
runner.validate_execution(input, output.ok()),
Ok(_),
"execution validation"
);
}
/// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
@@ -382,7 +386,7 @@ mod tests {
Ok(UnwindOutput { stage_progress }) if stage_progress == 1
);
assert!(runner.validate_unwind(input).is_ok(), "unwind validation");
assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
}
mod test_utils {

View File

@@ -2,7 +2,7 @@ use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::DbCursorRO,
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
@@ -91,6 +91,9 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
break
}
} else {
let mut plain_accounts = tx.cursor_read::<tables::PlainAccountState>()?;
let mut hashed_accounts = tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all transition changesets and and make list of account that have been
// changed.
tx.cursor_read::<tables::AccountChangeSet>()?
@@ -106,16 +109,19 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// iterate over plain state and get newest value.
// Assumption we are okay to make is that plainstate represent
// `previous_stage_progress` state.
.map(|address| tx.get::<tables::PlainAccountState>(address).map(|a| (address, a)))
.map(|address| {
plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.try_for_each(|(address, account)| {
.try_for_each(|(address, account)| -> Result<(), StageError> {
let hashed_address = keccak256(address);
if let Some(account) = account {
tx.put::<tables::HashedAccount>(hashed_address, account)
} else {
tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
}
@@ -135,6 +141,8 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
let mut hashed_accounts = tx.cursor_write::<tables::HashedAccount>()?;
// Aggregate all transition changesets and and make list of account that have been changed.
tx.cursor_read::<tables::AccountChangeSet>()?
.walk_range(from_transition_rev..to_transition_rev)?
@@ -156,12 +164,13 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedState (if Account is None remove it);
.try_for_each(|(hashed_address, account)| {
.try_for_each(|(hashed_address, account)| -> Result<(), StageError> {
if let Some(account) = account {
tx.put::<tables::HashedAccount>(hashed_address, account)
} else {
tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
}
Ok(())
})?;
Ok(UnwindOutput { stage_progress: input.unwind_to })

View File

@@ -2,14 +2,17 @@ use crate::{
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::TransitionIdAddress,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256};
use std::{collections::BTreeMap, fmt::Debug};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
};
use tracing::*;
/// The [`StageId`] of the storage hashing stage.
@@ -49,8 +52,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
let previous_stage_progress = input.previous_stage_progress();
// read storage changeset, merge it into one changeset and calculate storage hashes.
let from_transition = tx.get_block_transition(stage_progress)? + 1;
let to_transition = tx.get_block_transition(previous_stage_progress)? + 1;
let from_transition = tx.get_block_transition(stage_progress)?;
let to_transition = tx.get_block_transition(previous_stage_progress)?;
// if there are more blocks then threshold it is faster to go over Plain state and hash all
// account otherwise take changesets aggregate the sets and apply hashing to
@@ -95,6 +98,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
}
} else {
let mut plain_storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
let mut hashed_storage = tx.cursor_dup_write::<tables::HashedStorage>()?;
// Aggregate all transition changesets and and make list of storages that have been
// changed.
@@ -109,12 +113,9 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// it is needed as it is dup table.
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, BTreeMap<H256, U256>>,
|mut accounts: BTreeMap<Address, BTreeSet<H256>>,
(TransitionIdAddress((_, address)), storage_entry)| {
accounts
.entry(address)
.or_default()
.insert(storage_entry.key, storage_entry.value);
accounts.entry(address).or_default().insert(storage_entry.key);
accounts
},
)
@@ -125,10 +126,10 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
.map(|(address, storage)| {
storage
.into_iter()
.map(|(key, val)| {
.map(|key| {
plain_storage
.seek_by_key_subkey(address, key)
.map(|ret| (keccak256(key), (val, ret.map(|e| e.value))))
.map(|ret| (keccak256(key), ret.map(|e| e.value)))
})
.collect::<Result<BTreeMap<_, _>, _>>()
.map(|storage| (keccak256(address), storage))
@@ -138,19 +139,20 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// Hash the address and key and apply them to HashedStorage (if Storage is None
// just remove it);
.try_for_each(|(address, storage)| {
storage.into_iter().try_for_each(
|(key, (old_val, new_val))| -> Result<(), StageError> {
tx.delete::<tables::HashedStorage>(
address,
Some(StorageEntry { key, value: old_val }),
)?;
if let Some(value) = new_val {
let val = StorageEntry { key, value };
tx.put::<tables::HashedStorage>(address, val)?
}
Ok(())
},
)
storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> {
if hashed_storage
.seek_by_key_subkey(address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if let Some(value) = val {
hashed_storage.upsert(address, StorageEntry { key, value })?;
}
Ok(())
})
})?;
}
@@ -164,8 +166,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let from_transition_rev = tx.get_block_transition(input.unwind_to)? + 1;
let to_transition_rev = tx.get_block_transition(input.stage_progress)? + 1;
let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
let mut hashed_storage = tx.cursor_dup_write::<tables::HashedStorage>()?;
@@ -193,13 +195,18 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedStorage (if Value is zero remove it);
// Apply values to HashedStorage (if Value is zero just remove it);
.try_for_each(|((address, key), value)| -> Result<(), StageError> {
hashed_storage.seek_by_key_subkey(address, key)?;
hashed_storage.delete_current()?;
if hashed_storage
.seek_by_key_subkey(address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.append_dup(address, StorageEntry { key, value })?;
hashed_storage.upsert(address, StorageEntry { key, value })?;
}
Ok(())
})?;
@@ -317,8 +324,6 @@ mod tests {
tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(tx_id, transaction.clone())?;
tx.put::<tables::TxTransitionIndex>(tx_id, transition_id)?;
tx_id += 1;
transition_id += 1;
let (addr, _) = accounts
.get_mut(rand::random::<usize>() % n_accounts as usize)
@@ -333,13 +338,15 @@ mod tests {
(transition_id, *addr).into(),
new_entry,
progress.header.number == stage_progress,
)
)?;
tx_id += 1;
transition_id += 1;
Ok(())
})?;
// Randomize rewards
let has_reward: bool = rand::random();
if has_reward {
transition_id += 1;
self.insert_storage_entry(
tx,
(transition_id, Address::random()).into(),
@@ -349,6 +356,7 @@ mod tests {
},
progress.header.number == stage_progress,
)?;
transition_id += 1;
}
tx.put::<tables::BlockTransitionIndex>(key.number(), transition_id)?;
@@ -442,7 +450,7 @@ mod tests {
if let Some(e) = tx
.cursor_dup_write::<tables::HashedStorage>()?
.seek_by_key_subkey(hashed_address, hashed_entry.key)?
.and_then(|e| if e.key == hashed_entry.key { Some(e) } else { None })
.filter(|e| e.key == hashed_entry.key)
{
tx.delete::<tables::HashedStorage>(hashed_address, Some(e))
.expect("failed to delete entry");
@@ -470,15 +478,20 @@ mod tests {
let mut rev_changeset_walker = changeset_cursor.walk_back(None)?;
while let Some((tid_address, entry)) = rev_changeset_walker.next().transpose()? {
if tid_address.transition_id() <= target_transition {
if tid_address.transition_id() < target_transition {
break
}
storage_cursor.seek_by_key_subkey(tid_address.address(), entry.key)?;
storage_cursor.delete_current()?;
if storage_cursor
.seek_by_key_subkey(tid_address.address(), entry.key)?
.filter(|e| e.key == entry.key)
.is_some()
{
storage_cursor.delete_current()?;
}
if entry.value != U256::ZERO {
tx.put::<tables::PlainStorageState>(tid_address.address(), entry)?;
storage_cursor.upsert(tid_address.address(), entry)?;
}
}
Ok(())

View File

@@ -18,7 +18,7 @@ macro_rules! stage_test_suite {
assert_matches::assert_matches!(result, Ok(_));
// Validate the stage execution
assert!(runner.validate_execution(input, result.unwrap().ok()).is_ok(), "execution validation");
assert_matches::assert_matches!(runner.validate_execution(input, result.unwrap().ok()),Ok(_), "execution validation");
}
// Run the complete stage execution flow.
@@ -47,7 +47,7 @@ macro_rules! stage_test_suite {
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation");
}
// Check that unwind does not panic on no new entries within the input range.
@@ -70,7 +70,7 @@ macro_rules! stage_test_suite {
);
// Validate the stage unwind
assert!(runner.validate_unwind(input).is_ok(), "unwind validation");
assert_matches::assert_matches!(runner.validate_unwind(input),Ok(_), "unwind validation");
}
// Run complete execute and unwind flow.
@@ -97,7 +97,7 @@ macro_rules! stage_test_suite {
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
assert!(runner.validate_execution(execute_input, result.ok()).is_ok(), "execution validation");
assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation");
// Run stage unwind
@@ -115,7 +115,7 @@ macro_rules! stage_test_suite {
);
// Validate the stage unwind
assert!(runner.validate_unwind(unwind_input).is_ok(), "unwind validation");
assert_matches::assert_matches!(runner.validate_unwind(unwind_input),Ok(_), "unwind validation");
}
}
};
@@ -156,7 +156,7 @@ macro_rules! stage_test_suite_ext {
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation");
}
}
};